blob: ad3bad207c7343e025906260517ec53a015f51da [file] [edit]
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fdomain_client::Client;
use fdomain_client::fidl::{ProtocolMarker, Proxy};
use fidl_fuchsia_developer_remotecontrol_connector::ConnectorMarker;
use fuchsia_component::client::connect_to_protocol;
use futures::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use {fdomain_fuchsia_developer_remotecontrol as rcs, fdomain_fuchsia_io as fio};
/// An FDomain client transport that works over a Fuchsia socket. Uses 32-bit
/// little endian frame sizes before each frame per the documentation of the
/// Connector protocol.
struct SocketTransport {
    /// Socket that carries the length-prefixed frames in both directions.
    socket: fuchsia_async::Socket,
    /// Bytes of the outgoing frame (length prefix followed by the message)
    /// that have not been written to `socket` yet. Empty when no send is in
    /// progress.
    out_buf: Vec<u8>,
    /// Bytes read from `socket` that have not yet been handed out as a
    /// complete frame by the `Stream` implementation.
    in_buf: Vec<u8>,
}
impl fdomain_client::FDomainTransport for SocketTransport {
    /// Sends `msg` as a single frame: a 32-bit little-endian length followed
    /// by the message bytes.
    ///
    /// The frame is only queued when `out_buf` is empty. If `out_buf` still
    /// holds bytes from an earlier call that returned `Poll::Pending`, this
    /// call just keeps flushing them and `msg` is not queued again.
    /// NOTE(review): this relies on the caller re-polling with the same
    /// message until it completes -- confirm against the `FDomainTransport`
    /// contract.
    ///
    /// Returns:
    /// * `Poll::Ready(Ok(()))` once every queued byte has been written.
    /// * `Poll::Ready(Err(None))` if the socket accepted zero bytes.
    /// * `Poll::Ready(Err(Some(e)))` if the socket write failed, or if `msg`
    ///   is too long for its length to be encoded in 32 bits.
    /// * `Poll::Pending` if the socket cannot take more data right now.
    fn poll_send_message(
        self: Pin<&mut Self>,
        msg: &[u8],
        ctx: &mut Context<'_>,
    ) -> Poll<fdomain_client::Result<(), Option<io::Error>>> {
        let this = self.get_mut();

        if this.out_buf.is_empty() {
            // The frame header is a u32, so a longer message cannot be framed.
            // Report that as an error rather than panicking.
            let len = match u32::try_from(msg.len()) {
                Ok(len) => len,
                Err(_) => {
                    return Poll::Ready(Err(Some(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "message is too large for a 32-bit frame length",
                    ))));
                }
            };
            let header = len.to_le_bytes();
            this.out_buf.reserve(header.len() + msg.len());
            this.out_buf.extend_from_slice(&header);
            this.out_buf.extend_from_slice(msg);
        }

        // Write until the queue is drained. `ready!` returns `Poll::Pending`
        // early and leaves the unwritten remainder in `out_buf` for the next
        // call.
        while !this.out_buf.is_empty() {
            let written = ready!(this.socket.poll_write_ref(ctx, &this.out_buf))
                .map_err(|x| Some(x.into()))?;
            if written == 0 {
                return Poll::Ready(Err(None));
            }
            this.out_buf.drain(..written);
        }
        Poll::Ready(Ok(()))
    }
}
impl Stream for SocketTransport {
    type Item = io::Result<Box<[u8]>>;

    /// Yields the next complete frame received on the socket, with its 4-byte
    /// length prefix removed.
    ///
    /// Frames that are already fully buffered are returned before the socket
    /// is polled again. This way a close or error reported by the socket
    /// cannot discard frames that arrived earlier in the same read.
    ///
    /// Returns:
    /// * `Poll::Ready(Some(Ok(frame)))` for each complete frame.
    /// * `Poll::Ready(None)` when a read returns zero bytes or fails with
    ///   `PEER_CLOSED`.
    /// * `Poll::Ready(Some(Err(e)))` for any other read error, or when a
    ///   frame's declared length cannot be represented as a `usize`.
    /// * `Poll::Pending` when no complete frame is buffered and the socket
    ///   has no data available.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Size in bytes of the little-endian length prefix on every frame.
        const HEADER_LEN: usize = 4;

        let this = self.get_mut();
        // Number of bytes at the front of `in_buf` that really came from the
        // socket. While reading, `in_buf` is temporarily longer than this
        // because of the zeroed scratch space added by `extend_buf`.
        let mut filled = this.in_buf.len();

        loop {
            // Drop scratch space left over from the previous iteration so the
            // buffer length always means "valid bytes" below, and so padding
            // is computed from the valid length rather than accumulating.
            this.in_buf.truncate(filled);

            if filled >= HEADER_LEN {
                let mut header = [0u8; HEADER_LEN];
                header.copy_from_slice(&this.in_buf[..HEADER_LEN]);
                // Offset one past the last byte of the first buffered frame.
                let frame_end = usize::try_from(u32::from_le_bytes(header))
                    .ok()
                    .and_then(|payload_len| payload_len.checked_add(HEADER_LEN));
                let frame_end = match frame_end {
                    Some(end) => end,
                    None => {
                        return Poll::Ready(Some(Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "frame length does not fit in usize",
                        ))));
                    }
                };

                if filled >= frame_end {
                    // Keep whatever follows this frame for the next call and
                    // hand the frame itself (minus its header) to the caller.
                    let rest = this.in_buf.split_off(frame_end);
                    let mut frame = std::mem::replace(&mut this.in_buf, rest);
                    frame.drain(..HEADER_LEN);
                    return Poll::Ready(Some(Ok(frame.into())));
                }
            }

            // No complete frame yet: make room and read more bytes.
            extend_buf(&mut this.in_buf);
            match this.socket.poll_read_ref(cx, &mut this.in_buf[filled..]) {
                Poll::Ready(Ok(0)) => {
                    this.in_buf.truncate(filled);
                    return Poll::Ready(None);
                }
                Poll::Ready(Ok(got)) => filled += got,
                Poll::Ready(Err(e)) => {
                    this.in_buf.truncate(filled);
                    if e == fidl::Status::PEER_CLOSED {
                        return Poll::Ready(None);
                    } else {
                        return Poll::Ready(Some(Err(e.into())));
                    }
                }
                Poll::Pending => {
                    // Discard the unused scratch space so the next call sees
                    // only real data in `in_buf`.
                    this.in_buf.truncate(filled);
                    return Poll::Pending;
                }
            }
        }
    }
}
/// Grows `buf` with zero bytes so there is scratch space to read into.
///
/// The new length is always a multiple of 4096. At least half a block (2048
/// bytes) of padding is added: when the distance to the next block boundary
/// is smaller than that, one extra whole block is appended.
fn extend_buf(buf: &mut Vec<u8>) {
    const BLOCK_SIZE: usize = 4096;
    const MIN_PADDING: usize = BLOCK_SIZE / 2;

    let used = buf.len();
    // Bytes needed to reach the next block boundary; zero when `used` is
    // already on one.
    let to_boundary = (BLOCK_SIZE - used % BLOCK_SIZE) % BLOCK_SIZE;
    let padding = if to_boundary < MIN_PADDING {
        to_boundary + BLOCK_SIZE
    } else {
        to_boundary
    };
    buf.resize(used + padding, 0);
}
/// End-to-end check of `SocketTransport`: passes one end of a stream socket
/// to the `Connector` protocol via `fdomain_toolbox_socket`, runs an FDomain
/// client over the other end, opens `RemoteControl` from the client's
/// namespace and verifies that `echo_string` returns its input.
#[fuchsia::test]
async fn test_fdomain_socket() {
    let connector = connect_to_protocol::<ConnectorMarker>().unwrap();
    let (near_end, far_end) = fidl::Socket::create_stream();
    connector.fdomain_toolbox_socket(far_end).await.unwrap();

    let transport = SocketTransport {
        socket: fuchsia_async::Socket::from_socket(near_end),
        in_buf: Vec::new(),
        out_buf: Vec::new(),
    };
    let (client, client_loop) = Client::new(transport);
    // The client only makes progress while its background future is polled.
    fuchsia_async::Task::spawn(client_loop).detach();

    let namespace = fio::DirectoryProxy::from_channel(client.namespace().await.unwrap());
    let (remote_control, server_end) = client.create_proxy::<rcs::RemoteControlMarker>();
    namespace
        .open(
            rcs::RemoteControlMarker::DEBUG_NAME,
            fio::Flags::PROTOCOL_SERVICE,
            &fio::Options::default(),
            server_end.into_channel(),
        )
        .unwrap();

    let echoed = remote_control.echo_string("bob").await.unwrap();
    assert_eq!("bob", echoed);
}