blob: fe8510c115d6e9b11bb662c038e8dfecd9b65516 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/lib/module_manifest_source/firebase_source.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <thread>
#include <lib/async/cpp/task.h>
#include <lib/backoff/exponential_backoff.h>
#include <lib/fdio/watcher.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/memory/weak_ptr.h>
#include <lib/network_wrapper/network_wrapper_impl.h>
#include "peridot/lib/fidl/json_xdr.h"
#include "peridot/lib/firebase/firebase_impl.h"
#include "peridot/lib/module_manifest/module_manifest_xdr.h"
#include "rapidjson/document.h"
namespace modular {
namespace http = ::fuchsia::net::oldhttp;
class FirebaseModuleManifestSource::Watcher : public firebase::WatchClient {
public:
Watcher(async_dispatcher_t* dispatcher, ModuleManifestSource::IdleFn idle_fn,
ModuleManifestSource::NewEntryFn new_fn,
ModuleManifestSource::RemovedEntryFn removed_fn,
fxl::WeakPtr<FirebaseModuleManifestSource> owner)
: dispatcher_(dispatcher),
idle_fn_(idle_fn),
new_fn_(new_fn),
removed_fn_(removed_fn),
reconnect_wait_seconds_(0),
owner_(owner) {}
~Watcher() override = default;
private:
void OnPut(const std::string& path, const rapidjson::Value& value) override {
// Successful connection established. Reset our reconnect counter.
reconnect_wait_seconds_ = 0;
// The first time we're called, we get a dump of all existing values as a
// JSON object at path "/".
//
// From then on, we get notifications of individual values changing with
// paths like "/module-id".
if (path == "/") {
if (value.IsNull()) {
idle_fn_();
return;
}
if (!value.IsObject()) {
FXL_LOG(ERROR) << "Got update at /, but it's not a JSON object??";
idle_fn_();
return;
}
for (rapidjson::Value::ConstMemberIterator it = value.MemberBegin();
it != value.MemberEnd(); ++it) {
// When we get updates, the path is "/<module id>", so we add "/" here
// to the member name.
ProcessEntry(path + it->name.GetString(), it->value);
}
// We've read all existing entries.
idle_fn_();
} else {
if (value.IsNull()) {
removed_fn_(path);
} else {
ProcessEntry(path, value);
}
}
}
void OnPatch(const std::string& path,
const rapidjson::Value& value) override {}
void OnCancel() override {}
void OnAuthRevoked(const std::string& reason) override {}
void OnMalformedEvent() override {}
void OnConnectionError() override {
idle_fn_();
// Simple exponential backoff counter.
if (reconnect_wait_seconds_ == 0) {
reconnect_wait_seconds_ = 1;
} else {
reconnect_wait_seconds_ *= 2;
if (reconnect_wait_seconds_ > 60)
reconnect_wait_seconds_ = 60;
}
// Try to reconnect.
FXL_LOG(INFO) << "Reconnecting to Firebase in " << reconnect_wait_seconds_
<< " seconds.";
async::PostDelayedTask(dispatcher_,
[owner = owner_, this]() {
if (!owner)
return;
owner->StartWatching(this);
},
zx::sec(reconnect_wait_seconds_));
}
void ProcessEntry(const std::string& name, const rapidjson::Value& value) {
// We have to deep-copy |it->value| because XdrRead() must have a
// rapidjson::Document.
rapidjson::Document doc;
doc.CopyFrom(value, doc.GetAllocator());
// Handle bad manifests, including older files expressed as an array.
// Any mismatch causes XdrRead to DCHECK.
if (!doc.IsObject()) {
FXL_LOG(WARNING) << "Ignored invalid manifest: " << name;
return;
}
fuchsia::modular::ModuleManifest entry;
if (!XdrRead(&doc, &entry, XdrModuleManifest)) {
FXL_LOG(WARNING) << "Could not parse Module manifest from: " << name;
return;
}
new_fn_(name, std::move(entry));
}
async_dispatcher_t* const dispatcher_;
ModuleManifestSource::IdleFn idle_fn_;
ModuleManifestSource::NewEntryFn new_fn_;
ModuleManifestSource::RemovedEntryFn removed_fn_;
int reconnect_wait_seconds_;
fxl::WeakPtr<FirebaseModuleManifestSource> owner_;
};
FirebaseModuleManifestSource::FirebaseModuleManifestSource(
async_dispatcher_t* dispatcher,
std::function<http::HttpServicePtr()> network_service_factory,
std::string db_id, std::string prefix)
: db_id_(db_id),
prefix_(prefix),
network_wrapper_(new network_wrapper::NetworkWrapperImpl(
dispatcher, std::make_unique<backoff::ExponentialBackoff>(),
std::move(network_service_factory))),
client_(
new firebase::FirebaseImpl(network_wrapper_.get(), db_id, prefix)),
weak_factory_(this) {}
FirebaseModuleManifestSource::~FirebaseModuleManifestSource() {
for (auto& watcher : watchers_) {
client_->UnWatch(watcher.get());
}
}
void FirebaseModuleManifestSource::Watch(async_dispatcher_t* dispatcher,
IdleFn idle_fn, NewEntryFn new_fn,
RemovedEntryFn removed_fn) {
auto watcher = std::make_unique<Watcher>(
dispatcher, std::move(idle_fn), std::move(new_fn), std::move(removed_fn),
weak_factory_.GetWeakPtr());
StartWatching(watcher.get());
watchers_.push_back(std::move(watcher));
}
void FirebaseModuleManifestSource::StartWatching(Watcher* watcher) {
client_->Watch("/manifests", {}, watcher);
}
} // namespace modular