blob: c3a1b2f52ca7f2580dafc838d0f3eb06b7792fdd [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![feature(async_await, await_macro, futures_api)]
#![deny(warnings)]
use {
failure::{format_err, Error, ResultExt},
fidl::{encoding::OutOfLine, endpoints},
fidl_fuchsia_vsock::{
AcceptorMarker, AcceptorRequest, ConnectionMarker, ConnectionProxy, ConnectionTransport,
ConnectorMarker,
},
fuchsia_app::client::connect_to_service,
fuchsia_async as fasync,
fuchsia_zircon::{self as zx, AsHandleRef},
futures::{
io::{AsyncReadExt, AsyncWriteExt},
StreamExt,
},
};
const TEST_DATA_LEN: u64 = 60000;
fn make_socket_pair() -> Result<(fasync::Socket, zx::Socket), Error> {
let (a, b) = zx::Socket::create(zx::SocketOpts::STREAM)?;
let info = a.info()?;
a.set_tx_threshold(&(info.tx_buf_max as usize))?;
let a_stream = fasync::Socket::from_socket(a)?;
Ok((a_stream, b))
}
fn wait_socket_empty(socket: &fasync::Socket) {
socket
.as_handle_ref()
.wait(zx::Signals::SOCKET_WRITE_THRESHOLD, zx::Time::INFINITE)
.unwrap();
}
async fn test_read_write<'a>(
socket: &'a mut fasync::Socket, con: &'a ConnectionProxy,
) -> Result<(), Error> {
let data = Box::new([42u8; TEST_DATA_LEN as usize]);
await!(socket.write_all(&*data))?;
wait_socket_empty(&socket);
// Send two back to back vmos
let vmo = zx::Vmo::create(TEST_DATA_LEN)?;
let complete1 = con.send_vmo(vmo.clone(0, TEST_DATA_LEN)?, 0, TEST_DATA_LEN);
let complete2 = con.send_vmo(vmo.clone(0, TEST_DATA_LEN)?, 0, TEST_DATA_LEN);
await!(complete1)?;
await!(complete2)?;
// Now write into the socket again
await!(socket.write_all(&*data))?;
wait_socket_empty(&socket);
// Expect a single value back
let mut val = [0];
await!(socket.read_exact(&mut val))?;
if val[0] != 42 {
return Err(format_err!("Expected to read '42' not '{}'", val[0]));
}
Ok(())
}
fn make_con() -> Result<(fasync::Socket, ConnectionProxy, ConnectionTransport), failure::Error> {
let (data_stream, server_socket) = make_socket_pair()?;
let (client_end, server_end) = endpoints::create_endpoints::<ConnectionMarker>()?;
let client_end = client_end.into_proxy().unwrap();
let con = ConnectionTransport {
data: server_socket,
con: server_end,
};
Ok((data_stream, client_end, con))
}
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
let vsock =
connect_to_service::<ConnectorMarker>().context("failed to connect to vsock service")?;
// Register the listener early to avoid any race conditions later
let (acceptor_client, acceptor) = endpoints::create_endpoints::<AcceptorMarker>()?;
await!(vsock.listen(8001, acceptor_client))?;
let mut acceptor = acceptor.into_stream()?;
let (mut data_stream, client_end, mut con) = make_con()?;
let _port = await!(vsock.connect(2, 8000, &mut con))?.0;
await!(test_read_write(&mut data_stream, &client_end))?;
client_end.shutdown()?;
data_stream
.as_handle_ref()
.wait(zx::Signals::SOCKET_PEER_CLOSED, zx::Time::INFINITE)?;
// Wait for a connection
let AcceptorRequest::Accept { addr: _, responder } =
await!(acceptor.next()).ok_or(format_err!("Failed to get incoming connection"))??;
let (mut data_stream, client_end, mut con) = make_con()?;
responder.send(Some(OutOfLine(&mut con)))?;
// Send data then wait for other end to shut us down.
await!(test_read_write(&mut data_stream, &client_end))?;
data_stream
.as_handle_ref()
.wait(zx::Signals::SOCKET_PEER_CLOSED, zx::Time::INFINITE)?;
// Get next connection
let AcceptorRequest::Accept { addr: _, responder } =
await!(acceptor.next()).ok_or(format_err!("Failed to get incoming connection"))??;
let (mut data_stream, _client_end, mut con) = make_con()?;
responder.send(Some(OutOfLine(&mut con)))?;
// Send data until the peer closes
let data = Box::new([42u8; TEST_DATA_LEN as usize]);
loop {
let result = await!(data_stream.write_all(&*data));
if let Err(e) = result {
if e.kind() == std::io::ErrorKind::ConnectionAborted {
break;
}
}
}
// Get next connection
{
let AcceptorRequest::Accept { addr: _, responder } =
await!(acceptor.next()).ok_or(format_err!("Failed to get incoming connection"))??;
let (mut data_stream, _client_end, mut con) = make_con()?;
responder.send(Some(OutOfLine(&mut con)))?;
// Read some data then suddenly close the connection.
let mut val = [0];
await!(data_stream.read_exact(&mut val))?;
if val[0] != 0 {
return Err(format_err!("Expected to read '0' no '{}'", val[0]));
}
}
println!("PASS");
Ok(())
}