blob: f710fd443ff85a28b78ecaa826dac56cc8ac5f8a [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/p2p_provider/impl/p2p_provider_impl.h"
#include <algorithm>
#include <iterator>
#include <lib/fxl/files/file.h>
#include <lib/fxl/logging.h>
#include "peridot/bin/ledger/p2p_provider/impl/envelope_generated.h"
namespace p2p_provider {
namespace {
// Prefix of the service name registered with, and requested from,
// NetConnector. The user ID is appended to it to form the full name.
constexpr char kRespondingServiceName[] = "ledger-p2p-";
// Handshake protocol version. It is written into every handshake sent and is
// the only version accepted when validating a received handshake.
constexpr uint16_t kCurrentVersion = 0;
bool ValidateHandshake(const Envelope* envelope, const Handshake** message) {
if (envelope->message_type() != EnvelopeMessage_Handshake) {
FXL_LOG(ERROR) << "Incorrect message type: " << envelope->message_type();
return false;
}
*message = static_cast<const Handshake*>(envelope->message());
if ((*message)->version() != kCurrentVersion) {
FXL_LOG(ERROR) << "Incorrect message version: " << (*message)->version();
return false;
}
return true;
}
} // namespace
P2PProviderImpl::P2PProviderImpl(
std::string host_name, fuchsia::netconnector::NetConnectorPtr net_connector,
std::unique_ptr<p2p_provider::UserIdProvider> user_id_provider)
: host_name_(std::move(host_name)),
net_connector_(std::move(net_connector)),
user_id_provider_(std::move(user_id_provider)) {}
// Nothing to release by hand: members clean up after themselves.
P2PProviderImpl::~P2PProviderImpl() = default;
// Registers |client| as the receiver of notifications and requests the user
// ID. Must be called with a non-null |client|, and only while no client is
// registered yet. The service is started once the user ID has been retrieved;
// if retrieval fails, an error is logged and the service is not started.
void P2PProviderImpl::Start(Client* client) {
  FXL_DCHECK(!client_);
  FXL_DCHECK(client);
  client_ = client;

  auto on_user_id = [this](UserIdProvider::Status status,
                           std::string user_id) {
    if (status == UserIdProvider::Status::OK) {
      user_id_ = user_id;
      StartService();
      return;
    }
    FXL_LOG(ERROR) << "Unable to retrieve the user ID necessary to start "
                      "the peer-to-peer provider.";
  };
  user_id_provider_->GetUserId(std::move(on_user_id));
}
// Sends |data| to the device named |destination|.
//
// Returns false, without serializing anything, when |connection_map_| holds no
// connection for |destination|. Otherwise wraps |data| in a Message envelope,
// hands the serialized bytes to the connection and returns true.
bool P2PProviderImpl::SendMessage(fxl::StringView destination,
                                  fxl::StringView data) {
  // Look the peer up first so that no buffer is built for unknown devices.
  const auto it = connection_map_.find(destination);
  if (it == connection_map_.end()) {
    return false;
  }

  flatbuffers::FlatBufferBuilder buffer;
  const flatbuffers::Offset<Message> message =
      CreateMessage(buffer, convert::ToFlatBufferVector(&buffer, data));
  const flatbuffers::Offset<Envelope> envelope =
      CreateEnvelope(buffer, EnvelopeMessage_Message, message.Union());
  buffer.Finish(envelope);

  const char* serialized =
      reinterpret_cast<const char*>(buffer.GetBufferPointer());
  it->second->SendMessage(fxl::StringView(serialized, buffer.GetSize()));
  return true;
}
void P2PProviderImpl::StartService() {
fidl::InterfaceHandle<fuchsia::sys::ServiceProvider> handle;
// When the service provider is reset and its connection cut, NetConnector
// stops responding for its services.
network_service_provider_.AddBinding(handle.NewRequest());
network_service_provider_.AddServiceForName(
[this](zx::channel channel) {
RemoteConnection& connection = connections_.emplace(host_name_);
connection.set_on_message(
[this, &connection](std::vector<uint8_t> data) {
ProcessHandshake(&connection, std::move(data), true,
fxl::StringView());
});
connection.Start(std::move(channel));
},
kRespondingServiceName + user_id_);
net_connector_->RegisterServiceProvider(kRespondingServiceName + user_id_,
std::move(handle));
ListenForNewDevices(fuchsia::netconnector::kInitialKnownDeviceNames);
}
// Handles the first message received on |connection|, which must be a
// handshake.
//
// |connection|: the connection the message arrived on; it is disconnected if
//     the handshake is rejected.
// |data|: the serialized envelope received from the remote device.
// |should_send_handshake|: when true, our own handshake is sent back once the
//     remote one has been accepted.
// |network_remote_name|: the name of the remote device as given by the
//     network, or empty when it is not known. When non-empty it must match the
//     name declared in the handshake.
//
// On success the connection is recorded in |connection_map_| under the remote
// device's name, its callbacks are switched to regular message dispatching,
// and the client is notified of the new device unless a connection to that
// device was already recorded.
void P2PProviderImpl::ProcessHandshake(RemoteConnection* connection,
                                       std::vector<uint8_t> data,
                                       bool should_send_handshake,
                                       fxl::StringView network_remote_name) {
  flatbuffers::Verifier verifier(
      reinterpret_cast<const unsigned char*>(data.data()), data.size());
  if (!VerifyEnvelopeBuffer(verifier)) {
    // Wrong serialization, abort.
    FXL_LOG(ERROR) << "The message received is malformed.";
    connection->Disconnect();
    return;
  }

  const Envelope* envelope = GetEnvelope(data.data());
  const Handshake* message = nullptr;
  if (!ValidateHandshake(envelope, &message)) {
    FXL_LOG(ERROR) << "The message received is not valid.";
    connection->Disconnect();
    return;
  }

  // The host name is reached through a pointer and the handshake comes from a
  // remote device: reject the handshake instead of dereferencing null.
  const auto* declared_name = message->host_name();
  if (!declared_name) {
    FXL_LOG(ERROR) << "The handshake received does not contain a host name.";
    connection->Disconnect();
    return;
  }
  std::string remote_name(declared_name->begin(), declared_name->end());

  if (!network_remote_name.empty() && network_remote_name != remote_name) {
    // The name of the remote device as given by the network is different from
    // the self-declared name. Something is wrong here, let's abort.
    FXL_LOG(ERROR) << "Network name " << network_remote_name
                   << " different from declared name " << remote_name
                   << ", aborting.";
    connection->Disconnect();
    return;
  }

  // Both devices may have contacted each other at the same time. Keep a
  // single connection: when the remote name sorts before ours, the new
  // connection replaces the recorded one; otherwise the new one is dropped.
  bool existed_before = false;
  auto it = connection_map_.find(remote_name);
  if (it != connection_map_.end()) {
    if (remote_name < host_name_) {
      it->second->Disconnect();
      existed_before = true;
    } else {
      connection->Disconnect();
      return;
    }
  }

  connection_map_[remote_name] = connection;
  connection->set_on_close([this, remote_name]() {
    connection_map_.erase(remote_name);
    OnDeviceChange(remote_name, DeviceChangeType::DELETED);
  });
  connection->set_on_message([this, remote_name](std::vector<uint8_t> data) {
    Dispatch(remote_name, std::move(data));
  });

  if (should_send_handshake) {
    // We send a handshake to signal to the other side the connection is
    // indeed established.
    flatbuffers::FlatBufferBuilder buffer;
    flatbuffers::Offset<Handshake> request =
        CreateHandshake(buffer, kCurrentVersion,
                        convert::ToFlatBufferVector(&buffer, host_name_));
    flatbuffers::Offset<Envelope> reply =
        CreateEnvelope(buffer, EnvelopeMessage_Handshake, request.Union());
    buffer.Finish(reply);
    const char* buf = reinterpret_cast<const char*>(buffer.GetBufferPointer());
    const size_t size = buffer.GetSize();
    connection->SendMessage(fxl::StringView(buf, size));
  }

  if (!existed_before) {
    // If the connection existed before, we don't need to notify again.
    OnDeviceChange(remote_name, DeviceChangeType::NEW);
  }
}
// Watches NetConnector's list of known devices.
//
// Passes |version| to GetKnownDeviceNames. When the list is delivered, every
// listed device that is neither this host nor already in |contacted_hosts_|
// is sent a handshake over a fresh connection and recorded as contacted.
// Devices that are no longer listed are removed from |contacted_hosts_|, and
// the function re-arms itself with the version returned by NetConnector.
void P2PProviderImpl::ListenForNewDevices(uint64_t version) {
  net_connector_->GetKnownDeviceNames(
      version,
      [this](uint64_t new_version, std::vector<std::string> devices) {
        std::vector<std::string> seen_devices;
        seen_devices.reserve(devices.size());
        for (const std::string& remote_name : devices) {
          seen_devices.push_back(remote_name);
          if (contacted_hosts_.find(remote_name) != contacted_hosts_.end()) {
            continue;
          }
          if (remote_name == host_name_) {
            continue;
          }
          std::string remote_name_str(remote_name);

          zx::channel local;
          zx::channel remote;
          zx_status_t status = zx::channel::create(0u, &local, &remote);
          FXL_CHECK(status == ZX_OK)
              << "zx::channel::create failed, status " << status;

          fuchsia::sys::ServiceProviderPtr device_service_provider;
          net_connector_->GetDeviceServiceProvider(
              remote_name, device_service_provider.NewRequest());
          device_service_provider->ConnectToService(
              kRespondingServiceName + user_id_, std::move(remote));

          flatbuffers::FlatBufferBuilder buffer;
          flatbuffers::Offset<Handshake> request =
              CreateHandshake(buffer, kCurrentVersion,
                              convert::ToFlatBufferVector(&buffer, host_name_));
          flatbuffers::Offset<Envelope> envelope = CreateEnvelope(
              buffer, EnvelopeMessage_Handshake, request.Union());
          buffer.Finish(envelope);

          RemoteConnection& connection = connections_.emplace(host_name_);
          // We already send our handshake below, so the reply must not
          // trigger another one; the network name is checked against the
          // name the remote device declares.
          connection.set_on_message(
              [this, &connection, remote_name_str](std::vector<uint8_t> data) {
                ProcessHandshake(&connection, std::move(data), false,
                                 remote_name_str);
              });
          connection.Start(std::move(local));

          const char* buf =
              reinterpret_cast<const char*>(buffer.GetBufferPointer());
          const size_t size = buffer.GetSize();
          connection.SendMessage(fxl::StringView(buf, size));
          contacted_hosts_.insert(std::move(remote_name_str));
        }

        // std::set_difference requires both input ranges to be sorted. The
        // order in which NetConnector lists devices is not established here,
        // so sort the seen names explicitly. NOTE(review): this assumes
        // |contacted_hosts_| iterates in the same (lexicographic) order.
        std::sort(seen_devices.begin(), seen_devices.end());

        // Devices that disappeared can be recontacted again later as they might
        // have changed.
        std::vector<fxl::StringView> to_be_removed;
        std::set_difference(contacted_hosts_.begin(), contacted_hosts_.end(),
                            seen_devices.begin(), seen_devices.end(),
                            std::back_inserter(to_be_removed));
        for (const fxl::StringView& host : to_be_removed) {
          contacted_hosts_.erase(contacted_hosts_.find(host));
        }

        ListenForNewDevices(new_version);
      });
}
// Decodes a message received from the device named |source| and forwards its
// payload to the client.
//
// |data| is the serialized envelope as received. Envelopes that fail
// verification, do not carry a Message, or carry a Message without a payload
// are logged and dropped; the connection is left untouched.
void P2PProviderImpl::Dispatch(fxl::StringView source,
                               std::vector<uint8_t> data) {
  FXL_DCHECK(client_);
  flatbuffers::Verifier verifier(
      reinterpret_cast<const unsigned char*>(data.data()), data.size());
  if (!VerifyEnvelopeBuffer(verifier)) {
    // Wrong serialization, abort.
    FXL_LOG(ERROR) << "The message received is malformed.";
    return;
  }

  const Envelope* envelope = GetEnvelope(data.data());
  if (envelope->message_type() != EnvelopeMessage_Message) {
    FXL_LOG(ERROR) << "The message received is unexpected at this point.";
    return;
  }

  const Message* message = static_cast<const Message*>(envelope->message());
  // Both the message and its payload are reached through pointers and come
  // from a remote device: drop the message instead of dereferencing null.
  if (!message || !message->data()) {
    FXL_LOG(ERROR) << "The message received is malformed.";
    return;
  }

  fxl::StringView data_view(
      reinterpret_cast<const char*>(message->data()->data()),
      message->data()->size());
  client_->OnNewMessage(source, data_view);
}
// Relays a change concerning |remote_device| to the registered client, which
// must already have been set.
void P2PProviderImpl::OnDeviceChange(fxl::StringView remote_device,
                                     DeviceChangeType change_type) {
  FXL_DCHECK(client_);
  Client* const receiver = client_;
  receiver->OnDeviceChange(remote_device, change_type);
}
} // namespace p2p_provider