blob: 862cbdd099cd43776911084589db8b0a264fbd81 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/lib/firebase/firebase_impl.h"
#include <memory>
#include <sstream>
#include <utility>
#include <lib/fit/function.h>
#include <lib/fsl/vmo/strings.h>
#include <lib/fxl/functional/make_copyable.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/strings/ascii.h>
#include <lib/fxl/strings/join_strings.h>
#include "peridot/lib/socket/socket_drainer_client.h"
namespace firebase {
namespace http = ::fuchsia::net::oldhttp;
namespace {
fit::function<http::URLRequest()> MakeRequest(const std::string& url,
const std::string& method,
const std::string& message,
bool stream_request = false) {
fsl::SizedVmo body;
if (!message.empty()) {
if (!fsl::VmoFromString(message, &body)) {
FXL_LOG(ERROR) << "Unable to create VMO from string.";
return nullptr;
}
}
return fxl::MakeCopyable(
[url, method, body = std::move(body), stream_request]() {
http::URLRequest request;
request.url = url;
request.method = method;
request.auto_follow_redirects = true;
if (body) {
fsl::SizedVmo duplicated_body;
zx_status_t status =
body.Duplicate(ZX_RIGHTS_BASIC | ZX_RIGHT_READ, &duplicated_body);
if (status != ZX_OK) {
FXL_LOG(WARNING) << "Unable to duplicate a vmo. Status: " << status;
return http::URLRequest();
}
request.body = http::URLBody::New();
request.body->set_buffer(std::move(duplicated_body).ToTransport());
}
if (stream_request) {
http::HttpHeader accept_header;
accept_header.name = "Accept";
accept_header.value = "text/event-stream";
request.headers.push_back(std::move(accept_header));
}
return request;
});
}
} // namespace
struct FirebaseImpl::WatchData {
WatchData();
~WatchData();
callback::AutoCancel request;
std::unique_ptr<EventStream> event_stream;
std::unique_ptr<socket::SocketDrainerClient> drainer;
};
FirebaseImpl::WatchData::WatchData() {}
FirebaseImpl::WatchData::~WatchData() {}
FirebaseImpl::FirebaseImpl(network_wrapper::NetworkWrapper* network_wrapper,
const std::string& db_id, const std::string& prefix)
: network_wrapper_(network_wrapper), api_url_(BuildApiUrl(db_id, prefix)) {
FXL_DCHECK(network_wrapper_);
}
FirebaseImpl::~FirebaseImpl() {}
void FirebaseImpl::Get(
const std::string& key, const std::vector<std::string>& query_params,
fit::function<void(Status status, std::unique_ptr<rapidjson::Value> value)>
callback) {
auto request_callback = [callback = std::move(callback)](
Status status, const std::string& response) {
if (status != Status::OK) {
callback(status, std::make_unique<rapidjson::Value>());
return;
}
auto document = std::make_unique<rapidjson::Document>();
document->Parse(response.c_str(), response.size());
if (document->HasParseError()) {
callback(Status::PARSE_ERROR, std::make_unique<rapidjson::Value>());
return;
}
callback(Status::OK, std::move(document));
};
Request(BuildRequestUrl(key, query_params), "GET", "",
std::move(request_callback));
}
void FirebaseImpl::Put(const std::string& key,
const std::vector<std::string>& query_params,
const std::string& data,
fit::function<void(Status status)> callback) {
Request(BuildRequestUrl(key, query_params), "PUT", data,
[callback = std::move(callback)](Status status,
const std::string& response) {
// Ignore the response body, which is the same data we sent to the
// server.
callback(status);
});
}
void FirebaseImpl::Patch(const std::string& key,
const std::vector<std::string>& query_params,
const std::string& data,
fit::function<void(Status status)> callback) {
Request(BuildRequestUrl(key, query_params), "PATCH", data,
[callback = std::move(callback)](Status status,
const std::string& response) {
// Ignore the response body, which is the same data we sent to the
// server.
callback(status);
});
}
void FirebaseImpl::Delete(const std::string& key,
const std::vector<std::string>& query_params,
fit::function<void(Status status)> callback) {
Request(
BuildRequestUrl(key, query_params), "DELETE", "",
[callback = std::move(callback)](
Status status, const std::string& response) { callback(status); });
}
void FirebaseImpl::Watch(const std::string& key,
const std::vector<std::string>& query_params,
WatchClient* watch_client) {
watch_data_[watch_client] = std::make_unique<WatchData>();
watch_data_[watch_client]->request.Reset(network_wrapper_->Request(
MakeRequest(BuildRequestUrl(key, query_params), "GET", "", true),
[this, watch_client](http::URLResponse response) {
OnStream(watch_client, std::move(response));
}));
}
void FirebaseImpl::UnWatch(WatchClient* watch_client) {
watch_data_.erase(watch_client);
}
std::string FirebaseImpl::BuildApiUrl(const std::string& db_id,
const std::string& prefix) {
std::string api_url = "https://" + db_id + ".firebaseio.com";
if (!prefix.empty()) {
FXL_DCHECK(prefix.front() != '/');
FXL_DCHECK(prefix.back() != '/');
api_url.append("/");
api_url.append(prefix);
}
FXL_DCHECK(api_url.back() != '/');
return api_url;
}
std::string FirebaseImpl::BuildRequestUrl(
const std::string& key,
const std::vector<std::string>& query_params) const {
std::ostringstream result;
result << api_url_;
result << "/" << key << ".json";
if (query_params.empty()) {
return result.str();
}
result << "?" << fxl::JoinStrings(query_params, "&");
return result.str();
}
void FirebaseImpl::Request(
const std::string& url, const std::string& method,
const std::string& message,
fit::function<void(Status status, std::string response)> callback) {
requests_.emplace(network_wrapper_->Request(
MakeRequest(url, method, message),
[this,
callback = std::move(callback)](http::URLResponse response) mutable {
OnResponse(std::move(callback), std::move(response));
}));
}
void FirebaseImpl::OnResponse(
fit::function<void(Status status, std::string response)> callback,
http::URLResponse response) {
if (response.error) {
FXL_LOG(ERROR) << response.url << " error " << response.error->description;
callback(Status::NETWORK_ERROR, "");
return;
}
if (response.status_code != 200 && response.status_code != 204) {
const std::string& url = response.url;
const std::string& status_line = response.status_line;
FXL_DCHECK(response.body->is_stream());
auto& drainer = drainers_.emplace();
drainer.Start(std::move(response.body->stream()),
[callback = std::move(callback), url,
status_line](const std::string& body) {
FXL_LOG(ERROR)
<< url << " error " << status_line << ":" << std::endl
<< body;
callback(Status::SERVER_ERROR, "");
});
return;
}
FXL_DCHECK(response.body->is_stream());
auto& drainer = drainers_.emplace();
drainer.Start(std::move(response.body->stream()),
[callback = std::move(callback)](const std::string& body) {
callback(Status::OK, body);
});
}
void FirebaseImpl::OnStream(WatchClient* watch_client,
http::URLResponse response) {
if (response.error) {
FXL_LOG(ERROR) << response.url << " error " << response.error->description;
watch_client->OnConnectionError();
watch_data_.erase(watch_client);
return;
}
FXL_DCHECK(response.body->is_stream());
if (response.status_code != 200 && response.status_code != 204) {
const std::string& url = response.url;
const std::string& status_line = response.status_line;
watch_data_[watch_client]->drainer =
std::make_unique<socket::SocketDrainerClient>();
watch_data_[watch_client]->drainer->Start(
std::move(response.body->stream()),
[this, watch_client, url, status_line](const std::string& body) {
FXL_LOG(ERROR) << url << " error " << status_line << ":" << std::endl
<< body;
watch_client->OnConnectionError();
watch_data_.erase(watch_client);
});
return;
}
watch_data_[watch_client]->event_stream = std::make_unique<EventStream>();
watch_data_[watch_client]->event_stream->Start(
std::move(response.body->stream()),
[this, watch_client](Status status, const std::string& event,
const std::string& data) {
OnStreamEvent(watch_client, status, event, data);
},
[this, watch_client]() { OnStreamComplete(watch_client); });
}
void FirebaseImpl::OnStreamComplete(WatchClient* watch_client) {
watch_data_[watch_client]->event_stream.reset();
watch_client->OnConnectionError();
watch_data_.erase(watch_client);
}
void FirebaseImpl::OnStreamEvent(WatchClient* watch_client, Status /*status*/,
const std::string& event,
const std::string& payload) {
if (event == "put" || event == "patch") {
rapidjson::Document parsed_payload;
parsed_payload.Parse(payload.c_str(), payload.size());
if (parsed_payload.HasParseError()) {
HandleMalformedEvent(watch_client, event, payload,
"failed to parse the event payload");
return;
}
// Both 'put' and 'patch' events must carry a dictionary of "path" and
// "data".
if (!parsed_payload.IsObject()) {
HandleMalformedEvent(watch_client, event, payload,
"event payload doesn't appear to be an object");
return;
}
if (!parsed_payload.HasMember("path") ||
!parsed_payload["path"].IsString()) {
HandleMalformedEvent(watch_client, event, payload,
"event payload doesn't contain the `path` string");
return;
}
if (!parsed_payload.HasMember("data")) {
HandleMalformedEvent(watch_client, event, payload,
"event payload doesn't contain the `data` member");
return;
}
if (event == "put") {
watch_client->OnPut(parsed_payload["path"].GetString(),
parsed_payload["data"]);
} else if (event == "patch") {
// In case of patch, data must be a dictionary itself.
if (!parsed_payload["data"].IsObject()) {
HandleMalformedEvent(
watch_client, event, payload,
"event payload `data` member doesn't appear to be an object");
return;
}
watch_client->OnPatch(parsed_payload["path"].GetString(),
parsed_payload["data"]);
} else {
FXL_NOTREACHED();
}
} else if (event == "keep-alive") {
// Do nothing.
} else if (event == "cancel") {
watch_client->OnCancel();
} else if (event == "auth_revoked") {
watch_client->OnAuthRevoked(payload);
} else {
HandleMalformedEvent(watch_client, event, payload,
"unrecognized event type");
}
}
void FirebaseImpl::HandleMalformedEvent(WatchClient* watch_client,
const std::string& event,
const std::string& payload,
const char error_description[]) {
FXL_LOG(ERROR) << "Error processing a Firebase event: " << error_description;
FXL_LOG(ERROR) << "Event: " << event;
FXL_LOG(ERROR) << "Data: " << payload;
watch_client->OnMalformedEvent();
}
} // namespace firebase