blob: 2a75038a9a68c0d673e4898fa13c881012189069 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{Context as _, Error},
fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy},
fuchsia_async as fasync,
fuchsia_component::client::connect_to_service,
futures::{future::try_join, prelude::*},
hoist::{hoist, OvernetInstance},
std::io::{Read, Write},
};
async fn copy_stdin_to_socket(
mut tx_socket: futures::io::WriteHalf<fidl::AsyncSocket>,
) -> Result<(), Error> {
let (mut tx_stdin, mut rx_stdin) = futures::channel::mpsc::channel::<Vec<u8>>(2);
std::thread::Builder::new()
.spawn(move || -> Result<(), Error> {
let mut buf = [0u8; 1024];
let mut stdin = std::io::stdin();
loop {
let n = stdin.read(&mut buf)?;
if n == 0 {
return Ok(());
}
let buf = &buf[..n];
futures::executor::block_on(tx_stdin.send(buf.to_vec()))?;
}
})
.context("Spawning blocking thread")?;
while let Some(buf) = rx_stdin.next().await {
tx_socket.write(buf.as_slice()).await?;
}
Ok(())
}
async fn copy_socket_to_stdout(
mut rx_socket: futures::io::ReadHalf<fidl::AsyncSocket>,
) -> Result<(), Error> {
let (mut tx_stdout, mut rx_stdout) = futures::channel::mpsc::channel::<Vec<u8>>(2);
std::thread::Builder::new()
.spawn(move || -> Result<(), Error> {
let mut stdout = std::io::stdout();
while let Some(buf) = futures::executor::block_on(rx_stdout.next()) {
let mut buf = buf.as_slice();
loop {
let n = stdout.write(buf)?;
if n == buf.len() {
stdout.flush()?;
break;
}
buf = &buf[n..];
}
}
Ok(())
})
.context("Spawning blocking thread")?;
let mut buf = [0u8; 1024];
loop {
let n = rx_socket.read(&mut buf).await?;
tx_stdout.send((&buf[..n]).to_vec()).await?;
}
}
async fn send_request(proxy: RemoteControlProxy) -> Result<(), Error> {
// We just need to make a request to the RCS - it doesn't really matter
// what we choose here so long as there are no side effects.
let _ = proxy.identify_host().await?;
Ok(())
}
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
let rcs_proxy = connect_to_service::<RemoteControlMarker>()?;
send_request(rcs_proxy).await?;
let (local_socket, remote_socket) = fidl::Socket::create(fidl::SocketOpts::STREAM)?;
let local_socket = fidl::AsyncSocket::from_socket(local_socket)?;
let (rx_socket, tx_socket) = futures::AsyncReadExt::split(local_socket);
hoist().connect_as_mesh_controller()?.attach_socket_link(remote_socket)?;
try_join(copy_socket_to_stdout(rx_socket), copy_stdin_to_socket(tx_socket)).await?;
Ok(())
}
#[cfg(test)]
mod test {
use {
crate::send_request,
anyhow::Error,
fidl::endpoints::create_proxy_and_stream,
fidl_fuchsia_developer_remotecontrol::{
IdentifyHostResponse, RemoteControlMarker, RemoteControlProxy, RemoteControlRequest,
},
fuchsia_async as fasync,
futures::prelude::*,
};
fn setup_fake_rcs(handle_stream: bool) -> RemoteControlProxy {
let (proxy, mut stream) = create_proxy_and_stream::<RemoteControlMarker>().unwrap();
if !handle_stream {
return proxy;
}
fasync::Task::spawn(async move {
while let Ok(req) = stream.try_next().await {
match req {
Some(RemoteControlRequest::IdentifyHost { responder }) => {
let _ = responder
.send(&mut Ok(IdentifyHostResponse {
nodename: Some("".to_string()),
addresses: Some(vec![]),
..IdentifyHostResponse::EMPTY
}))
.unwrap();
}
_ => assert!(false),
}
}
})
.detach();
proxy
}
#[fasync::run_singlethreaded(test)]
async fn test_handles_successful_response() -> Result<(), Error> {
let rcs_proxy = setup_fake_rcs(true);
assert!(send_request(rcs_proxy).await.is_ok());
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn test_handles_failed_response() -> Result<(), Error> {
let rcs_proxy = setup_fake_rcs(false);
assert!(send_request(rcs_proxy).await.is_err());
Ok(())
}
}