blob: 5096c045ce8efef0937afa6ed882e7772d77661f [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Fuchsia netdevice client tun test
use assert_matches::assert_matches;
use fidl::{endpoints, AsHandleRef};
use fuchsia_component::client::connect_to_protocol;
use futures::future::{Future, FutureExt as _};
use futures::TryStreamExt as _;
use netdevice_client::{Client, DerivableConfig, Port, Session};
use std::convert::TryInto as _;
use std::io::{Read as _, Write as _};
use std::pin::pin;
use std::task::Poll;
use {
fidl_fuchsia_hardware_network as netdev, fidl_fuchsia_net_tun as tun, fuchsia_async as fasync,
};
const DEFAULT_PORT_ID: u8 = 2;
const DEFAULT_MTU: u32 = 1500;
const DATA_BYTE: u8 = 42;
const DATA_LEN: usize = 4;
#[fasync::run_singlethreaded(test)]
async fn test_rx() {
let (tun, _tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
let () = with_netdev_session(
client,
DerivableConfig::default(),
port,
"test_rx",
|session, _client| async move {
let frame = tun::Frame {
frame_type: Some(netdev::FrameType::Ethernet),
data: Some(vec![DATA_BYTE; DATA_LEN]),
port: Some(DEFAULT_PORT_ID),
..Default::default()
};
let () = tun.write_frame(&frame).await.unwrap().expect("failed to write frame");
let buff = session.recv().await.expect("failed to recv buffer");
let mut bytes = [0u8; DATA_LEN];
buff.read_at(0, &mut bytes[..]).expect("failed to read from the buffer");
for i in bytes.iter() {
assert_eq!(*i, DATA_BYTE);
}
},
)
.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_tx() {
let (tun, _tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
let () = with_netdev_session(
client,
DerivableConfig::default(),
port,
"test_tx",
|session, _client| async move {
let mut buffer =
session.alloc_tx_buffer(DATA_LEN).await.expect("failed to alloc tx buffer");
assert_eq!(
buffer.write(&[DATA_BYTE; DATA_LEN][..]).expect("failed to write into the buffer"),
DATA_LEN
);
buffer.set_port(port);
buffer.set_frame_type(netdev::FrameType::Ethernet);
session.send(buffer).expect("failed to send the buffer");
let frame = tun
.read_frame()
.await
.unwrap()
.map_err(zx::Status::from_raw)
.expect("failed to read frame from the tun device");
assert_eq!(frame.data, Some(vec![DATA_BYTE; DATA_LEN]));
assert_eq!(frame.frame_type, Some(netdev::FrameType::Ethernet));
assert_eq!(frame.port, Some(DEFAULT_PORT_ID));
},
)
.await;
}
// Receives buffer from session and echoes back. It copies the content from
// half of the buffers, round robin on index.
async fn echo(session: Session, port: Port, frame_count: u32) {
for i in 0..frame_count {
let mut buffer = session.recv().await.expect("failed to recv from session");
assert_eq!(buffer.cap(), DATA_LEN);
let mut bytes = [0u8; DATA_LEN];
assert_eq!(buffer.read(&mut bytes[..]).unwrap(), DATA_LEN);
assert_eq!(u32::from_le_bytes(bytes), i);
if i % 2 == 0 {
let buffer = buffer.into_tx().await;
session.send(buffer).expect("failed to send the buffer back on the zero-copy path");
} else {
let mut buffer =
session.alloc_tx_buffer(DATA_LEN).await.expect("no tx buffer available");
buffer.set_frame_type(netdev::FrameType::Ethernet);
buffer.set_port(port);
assert_eq!(buffer.write(&bytes).unwrap(), DATA_LEN);
session.send(buffer).expect("failed to send the buffer back on the copying path");
}
}
}
#[fasync::run_singlethreaded(test)]
async fn test_echo_tun() {
const FRAME_TOTAL_COUNT: u32 = 512;
let (tun, _tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
with_netdev_session(
client,
DerivableConfig::default(),
port,
"test_echo_tun",
|session, _client| async {
let echo_fut = echo(session, port, FRAME_TOTAL_COUNT);
let main_fut = async move {
for i in 0..FRAME_TOTAL_COUNT {
let frame = tun::Frame {
frame_type: Some(netdev::FrameType::Ethernet),
data: Some(Vec::from(i.to_le_bytes())),
port: Some(DEFAULT_PORT_ID),
..Default::default()
};
let () = tun
.write_frame(&frame)
.await
.unwrap()
.map_err(zx::Status::from_raw)
.expect("cannot write frame");
let frame = tun
.read_frame()
.await
.unwrap()
.map_err(zx::Status::from_raw)
.expect("failed to read frame");
let data = frame.data.unwrap();
assert_eq!(data.len(), DATA_LEN);
let bytes: [u8; DATA_LEN] = data.try_into().unwrap();
assert_eq!(u32::from_le_bytes(bytes), i);
}
};
let ((), ()) = futures::join!(echo_fut, main_fut);
},
)
.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_echo_pair() {
const FRAME_TOTAL_COUNT: u32 = 512;
let pair = create_tun_device_pair();
let (client1, port1, client2, port2) = create_netdev_client_pair(&pair).await;
let () = with_netdev_session(
client1,
DerivableConfig::default(),
port1,
"test_echo_pair_1",
|session1, client1| async move {
let () = with_netdev_session(
client2,
DerivableConfig::default(),
port2,
"test_echo_pair_2",
|session2, client2| async move {
// Wait for the ports to be online before we send anything.
assert_matches!(
client1.wait_online(port1).await,
Ok(netdevice_client::PortStatus {
flags: netdev::StatusFlags::ONLINE,
mtu: DEFAULT_MTU
})
);
assert_matches!(
client2.wait_online(port2).await,
Ok(netdevice_client::PortStatus {
flags: netdev::StatusFlags::ONLINE,
mtu: DEFAULT_MTU
})
);
let echo_fut = echo(session1, port1, FRAME_TOTAL_COUNT);
let main_fut = async {
for i in 0..FRAME_TOTAL_COUNT {
let mut buffer = session2
.alloc_tx_buffer(DATA_LEN)
.await
.expect("failed to alloc tx buffer");
buffer.set_frame_type(netdev::FrameType::Ethernet);
buffer.set_port(port2);
let mut bytes = i.to_le_bytes();
assert_eq!(
buffer.write(&bytes[..]).expect("failed to write into the buffer"),
DATA_LEN
);
session2.send(buffer).expect("failed to send the buffer");
let mut buffer =
session2.recv().await.expect("failed to recv from the session");
assert_eq!(
buffer
.read(&mut bytes[..])
.expect("failed to read from the buffer"),
DATA_LEN
);
assert_eq!(u32::from_le_bytes(bytes), i);
}
};
futures::join!(echo_fut, main_fut);
},
)
.await;
},
)
.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_status_stream() {
const TOGGLE_COUNT: usize = 3;
let (tun, tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
let mut watcher = client.port_status_stream(port).expect("failed to create a status watcher");
for _ in 0..TOGGLE_COUNT {
assert_matches!(
watcher.try_next().await,
Ok(Some(netdevice_client::PortStatus {
flags: netdev::StatusFlags::ONLINE,
mtu: DEFAULT_MTU
}))
);
tun_port.set_online(false).await.expect("failed to flip online flag");
assert_eq!(
watcher.try_next().await.expect("failed to get next status update"),
Some(netdevice_client::PortStatus {
flags: netdev::StatusFlags::empty(),
mtu: DEFAULT_MTU
})
);
tun_port.set_online(true).await.expect("failed to flip online flag");
}
}
#[fasync::run_singlethreaded(test)]
async fn test_port_stream() {
let (_tun, mut stream, port) = {
let (tun, _tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
let mut stream = client.device_port_event_stream().expect("failed to create port stream");
assert_matches!(
stream.try_next().await.expect("failed to get next event"),
Some(netdev::DevicePortEvent::Existing(p)) if p == port.into()
);
assert_matches!(
stream.try_next().await.expect("failed to get next event"),
Some(netdev::DevicePortEvent::Idle(netdev::Empty { .. }))
);
(tun, stream, port)
};
assert_matches!(
stream.try_next().await.expect("failed to get next event"),
Some(netdev::DevicePortEvent::Removed(p)) if p == port.into()
);
}
#[test]
fn tx_wait_idle() {
let mut executor = fasync::TestExecutor::new();
let (tun, _tun_port, port) = executor.run_singlethreaded(create_tun_device_and_port());
let client = create_netdev_client(&tun);
let session = executor.run_singlethreaded(async {
let (session, task) = client
.new_session_with_derivable_config("tx_wait_idle", DerivableConfig::default())
.await
.expect("failed to create session");
session
.attach(port, &[netdev::FrameType::Ethernet])
.await
.expect("failed to attach session");
// Given we're manually running the executor, detaching the task and
// panicking on exit is easier than driving it manually and we don't
// lose any signal.
fasync::Task::spawn(
task.map(|res| panic!("the background task for session terminated with {:?}", res)),
)
.detach();
session
});
let mut fut = pin!(session.wait_tx_idle());
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
// Send 2 buffers.
executor.run_singlethreaded(async {
let buffers = session
.alloc_tx_buffers(DATA_LEN)
.await
.expect("failed to alloc tx buffers")
.take(2)
.collect::<Result<Vec<_>, _>>()
.expect("failed to collect tx buffers");
for mut buffer in buffers {
assert_eq!(
buffer.write(&[DATA_BYTE; DATA_LEN][..]).expect("failed to write into the buffer"),
DATA_LEN
);
buffer.set_port(port);
buffer.set_frame_type(netdev::FrameType::Ethernet);
session.send(buffer).expect("failed to send the buffer");
}
});
// We have now 2 outstanding buffers so we try to wait idle it should block.
let mut fut = pin!(session.wait_tx_idle());
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
// Complete a single frame.
executor.run_singlethreaded(async {
let tun::Frame { .. } = tun
.read_frame()
.await
.unwrap()
.map_err(zx::Status::from_raw)
.expect("failed to read frame from the tun device");
});
// Must still be pending.
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
// Complete the second frame then the future will eventually resolve.
let ((), ()) = executor.run_singlethreaded(futures::future::join(fut, async {
let tun::Frame { .. } = tun
.read_frame()
.await
.unwrap()
.map_err(zx::Status::from_raw)
.expect("failed to read frame from the tun device");
}));
}
#[fasync::run_singlethreaded(test)]
async fn watch_rx_leases() {
let (tun, _tun_port, port) = create_tun_device_and_port().await;
let client = create_netdev_client(&tun);
with_netdev_session(
client,
DerivableConfig { watch_rx_leases: true, ..Default::default() },
port,
"watch_rx_leases",
|session, _client| async move {
let mut leases_stream = pin!(session.watch_rx_leases());
for frame_idx in 1..=2 {
let (lease, lease_send) = zx::Channel::create();
tun.delegate_rx_lease(netdev::DelegatedRxLease {
hold_until_frame: Some(frame_idx),
handle: Some(netdev::DelegatedRxLeaseHandle::Channel(lease_send)),
__source_breaking: fidl::marker::SourceBreaking,
})
.expect("delegate lease");
let frame = tun::Frame {
frame_type: Some(netdev::FrameType::Ethernet),
data: Some(vec![1, 2, 3]),
port: Some(DEFAULT_PORT_ID),
..Default::default()
};
tun.write_frame(&frame).await.expect("writing frame").expect("write frame error");
let frame = session.recv().await.expect("receive frame");
assert_matches!(leases_stream.try_next().now_or_never(), None);
drop(frame);
let yielded_lease = leases_stream
.try_next()
.await
.expect("lease error")
.expect("lease stream ended unexpectedly");
let yielded_lease = assert_matches!(yielded_lease.inner(),
netdev::DelegatedRxLeaseHandle::Channel(c) => c);
// Prove we have the same lease.
assert_eq!(
yielded_lease.get_koid().unwrap(),
lease.basic_info().unwrap().related_koid
);
}
},
)
.await;
}
fn default_base_port_config() -> tun::BasePortConfig {
tun::BasePortConfig {
id: Some(DEFAULT_PORT_ID),
mtu: Some(DEFAULT_MTU),
rx_types: Some(vec![netdev::FrameType::Ethernet]),
tx_types: Some(vec![netdev::FrameTypeSupport {
type_: netdev::FrameType::Ethernet,
features: netdev::FRAME_FEATURES_RAW,
supported_flags: netdev::TxFlags::empty(),
}]),
..Default::default()
}
}
async fn create_tun_device_and_port() -> (tun::DeviceProxy, tun::PortProxy, Port) {
let ctrl =
connect_to_protocol::<tun::ControlMarker>().expect("failed to connect to tun.Control");
let (device, server) = endpoints::create_proxy::<tun::DeviceMarker>();
ctrl.create_device(&tun::DeviceConfig { blocking: Some(true), ..Default::default() }, server)
.expect("failed to create device");
let (port, server) = endpoints::create_proxy::<tun::PortMarker>();
device
.add_port(
&tun::DevicePortConfig {
base: Some(default_base_port_config()),
online: Some(true),
..Default::default()
},
server,
)
.expect("failed to add port to device");
let (device_port, server) = endpoints::create_proxy::<netdev::PortMarker>();
port.get_port(server).expect("get device port");
let id = device_port.get_info().await.expect("getting port info").id.expect("missing port id");
(device, port, id.try_into().expect("bad port id"))
}
fn create_tun_device_pair() -> tun::DevicePairProxy {
let ctrl =
connect_to_protocol::<tun::ControlMarker>().expect("failed to connect to tun.Control");
let (pair, server) = endpoints::create_proxy::<tun::DevicePairMarker>();
ctrl.create_pair(&tun::DevicePairConfig::default(), server).expect("create device pair");
pair
}
fn create_netdev_client_and_server() -> (Client, endpoints::ServerEnd<netdev::DeviceMarker>) {
let (device, server) = endpoints::create_proxy::<netdev::DeviceMarker>();
(Client::new(device), server)
}
fn create_netdev_client(tun: &tun::DeviceProxy) -> Client {
let (client, server) = create_netdev_client_and_server();
tun.get_device(server).expect("failed to connect device to tun");
client
}
async fn create_netdev_client_pair(pair: &tun::DevicePairProxy) -> (Client, Port, Client, Port) {
let (device_left, left) = endpoints::create_proxy::<netdev::DeviceMarker>();
let (device_right, right) = endpoints::create_proxy::<netdev::DeviceMarker>();
pair.get_left(left).expect("failed to connect left");
pair.get_right(right).expect("failed to connect right");
pair.add_port(&tun::DevicePairPortConfig {
base: Some(default_base_port_config()),
..Default::default()
})
.await
.unwrap()
.expect("failed to create the default logical port");
async fn get_port_id<F: FnOnce(fidl::endpoints::ServerEnd<netdev::PortMarker>)>(f: F) -> Port {
let (port, server) = fidl::endpoints::create_proxy::<netdev::PortMarker>();
f(server);
port.get_info()
.await
.expect("getting info")
.id
.expect("missing id")
.try_into()
.expect("bad port id")
}
let port_left =
get_port_id(|server| pair.get_left_port(DEFAULT_PORT_ID, server).expect("get left port"))
.await;
let port_right =
get_port_id(|server| pair.get_right_port(DEFAULT_PORT_ID, server).expect("get right port"))
.await;
(Client::new(device_left), port_left, Client::new(device_right), port_right)
}
async fn with_netdev_session<F, Fut>(
client: Client,
config: DerivableConfig,
port: Port,
name: &str,
f: F,
) where
F: FnOnce(Session, Client) -> Fut,
Fut: Future<Output = ()>,
{
let (session, task) = client
.new_session_with_derivable_config(name, config)
.await
.expect("failed to create session");
let () = session
.attach(port, &[netdev::FrameType::Ethernet])
.await
.expect("failed to attach session");
futures::select! {
() = f(session, client).fuse() => {},
res = task.fuse() => panic!("the background task for session terminated with {:?}", res),
}
}