blob: 8e416c5c02b1e4d9faa5ae4aacc16832cb08731f [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "bus.h"
#include <lib/async/cpp/task.h>
#include <zircon/assert.h>
#include "callback_watch.h"
namespace netemul {
class WaitForClientsWatch : public CallbackWatch<Bus::WaitForClientsCallback> {
public:
using MissingClientsCallback =
fit::function<std::vector<std::string>(const std::vector<std::string>&)>;
WaitForClientsWatch(std::vector<std::string> clients, Bus::WaitForClientsCallback callback)
: CallbackWatch(std::move(callback)), clients_(std::move(clients)) {}
const std::vector<std::string>& clients() const { return clients_; }
void OnTimeout() override {
ZX_ASSERT(missing_callback_);
FireCallback(false, fidl::VectorPtr<std::string>(missing_callback_(clients_)));
}
void SetMissingClientsCallback(MissingClientsCallback callback) {
missing_callback_ = std::move(callback);
}
private:
std::vector<std::string> clients_;
MissingClientsCallback missing_callback_;
};
class WaitForEventWatch : public CallbackWatch<Bus::WaitForEventCallback> {
public:
WaitForEventWatch(Bus::FEvent event, Bus::WaitForEventCallback callback)
: CallbackWatch(std::move(callback)), event_(std::move(event)) {}
void OnTimeout() override { FireCallback(false); }
bool EventMatches(const Bus::FEvent& cmp) const {
if (event_.has_code() && (!cmp.has_code() || cmp.code() != event_.code())) {
return false;
} else if (event_.has_message() && (!cmp.has_message() || cmp.message() != event_.message())) {
return false;
} else if (event_.has_arguments() &&
(!cmp.has_arguments() || cmp.arguments().size() != event_.arguments().size() ||
memcmp(cmp.arguments().data(), event_.arguments().data(),
event_.arguments().size()) != 0)) {
return false;
} else {
return true;
}
}
private:
Bus::FEvent event_;
};
class BusBinding : public fuchsia::netemul::sync::Bus {
public:
using FBus = fuchsia::netemul::sync::Bus;
using FEvent = fuchsia::netemul::sync::Event;
BusBinding(std::string name, ::netemul::Bus* parent, async_dispatcher_t* dispatcher,
fidl::InterfaceRequest<FBus> request)
: client_name_(std::move(name)),
parent_(parent),
binding_(this, std::move(request), dispatcher) {}
void Publish(FEvent data) override { parent_->Publish(std::move(data), client_name_); }
void EnsurePublish(FEvent data, FBus::EnsurePublishCallback callback) override {
parent_->Publish(std::move(data), client_name_);
callback();
}
void GetClients(GetClientsCallback callback) override {
std::vector<std::string> data;
const auto& clients = parent_->clients();
data.reserve(clients.size());
for (const auto& x : clients) {
data.push_back(x.first);
}
callback(std::move(data));
}
void WaitForEvent(fuchsia::netemul::sync::Event data, int64_t timeout,
WaitForEventCallback callback) override {
auto watcher = std::make_unique<WaitForEventWatch>(std::move(data), std::move(callback));
if (timeout > 0) {
watcher->PostTimeout(binding_.dispatcher(), timeout);
}
event_watches_.push_back(std::move(watcher));
}
void WaitForClients(::std::vector<::std::string> clients, int64_t timeout,
WaitForClientsCallback callback) override {
parent_->WaitForClients(std::move(clients), timeout, std::move(callback));
}
void OnBusData(const std::string& from, FEvent event) {
if (from != client_name_) {
// upon publishing any events on the bus, check all event waits
for (auto i = event_watches_.begin(); i != event_watches_.end();) {
if ((*i)->valid()) {
if ((*i)->EventMatches(event)) {
(*i)->FireCallback(true);
i = event_watches_.erase(i);
} else {
++i;
}
} else {
i = event_watches_.erase(i);
}
}
binding_.events().OnBusData(std::move(event));
}
}
void OnClientAttached(const std::string& client) {
if (client != client_name_) {
binding_.events().OnClientAttached(client);
}
}
void OnClientDetached(const std::string& client) {
if (client != client_name_) {
binding_.events().OnClientDetached(client);
}
}
void SetErrorHandler(fit::function<void(zx_status_t)> handler) {
binding_.set_error_handler(std::move(handler));
}
private:
const std::string client_name_;
// Pointer to parent bus. Not owned.
::netemul::Bus* parent_;
fidl::Binding<FBus> binding_;
std::vector<std::unique_ptr<WaitForEventWatch>> event_watches_;
};
Bus::Bus(async_dispatcher_t* dispatcher) : dispatcher_(dispatcher) {}
void Bus::Subscribe(const std::string& clientName, fidl::InterfaceRequest<Bus::FBus> request) {
// reject if clientName is already in clients
if (clients_.find(clientName) != clients_.end()) {
return;
}
auto binding = std::make_unique<BusBinding>(clientName, this, dispatcher_, std::move(request));
binding->SetErrorHandler([this, clientName](zx_status_t err) {
// notify other clients this client is gone
NotifyClientDetached(clientName);
clients_.erase(clientName);
});
// For every currently existing client, fire an OnClientAttachedEvent, so the client can get a
// view on all the other clients that are on the bus:
for (auto& c : clients_) {
binding->OnClientAttached(c.first);
}
// notify other clients of new client
NotifyClientAttached(clientName);
clients_.insert(std::pair<std::string, ClientBinding>(clientName, std::move(binding)));
// Whenever a new client attaches, we check the client watches to fire
// anything pending
for (auto i = client_watches_.begin(); i != client_watches_.end();) {
if (CheckClientWatch(i->get())) {
i = client_watches_.erase(i);
} else {
++i;
}
}
}
void Bus::NotifyClientAttached(const std::string& client) {
for (const auto& cli : clients_) {
cli.second->OnClientAttached(client);
}
}
void Bus::NotifyClientDetached(const std::string& client) {
for (const auto& cli : clients_) {
cli.second->OnClientDetached(client);
}
}
const std::unordered_map<std::string, Bus::ClientBinding>& Bus::clients() { return clients_; }
void Bus::Publish(Bus::FEvent data, const std::string& from) {
for (const auto& cli : clients_) {
Bus::FEvent clone;
ZX_ASSERT(data.Clone(&clone) == ZX_OK);
cli.second->OnBusData(from, std::move(clone));
}
}
void Bus::WaitForClients(std::vector<std::string> clients, int64_t timeout,
WaitForClientsCallback callback) {
auto client_watch =
std::make_unique<WaitForClientsWatch>(std::move(clients), std::move(callback));
if (CheckClientWatch(client_watch.get())) {
return;
} else {
if (timeout > 0) {
client_watch->SetMissingClientsCallback([this](const std::vector<std::string>& wait) {
std::vector<std::string> missing;
for (const auto& client : wait) {
if (clients_.find(client) == clients_.end()) {
missing.push_back(client);
}
}
return missing;
});
client_watch->PostTimeout(dispatcher_, timeout);
}
client_watches_.push_back(std::move(client_watch));
}
}
bool Bus::CheckClientWatch(WaitForClientsWatch* watch) {
if (!watch->valid()) {
// already fired
return true;
}
for (const auto& cli : watch->clients()) {
if (clients_.find(cli) == clients_.end()) {
return false;
}
}
watch->FireCallback(true, nullptr);
return true;
}
Bus::~Bus() = default;
} // namespace netemul