// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/lib/firebase/firebase_impl.h"

#include <memory>
#include <sstream>
#include <utility>

#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <src/lib/fxl/logging.h>
#include <src/lib/fxl/strings/ascii.h>
#include <src/lib/fxl/strings/join_strings.h>

#include "peridot/lib/socket/socket_drainer_client.h"

namespace firebase {

namespace http = ::fuchsia::net::oldhttp;

namespace {

fit::function<http::URLRequest()> MakeRequest(const std::string& url, const std::string& method,
                                              const std::string& message,
                                              bool stream_request = false) {
  fsl::SizedVmo body;
  if (!message.empty()) {
    if (!fsl::VmoFromString(message, &body)) {
      FXL_LOG(ERROR) << "Unable to create VMO from string.";
      return nullptr;
    }
  }

  return [url, method, body = std::move(body), stream_request]() {
    http::URLRequest request;
    request.url = url;
    request.method = method;
    request.auto_follow_redirects = true;
    if (body) {
      fsl::SizedVmo duplicated_body;
      zx_status_t status = body.Duplicate(ZX_RIGHTS_BASIC | ZX_RIGHT_READ, &duplicated_body);
      if (status != ZX_OK) {
        FXL_LOG(WARNING) << "Unable to duplicate a vmo. Status: " << status;
        return http::URLRequest();
      }
      request.body = http::URLBody::New();
      request.body->set_buffer(std::move(duplicated_body).ToTransport());
    }
    if (stream_request) {
      http::HttpHeader accept_header;
      accept_header.name = "Accept";
      accept_header.value = "text/event-stream";
      request.headers.emplace({std::move(accept_header)});
    }
    return request;
  };
}

}  // namespace

struct FirebaseImpl::WatchData {
  WatchData();
  ~WatchData();

  callback::AutoCancel request;
  std::unique_ptr<EventStream> event_stream;
  std::unique_ptr<socket::SocketDrainerClient> drainer;
};

FirebaseImpl::WatchData::WatchData() {}
FirebaseImpl::WatchData::~WatchData() {}

FirebaseImpl::FirebaseImpl(network_wrapper::NetworkWrapper* network_wrapper,
                           const std::string& db_id, const std::string& prefix)
    : network_wrapper_(network_wrapper), api_url_(BuildApiUrl(db_id, prefix)) {
  FXL_DCHECK(network_wrapper_);
}

FirebaseImpl::~FirebaseImpl() {}

void FirebaseImpl::Get(
    const std::string& key, const std::vector<std::string>& query_params,
    fit::function<void(Status status, std::unique_ptr<rapidjson::Value> value)> callback) {
  auto request_callback = [callback = std::move(callback)](Status status,
                                                           const std::string& response) {
    if (status != Status::OK) {
      callback(status, std::make_unique<rapidjson::Value>());
      return;
    }

    auto document = std::make_unique<rapidjson::Document>();
    document->Parse(response.c_str(), response.size());
    if (document->HasParseError()) {
      callback(Status::PARSE_ERROR, std::make_unique<rapidjson::Value>());
      return;
    }

    callback(Status::OK, std::move(document));
  };

  Request(BuildRequestUrl(key, query_params), "GET", "", std::move(request_callback));
}

void FirebaseImpl::Put(const std::string& key, const std::vector<std::string>& query_params,
                       const std::string& data, fit::function<void(Status status)> callback) {
  Request(BuildRequestUrl(key, query_params), "PUT", data,
          [callback = std::move(callback)](Status status, const std::string& response) {
            // Ignore the response body, which is the same data we sent to the
            // server.
            callback(status);
          });
}

void FirebaseImpl::Patch(const std::string& key, const std::vector<std::string>& query_params,
                         const std::string& data, fit::function<void(Status status)> callback) {
  Request(BuildRequestUrl(key, query_params), "PATCH", data,
          [callback = std::move(callback)](Status status, const std::string& response) {
            // Ignore the response body, which is the same data we sent to the
            // server.
            callback(status);
          });
}

void FirebaseImpl::Delete(const std::string& key, const std::vector<std::string>& query_params,
                          fit::function<void(Status status)> callback) {
  Request(BuildRequestUrl(key, query_params), "DELETE", "",
          [callback = std::move(callback)](Status status, const std::string& response) {
            callback(status);
          });
}

void FirebaseImpl::Watch(const std::string& key, const std::vector<std::string>& query_params,
                         WatchClient* watch_client) {
  watch_data_[watch_client] = std::make_unique<WatchData>();
  watch_data_[watch_client]->request.Reset(
      network_wrapper_->Request(MakeRequest(BuildRequestUrl(key, query_params), "GET", "", true),
                                [this, watch_client](http::URLResponse response) {
                                  OnStream(watch_client, std::move(response));
                                }));
}

void FirebaseImpl::UnWatch(WatchClient* watch_client) { watch_data_.erase(watch_client); }

std::string FirebaseImpl::BuildApiUrl(const std::string& db_id, const std::string& prefix) {
  std::string api_url = "https://" + db_id + ".firebaseio.com";

  if (!prefix.empty()) {
    FXL_DCHECK(prefix.front() != '/');
    FXL_DCHECK(prefix.back() != '/');
    api_url.append("/");
    api_url.append(prefix);
  }

  FXL_DCHECK(api_url.back() != '/');
  return api_url;
}

std::string FirebaseImpl::BuildRequestUrl(const std::string& key,
                                          const std::vector<std::string>& query_params) const {
  std::ostringstream result;
  result << api_url_;
  result << "/" << key << ".json";
  if (query_params.empty()) {
    return result.str();
  }
  result << "?" << fxl::JoinStrings(query_params, "&");
  return result.str();
}

void FirebaseImpl::Request(const std::string& url, const std::string& method,
                           const std::string& message,
                           fit::function<void(Status status, std::string response)> callback) {
  requests_.emplace(network_wrapper_->Request(
      MakeRequest(url, method, message),
      [this, callback = std::move(callback)](http::URLResponse response) mutable {
        OnResponse(std::move(callback), std::move(response));
      }));
}

void FirebaseImpl::OnResponse(fit::function<void(Status status, std::string response)> callback,
                              http::URLResponse response) {
  if (response.error) {
    FXL_LOG(ERROR) << response.url << " error " << response.error->description;
    callback(Status::NETWORK_ERROR, "");
    return;
  }

  if (response.status_code != 200 && response.status_code != 204) {
    const std::string& url = response.url.value_or("");
    const std::string& status_line = response.status_line.value_or("");
    FXL_DCHECK(response.body->is_stream());
    auto& drainer = drainers_.emplace();
    drainer.Start(std::move(response.body->stream()),
                  [callback = std::move(callback), url, status_line](const std::string& body) {
                    FXL_LOG(ERROR) << url << " error " << status_line << ":" << std::endl << body;
                    callback(Status::SERVER_ERROR, "");
                  });
    return;
  }

  FXL_DCHECK(response.body->is_stream());
  auto& drainer = drainers_.emplace();
  drainer.Start(
      std::move(response.body->stream()),
      [callback = std::move(callback)](const std::string& body) { callback(Status::OK, body); });
}

void FirebaseImpl::OnStream(WatchClient* watch_client, http::URLResponse response) {
  if (response.error) {
    FXL_LOG(ERROR) << response.url << " error " << response.error->description;
    watch_client->OnConnectionError();
    watch_data_.erase(watch_client);
    return;
  }

  FXL_DCHECK(response.body->is_stream());

  if (response.status_code != 200 && response.status_code != 204) {
    const std::string& url = response.url.value_or("");
    const std::string& status_line = response.status_line.value_or("");
    watch_data_[watch_client]->drainer = std::make_unique<socket::SocketDrainerClient>();
    watch_data_[watch_client]->drainer->Start(
        std::move(response.body->stream()),
        [this, watch_client, url, status_line](const std::string& body) {
          FXL_LOG(ERROR) << url << " error " << status_line << ":" << std::endl << body;
          watch_client->OnConnectionError();
          watch_data_.erase(watch_client);
        });
    return;
  }

  watch_data_[watch_client]->event_stream = std::make_unique<EventStream>();
  watch_data_[watch_client]->event_stream->Start(
      std::move(response.body->stream()),
      [this, watch_client](Status status, const std::string& event, const std::string& data) {
        OnStreamEvent(watch_client, status, event, data);
      },
      [this, watch_client]() { OnStreamComplete(watch_client); });
}

void FirebaseImpl::OnStreamComplete(WatchClient* watch_client) {
  watch_data_[watch_client]->event_stream.reset();
  watch_client->OnConnectionError();
  watch_data_.erase(watch_client);
}

void FirebaseImpl::OnStreamEvent(WatchClient* watch_client, Status /*status*/,
                                 const std::string& event, const std::string& payload) {
  if (event == "put" || event == "patch") {
    rapidjson::Document parsed_payload;
    parsed_payload.Parse(payload.c_str(), payload.size());
    if (parsed_payload.HasParseError()) {
      HandleMalformedEvent(watch_client, event, payload, "failed to parse the event payload");
      return;
    }

    // Both 'put' and 'patch' events must carry a dictionary of "path" and
    // "data".
    if (!parsed_payload.IsObject()) {
      HandleMalformedEvent(watch_client, event, payload,
                           "event payload doesn't appear to be an object");
      return;
    }
    if (!parsed_payload.HasMember("path") || !parsed_payload["path"].IsString()) {
      HandleMalformedEvent(watch_client, event, payload,
                           "event payload doesn't contain the `path` string");
      return;
    }
    if (!parsed_payload.HasMember("data")) {
      HandleMalformedEvent(watch_client, event, payload,
                           "event payload doesn't contain the `data` member");
      return;
    }

    if (event == "put") {
      watch_client->OnPut(parsed_payload["path"].GetString(), parsed_payload["data"]);
    } else if (event == "patch") {
      // In case of patch, data must be a dictionary itself.
      if (!parsed_payload["data"].IsObject()) {
        HandleMalformedEvent(watch_client, event, payload,
                             "event payload `data` member doesn't appear to be an object");
        return;
      }

      watch_client->OnPatch(parsed_payload["path"].GetString(), parsed_payload["data"]);
    } else {
      FXL_NOTREACHED();
    }
  } else if (event == "keep-alive") {
    // Do nothing.
  } else if (event == "cancel") {
    watch_client->OnCancel();
  } else if (event == "auth_revoked") {
    watch_client->OnAuthRevoked(payload);
  } else {
    HandleMalformedEvent(watch_client, event, payload, "unrecognized event type");
  }
}

void FirebaseImpl::HandleMalformedEvent(WatchClient* watch_client, const std::string& event,
                                        const std::string& payload,
                                        const char error_description[]) {
  FXL_LOG(ERROR) << "Error processing a Firebase event: " << error_description;
  FXL_LOG(ERROR) << "Event: " << event;
  FXL_LOG(ERROR) << "Data: " << payload;
  watch_client->OnMalformedEvent();
}

}  // namespace firebase
