| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "peridot/bin/ledger/p2p_provider/impl/p2p_provider_impl.h" |
| |
| #include <algorithm> |
| #include <iterator> |
| |
| #include <lib/fxl/files/file.h> |
| #include <lib/fxl/logging.h> |
| |
| #include "peridot/bin/ledger/p2p_provider/impl/envelope_generated.h" |
| |
| namespace p2p_provider { |
| namespace { |
| constexpr char kRespondingServiceName[] = "ledger-p2p-"; |
| const uint16_t kCurrentVersion = 0; |
| |
| bool ValidateHandshake(const Envelope* envelope, const Handshake** message) { |
| if (envelope->message_type() != EnvelopeMessage_Handshake) { |
| FXL_LOG(ERROR) << "Incorrect message type: " << envelope->message_type(); |
| return false; |
| } |
| *message = static_cast<const Handshake*>(envelope->message()); |
| if ((*message)->version() != kCurrentVersion) { |
| FXL_LOG(ERROR) << "Incorrect message version: " << (*message)->version(); |
| return false; |
| } |
| return true; |
| } |
| |
| } // namespace |
| |
| P2PProviderImpl::P2PProviderImpl( |
| std::string host_name, fuchsia::netconnector::NetConnectorPtr net_connector, |
| std::unique_ptr<p2p_provider::UserIdProvider> user_id_provider) |
| : host_name_(std::move(host_name)), |
| net_connector_(std::move(net_connector)), |
| user_id_provider_(std::move(user_id_provider)) {} |
| |
| P2PProviderImpl::~P2PProviderImpl() {} |
| |
| void P2PProviderImpl::Start(Client* client) { |
| FXL_DCHECK(!client_); |
| FXL_DCHECK(client); |
| client_ = client; |
| user_id_provider_->GetUserId( |
| [this](UserIdProvider::Status status, std::string user_id) { |
| if (status != UserIdProvider::Status::OK) { |
| FXL_LOG(ERROR) << "Unable to retrieve the user ID necessary to start " |
| "the peer-to-peer provider."; |
| return; |
| } |
| user_id_ = user_id; |
| StartService(); |
| }); |
| } |
| |
| bool P2PProviderImpl::SendMessage(fxl::StringView destination, |
| fxl::StringView data) { |
| flatbuffers::FlatBufferBuilder buffer; |
| flatbuffers::Offset<Message> message = |
| CreateMessage(buffer, convert::ToFlatBufferVector(&buffer, data)); |
| flatbuffers::Offset<Envelope> envelope = |
| CreateEnvelope(buffer, EnvelopeMessage_Message, message.Union()); |
| buffer.Finish(envelope); |
| |
| char* buf = reinterpret_cast<char*>(buffer.GetBufferPointer()); |
| size_t size = buffer.GetSize(); |
| auto it = connection_map_.find(destination); |
| if (it == connection_map_.end()) { |
| return false; |
| } |
| it->second->SendMessage(fxl::StringView(buf, size)); |
| return true; |
| } |
| |
| void P2PProviderImpl::StartService() { |
| fidl::InterfaceHandle<fuchsia::sys::ServiceProvider> handle; |
| // When the service provider is reset and its connection cut, NetConnector |
| // stops responding for its services. |
| network_service_provider_.AddBinding(handle.NewRequest()); |
| network_service_provider_.AddServiceForName( |
| [this](zx::channel channel) { |
| RemoteConnection& connection = connections_.emplace(host_name_); |
| connection.set_on_message( |
| [this, &connection](std::vector<uint8_t> data) { |
| ProcessHandshake(&connection, std::move(data), true, |
| fxl::StringView()); |
| }); |
| connection.Start(std::move(channel)); |
| }, |
| kRespondingServiceName + user_id_); |
| net_connector_->RegisterServiceProvider(kRespondingServiceName + user_id_, |
| std::move(handle)); |
| |
| ListenForNewDevices(fuchsia::netconnector::kInitialKnownDeviceNames); |
| } |
| |
| void P2PProviderImpl::ProcessHandshake(RemoteConnection* connection, |
| std::vector<uint8_t> data, |
| bool should_send_handshake, |
| fxl::StringView network_remote_name) { |
| flatbuffers::Verifier verifier( |
| reinterpret_cast<const unsigned char*>(data.data()), data.size()); |
| if (!VerifyEnvelopeBuffer(verifier)) { |
| // Wrong serialization, abort. |
| FXL_LOG(ERROR) << "The message received is malformed."; |
| connection->Disconnect(); |
| return; |
| }; |
| const Envelope* envelope = GetEnvelope(data.data()); |
| const Handshake* message; |
| if (!ValidateHandshake(envelope, &message)) { |
| FXL_LOG(ERROR) << "The message received is not valid."; |
| connection->Disconnect(); |
| return; |
| } |
| |
| std::string remote_name(message->host_name()->begin(), |
| message->host_name()->end()); |
| if (!network_remote_name.empty() && network_remote_name != remote_name) { |
| // The name of the remote device as given by the network is different from |
| // the self-declared name. Something is wrong here, let's abort. |
| FXL_LOG(ERROR) << "Network name " << network_remote_name |
| << " different from declared name " << remote_name |
| << ", aborting."; |
| connection->Disconnect(); |
| return; |
| } |
| bool existed_before = false; |
| auto it = connection_map_.find(remote_name); |
| if (it != connection_map_.end()) { |
| if (remote_name < host_name_) { |
| it->second->Disconnect(); |
| existed_before = true; |
| } else { |
| connection->Disconnect(); |
| return; |
| } |
| } |
| |
| connection_map_[remote_name] = connection; |
| |
| connection->set_on_close([this, remote_name]() { |
| connection_map_.erase(remote_name); |
| OnDeviceChange(remote_name, DeviceChangeType::DELETED); |
| }); |
| |
| connection->set_on_message([this, remote_name](std::vector<uint8_t> data) { |
| Dispatch(remote_name, std::move(data)); |
| }); |
| |
| if (should_send_handshake) { |
| // We send an handshake to signal to the other side the connection is |
| // indeed established. |
| flatbuffers::FlatBufferBuilder buffer; |
| flatbuffers::Offset<Handshake> request = |
| CreateHandshake(buffer, kCurrentVersion, |
| convert::ToFlatBufferVector(&buffer, host_name_)); |
| flatbuffers::Offset<Envelope> envelope = |
| CreateEnvelope(buffer, EnvelopeMessage_Handshake, request.Union()); |
| buffer.Finish(envelope); |
| char* buf = reinterpret_cast<char*>(buffer.GetBufferPointer()); |
| size_t size = buffer.GetSize(); |
| connection->SendMessage(fxl::StringView(buf, size)); |
| } |
| |
| if (!existed_before) { |
| // If the connection existed before, we don't need to notify again. |
| OnDeviceChange(remote_name, DeviceChangeType::NEW); |
| } |
| } |
| |
| void P2PProviderImpl::ListenForNewDevices(uint64_t version) { |
| net_connector_->GetKnownDeviceNames( |
| version, |
| [this](uint64_t new_version, std::vector<std::string> devices) { |
| std::vector<std::string> seen_devices; |
| for (auto& remote_name : devices) { |
| seen_devices.push_back(remote_name); |
| if (contacted_hosts_.find(remote_name) != contacted_hosts_.end()) { |
| continue; |
| } |
| if (remote_name == host_name_) { |
| continue; |
| } |
| std::string remote_name_str(remote_name); |
| |
| zx::channel local; |
| zx::channel remote; |
| zx_status_t status = zx::channel::create(0u, &local, &remote); |
| |
| FXL_CHECK(status == ZX_OK) |
| << "zx::channel::create failed, status " << status; |
| |
| fuchsia::sys::ServiceProviderPtr device_service_provider; |
| net_connector_->GetDeviceServiceProvider( |
| remote_name, device_service_provider.NewRequest()); |
| |
| device_service_provider->ConnectToService( |
| kRespondingServiceName + user_id_, std::move(remote)); |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| flatbuffers::Offset<Handshake> request = |
| CreateHandshake(buffer, kCurrentVersion, |
| convert::ToFlatBufferVector(&buffer, host_name_)); |
| flatbuffers::Offset<Envelope> envelope = CreateEnvelope( |
| buffer, EnvelopeMessage_Handshake, request.Union()); |
| buffer.Finish(envelope); |
| |
| RemoteConnection& connection = connections_.emplace(host_name_); |
| connection.set_on_message( |
| [this, &connection, remote_name_str](std::vector<uint8_t> data) { |
| ProcessHandshake(&connection, std::move(data), false, |
| remote_name_str); |
| }); |
| connection.Start(std::move(local)); |
| |
| char* buf = reinterpret_cast<char*>(buffer.GetBufferPointer()); |
| size_t size = buffer.GetSize(); |
| connection.SendMessage(fxl::StringView(buf, size)); |
| contacted_hosts_.insert(std::move(remote_name_str)); |
| } |
| // Devices that disappeared can be recontacted again later as they might |
| // have changed. |
| std::vector<fxl::StringView> to_be_removed; |
| std::set_difference(contacted_hosts_.begin(), contacted_hosts_.end(), |
| seen_devices.begin(), seen_devices.end(), |
| std::back_inserter(to_be_removed)); |
| for (const fxl::StringView& host : to_be_removed) { |
| contacted_hosts_.erase(contacted_hosts_.find(host)); |
| } |
| ListenForNewDevices(new_version); |
| }); |
| } |
| |
| void P2PProviderImpl::Dispatch(fxl::StringView source, |
| std::vector<uint8_t> data) { |
| FXL_DCHECK(client_); |
| flatbuffers::Verifier verifier( |
| reinterpret_cast<const unsigned char*>(data.data()), data.size()); |
| if (!VerifyEnvelopeBuffer(verifier)) { |
| // Wrong serialization, abort. |
| FXL_LOG(ERROR) << "The message received is malformed."; |
| return; |
| }; |
| const Envelope* envelope = GetEnvelope(data.data()); |
| if (envelope->message_type() != EnvelopeMessage_Message) { |
| FXL_LOG(ERROR) << "The message received is unexpected at this point."; |
| return; |
| } |
| |
| const Message* message = static_cast<const Message*>(envelope->message()); |
| |
| fxl::StringView data_view( |
| reinterpret_cast<const char*>(message->data()->data()), |
| message->data()->size()); |
| client_->OnNewMessage(source, data_view); |
| } |
| |
| void P2PProviderImpl::OnDeviceChange(fxl::StringView remote_device, |
| DeviceChangeType change_type) { |
| FXL_DCHECK(client_); |
| client_->OnDeviceChange(remote_device, change_type); |
| } |
| |
| } // namespace p2p_provider |