blob: 9a33a7d66d9d21c0eebf85c5a8205ca70bc67c6a [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "examples/components/pw_rpc/runner/connection.h"
#include <lib/syslog/cpp/macros.h>
#include <algorithm>
#include <optional>
#include "pw_span/span.h"
// Reads any available data from the socket into the buffer.
pw::Status HdlcChannelSocketConnection::ReadIntoBuffer() {
auto res = stream_.Read(data_);
if (!res.ok()) {
return res.status();
}
pw::ByteSpan& read_data = *res;
input_buffer_.insert(input_buffer_.end(), read_data.begin(), read_data.end());
return pw::OkStatus();
}
pw::Result<std::pair<pw::ConstByteSpan, uint32_t>> HdlcChannelSocketConnection::GetNextRpcPacket() {
if (returned_packet_) {
// A packet was returned last time, so reset the buffer.
std::vector<std::byte> new_input_buffer;
new_input_buffer.reserve(input_buffer_.size() - pos_);
std::copy(input_buffer_.begin() + pos_, input_buffer_.end(), new_input_buffer.begin());
input_buffer_.swap(new_input_buffer);
pos_ = 0;
returned_packet_ = false;
}
while (pos_ < input_buffer_.size()) {
const std::byte byte = input_buffer_[pos_++];
const auto res = decoder_.Process(byte);
if (!res.ok()) {
continue;
}
const pw::hdlc::Frame& frame = *res;
// Bind the packet's address to this stream.
if (!address_.has_value()) {
address_.emplace(frame.address());
}
returned_packet_ = true;
auto bytes = pw::span{&input_buffer_[0], &input_buffer_[pos_]};
return std::make_pair(bytes, frame.address());
}
return pw::Status::ResourceExhausted();
}
pw::Status HdlcChannelSocketConnection::Write(pw::ConstByteSpan data) {
return stream_.Write(data);
}
void HdlcChannelSocketConnection::Close() {
stream_.Close();
input_buffer_.clear();
pos_ = 0;
returned_packet_ = false;
address_.reset();
}
ConnectionGroup::ConnectionGroup(pw::stream::SocketStream stream)
: real_connection(std::move(stream)) {}
HdlcChannelSocketConnection* ConnectionGroup::LookupByFd(int fd) {
if (real_connection.connection_fd() == fd) {
return &real_connection;
}
for (HdlcChannelSocketConnection& c : virtual_connections) {
if (c.connection_fd() == fd) {
return &c;
}
}
return nullptr;
}
HdlcChannelSocketConnection* ConnectionGroup::LookupVirtualByAddress(uint64_t address) {
for (HdlcChannelSocketConnection& c : virtual_connections) {
if (c.address().has_value() && *c.address() == address) {
return &c;
}
}
return nullptr;
}
void ConnectionGroup::CloseConnection(int fd) {
if (real_connection.connection_fd() == fd) {
real_connection.Close();
return;
}
for (auto i = virtual_connections.begin(); i != virtual_connections.end(); ++i) {
HdlcChannelSocketConnection& c = *i;
if (c.connection_fd() == fd) {
c.Close();
virtual_connections.erase(i);
return;
}
}
}
std::vector<int> ConnectionGroup::GetAllFds() {
std::vector<int> res;
res.reserve(virtual_connections.size() + 1);
res.push_back(real_connection.connection_fd());
for (HdlcChannelSocketConnection& c : virtual_connections) {
res.push_back(c.connection_fd());
}
return res;
}
bool ConnectionGroup::RealDisconnected() { return real_connection.connection_fd() < 0; }