blob: ed46fe5dd0dec0febc3c50d0c46c6fbb10a4c284 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Triangular overnet tests - tests that involve passing a channel amongst at least three nodes
#![cfg(test)]
use {
super::Overnet,
anyhow::{Context as _, Error},
fidl::{
endpoints::{ClientEnd, ServerEnd},
prelude::*,
},
fidl_fuchsia_overnet::{
Peer, ServiceConsumerProxyInterface, ServiceProviderRequest, ServiceProviderRequestStream,
ServicePublisherProxyInterface,
},
fidl_fuchsia_overnet_triangletests as triangle, fidl_test_placeholders as echo,
fuchsia_async::Task,
futures::prelude::*,
overnet_core::{NodeId, NodeIdGenerator},
std::sync::Arc,
};
////////////////////////////////////////////////////////////////////////////////
// Test scenarios
#[fuchsia::test]
async fn simple_loop(run: usize) -> Result<(), Error> {
    // Topology: three nodes A, B, C forming a full mesh.
    // Scenario: A (the captain) creates a channel and hands one end each to B and C,
    // which then perform an echo request over it.
    let mut ids = NodeIdGenerator::new("simple_loop", run);
    let a = Overnet::new(&mut ids)?;
    let b = Overnet::new(&mut ids)?;
    let c = Overnet::new(&mut ids)?;

    // Link every pair of nodes directly.
    super::connect(&a, &b)?;
    super::connect(&b, &c)?;
    super::connect(&a, &c)?;

    // B issues the echo request, C serves it; nobody forwards.
    let client_id = b.node_id();
    let server_id = c.node_id();
    let forwarders = vec![];
    let conscripts = vec![b, c];
    run_triangle_echo_test(
        client_id,
        server_id,
        a,
        forwarders,
        conscripts,
        Some("HELLO INTEGRATION TEST WORLD"),
    )
    .await
}
#[fuchsia::test]
async fn simple_flat(run: usize) -> Result<(), Error> {
    // Topology: three nodes in a line with A in the middle: C - A - B.
    // Scenario: A (the captain) creates a channel and hands one end each to B and C,
    // which then perform an echo request over it.
    let mut ids = NodeIdGenerator::new("simple_flat", run);
    let a = Overnet::new(&mut ids)?;
    let b = Overnet::new(&mut ids)?;
    let c = Overnet::new(&mut ids)?;

    // B and C are only reachable from each other through A.
    super::connect(&a, &b)?;
    super::connect(&a, &c)?;

    // B issues the echo request, C serves it; nobody forwards.
    let client_id = b.node_id();
    let server_id = c.node_id();
    let forwarders = vec![];
    let conscripts = vec![b, c];
    run_triangle_echo_test(
        client_id,
        server_id,
        a,
        forwarders,
        conscripts,
        Some("HELLO INTEGRATION TEST WORLD"),
    )
    .await
}
#[fuchsia::test]
async fn full_transfer(run: usize) -> Result<(), Error> {
    // Topology: two connected nodes, A - B.
    // Scenario: A (the captain) creates a channel and hands BOTH ends to B, so B acts as
    // echo client and echo server at the same time.
    let mut ids = NodeIdGenerator::new("full_transfer", run);
    let a = Overnet::new(&mut ids)?;
    let b = Overnet::new(&mut ids)?;
    super::connect(&a, &b)?;

    // The same node id is used for both roles.
    let client_id = b.node_id();
    let server_id = b.node_id();
    let forwarders = vec![];
    let conscripts = vec![b];
    run_triangle_echo_test(
        client_id,
        server_id,
        a,
        forwarders,
        conscripts,
        Some("HELLO INTEGRATION TEST WORLD"),
    )
    .await
}
#[fuchsia::test]
async fn forwarded_twice_to_separate_nodes(run: usize) -> Result<(), Error> {
    // Topology: five nodes in a line: A - B - C - D - E.
    // Scenario: A (the captain) creates a channel and hands one end each to B and C.
    // B forwards its end to D and C forwards its end to E; D and E then perform the
    // echo request.
    let mut ids = NodeIdGenerator::new("forwarded_twice_to_separate_nodes", run);
    let a = Overnet::new(&mut ids)?;
    let b = Overnet::new(&mut ids)?;
    let c = Overnet::new(&mut ids)?;
    let d = Overnet::new(&mut ids)?;
    let e = Overnet::new(&mut ids)?;
    log::info!(
        "NODEIDS:-> A:{:?} B:{:?} C:{:?} D:{:?} E:{:?}",
        a.node_id(),
        b.node_id(),
        c.node_id(),
        d.node_id(),
        e.node_id()
    );

    // Chain the nodes together in order.
    super::connect(&a, &b)?;
    super::connect(&b, &c)?;
    super::connect(&c, &d)?;
    super::connect(&d, &e)?;

    // The captain talks to B and C; each of them relays to its own leaf node.
    let client_id = b.node_id();
    let server_id = c.node_id();
    let forwarders = vec![(b, d.node_id()), (c, e.node_id())];
    let conscripts = vec![d, e];
    run_triangle_echo_test(
        client_id,
        server_id,
        a,
        forwarders,
        conscripts,
        Some("HELLO INTEGRATION TEST WORLD"),
    )
    .await
}
#[fuchsia::test]
async fn forwarded_twice_full_transfer(run: usize) -> Result<(), Error> {
    // Topology: four nodes in a line: A - B - C - D.
    // Scenario: A (the captain) creates a channel and hands one end each to B and C.
    // Both B and C forward their end to D, which ends up holding both ends and performs
    // the echo request against itself.
    let mut ids = NodeIdGenerator::new("forwarded_twice_full_transfer", run);
    let a = Overnet::new(&mut ids)?;
    let b = Overnet::new(&mut ids)?;
    let c = Overnet::new(&mut ids)?;
    let d = Overnet::new(&mut ids)?;

    // Chain the nodes together in order.
    super::connect(&a, &b)?;
    super::connect(&b, &c)?;
    super::connect(&c, &d)?;

    // The captain talks to B and C; both relay to the single leaf node D.
    let client_id = b.node_id();
    let server_id = c.node_id();
    let forwarders = vec![(b, d.node_id()), (c, d.node_id())];
    let conscripts = vec![d];
    run_triangle_echo_test(
        client_id,
        server_id,
        a,
        forwarders,
        conscripts,
        Some("HELLO INTEGRATION TEST WORLD"),
    )
    .await
}
////////////////////////////////////////////////////////////////////////////////
// Utilities
/// Returns `true` if `peers` contains an entry for `peer_id` whose advertised service list
/// includes the Conscript protocol.
///
/// A peer that is present but has not (yet) published a service list, or whose list lacks
/// the Conscript protocol, does not count.
fn has_peer_conscript(peers: &[Peer], peer_id: NodeId) -> bool {
    peers.iter().any(|peer| {
        peer.id == peer_id.into()
            // `services` is optional; a missing list means "not ready".
            && peer.description.services.as_ref().map_or(false, |services| {
                services.iter().any(|name| *name == triangle::ConscriptMarker::PROTOCOL_NAME)
            })
    })
}
/// Opens a connection to the Conscript service published by `node_id` and returns a proxy
/// for it.
///
/// A fresh channel is created; one end is handed to `svc.connect_to_service` and the other
/// is wrapped in a `ConscriptProxy`.
///
/// # Errors
///
/// Returns an error if the channel cannot be created, if the connect request cannot be
/// sent, or if the local end cannot be converted into an async channel.
fn connect_peer(
    svc: &impl ServiceConsumerProxyInterface,
    node_id: NodeId,
) -> Result<triangle::ConscriptProxy, Error> {
    let (s, p) = fidl::Channel::create().context("failed to create zx channel")?;
    // Propagate a failed connect request instead of panicking: callers already handle
    // `Result`, and the added context names the peer that could not be reached.
    svc.connect_to_service(&mut node_id.into(), triangle::ConscriptMarker::PROTOCOL_NAME, s)
        .with_context(|| format!("failed to connect to conscript service on {:?}", node_id))?;
    let proxy = fidl::AsyncChannel::from_channel(p).context("failed to make async channel")?;
    Ok(triangle::ConscriptProxy::new(proxy))
}
////////////////////////////////////////////////////////////////////////////////
// Client implementation
/// Drives the test from the captain node.
///
/// Repeatedly lists peers until both `client` and `server` advertise the Conscript service,
/// then creates a channel, asks `server` to serve echo requests on one end and asks
/// `client` to issue an echo request with `text` on the other end. The response reported
/// by the client must equal `text`.
///
/// # Errors
///
/// Returns an error if peer listing, peer connection, channel creation, or either
/// Conscript call fails.
///
/// # Panics
///
/// Panics (via `assert_eq!`) if the echoed response differs from `text`.
async fn exec_captain(
    client: NodeId,
    server: NodeId,
    overnet: Arc<Overnet>,
    text: Option<&str>,
) -> Result<(), Error> {
    let svc = overnet.connect_as_service_consumer()?;
    loop {
        // NOTE(review): presumably `list_peers` waits until the peer list changes rather
        // than returning immediately; otherwise this loop spins — confirm.
        let peers = svc.list_peers().await?;
        log::info!("[{:?}] Got peers: {:?}", overnet.node_id(), peers);
        if !(has_peer_conscript(&peers, client) && has_peer_conscript(&peers, server)) {
            // At least one conscript is not visible yet; poll again.
            continue;
        }
        let client = connect_peer(&svc, client)?;
        let server = connect_peer(&svc, server)?;
        let (s, p) = fidl::Channel::create().context("failed to create zx channel")?;
        log::info!("[{:?}] server/proxy hdls: {:?} {:?}", overnet.node_id(), s, p);
        log::info!("[{:?}] ENGAGE CONSCRIPTS", overnet.node_id());
        // Hand the server end out first, then ask the client to talk to it.
        server.serve(ServerEnd::new(s))?;
        let response = client
            .issue(ClientEnd::new(p), text)
            .await
            // Build the context message only if the call actually failed.
            .with_context(|| {
                format!("awaiting issue response for captain {:?}", overnet.node_id())
            })?;
        log::info!("[{:?}] Captain got response: {:?}", overnet.node_id(), response);
        assert_eq!(response, text.map(str::to_string));
        return Ok(());
    }
}
////////////////////////////////////////////////////////////////////////////////
// Conscript implementation
/// Runs an echo server on `server`, answering every `EchoString` request with the value it
/// received, until the request stream ends.
///
/// `node_id` is used only to tag log lines.
///
/// # Errors
///
/// Returns an error if the server end cannot be turned into a request stream, if reading
/// a request fails, or if sending a response fails.
async fn exec_server(node_id: NodeId, server: ServerEnd<echo::EchoMarker>) -> Result<(), Error> {
    let mut requests = server.into_stream()?;
    log::info!("{:?} server begins", node_id);
    loop {
        let next = requests.try_next().await.context("error running echo server")?;
        match next {
            Some(echo::EchoRequest::EchoString { value, responder }) => {
                log::info!("{:?} Received echo request for string {:?}", node_id, value);
                responder.send(value.as_deref()).context("error sending response")?;
                log::info!("{:?} echo response sent successfully", node_id);
            }
            // Anything else (in particular the end of the stream) stops the server.
            _ => break,
        }
    }
    log::info!("{:?} server done", node_id);
    Ok(())
}
/// Issues a single echo request carrying `text` over `client` and reports the response
/// back through `responder`.
///
/// `node_id` is used only to tag log lines.
///
/// # Errors
///
/// Returns an error if the client end cannot be turned into a proxy, if the echo call
/// fails, or if the response cannot be sent to `responder`.
async fn exec_client(
    node_id: NodeId,
    client: ClientEnd<echo::EchoMarker>,
    text: Option<String>,
    responder: triangle::ConscriptIssueResponder,
) -> Result<(), Error> {
    log::info!("{:?} CLIENT SEND REQUEST: {:?}", node_id, text);
    // Propagate echo failures to the caller rather than panicking; the whole call stays
    // in one expression so the proxy is still dropped as soon as the response arrives.
    let response = client
        .into_proxy()?
        .echo_string(text.as_deref())
        .await
        .context("echo request failed")?;
    log::info!("{:?} CLIENT GETS RESPONSE: {:?}", node_id, response);
    responder.send(response.as_deref())?;
    Ok(())
}
async fn exec_conscript<
F: 'static + Send + Clone + Fn(triangle::ConscriptRequest) -> Fut,
Fut: Future<Output = Result<(), Error>>,
>(
overnet: Arc<Overnet>,
action: F,
) -> Result<(), Error> {
let (s, p) = fidl::Channel::create().context("failed to create zx channel")?;
let chan = fidl::AsyncChannel::from_channel(s).context("failed to make async channel")?;
let node_id = overnet.node_id();
overnet
.connect_as_service_publisher()?
.publish_service(triangle::ConscriptMarker::PROTOCOL_NAME, ClientEnd::new(p))?;
ServiceProviderRequestStream::from_channel(chan)
.map_err(Into::into)
.try_for_each_concurrent(None, |req| {
let action = action.clone();
async move {
let ServiceProviderRequest::ConnectToService { chan, info: _, control_handle: _ } =
req;
log::info!("{:?} Received service request for service", node_id);
let chan = fidl::AsyncChannel::from_channel(chan)
.context("failed to make async channel")?;
log::info!("{:?} Started service handler", node_id);
triangle::ConscriptRequestStream::from_channel(chan)
.map_err(Into::into)
.try_for_each_concurrent(None, |request| {
let action = action.clone();
async move {
log::info!("{:?} Received request {:?}", node_id, request);
action(request).await
}
})
.await?;
log::info!("{:?} Finished service handler", node_id);
Ok(())
}
})
.await
}
/// Handles a Conscript request locally on node `own_id`.
///
/// * `Serve` runs an echo server on the supplied server end.
/// * `Issue` sends one echo request on the supplied client end and replies through the
///   request's responder.
///
/// # Errors
///
/// Returns the error from the echo server or client, annotated with the role and node id.
async fn conscript_leaf_action(
    own_id: NodeId,
    request: triangle::ConscriptRequest,
) -> Result<(), Error> {
    log::info!("{:?} Handling it", own_id);
    match request {
        triangle::ConscriptRequest::Serve { iface, control_handle: _ } => {
            exec_server(own_id, iface)
                .await
                // Build the context message only if the server failed.
                .with_context(|| format!("running conscript server {:?}", own_id))
        }
        triangle::ConscriptRequest::Issue { iface, request, responder } => {
            exec_client(own_id, iface, request, responder)
                .await
                // Build the context message only if the client failed.
                .with_context(|| format!("running conscript client {:?}", own_id))
        }
    }
}
/// Relays a Conscript request received on node `own_id` to `target`, the Conscript proxy
/// for node `node_id`.
///
/// * `Serve` is passed on as-is; nothing is awaited.
/// * `Issue` is passed on, and the response from `target` is sent back through the
///   original responder.
///
/// # Errors
///
/// Returns an error if the forwarded call fails or the response cannot be sent back.
async fn conscript_forward_action(
    own_id: NodeId,
    node_id: NodeId,
    request: triangle::ConscriptRequest,
    target: triangle::ConscriptProxy,
) -> Result<(), Error> {
    log::info!("{:?} Forwarding request to {:?}", own_id, node_id);
    match request {
        triangle::ConscriptRequest::Serve { iface, control_handle: _ } => {
            target.serve(iface)?;
            Ok(())
        }
        triangle::ConscriptRequest::Issue { iface, request: text, responder } => {
            let reply = target.issue(iface, text.as_deref()).await?;
            log::info!("Forwarder got response: {:?}", reply);
            responder.send(reply.as_deref())?;
            Ok(())
        }
    }
}
////////////////////////////////////////////////////////////////////////////////
// Test driver
/// Runs one triangle echo scenario.
///
/// * `client` / `server` — node ids the captain asks to issue / serve the echo request.
/// * `captain` — node that creates the channel and drives the exchange.
/// * `forwarders` — `(node, target_id)` pairs; each node waits until `target_id` advertises
///   the Conscript service, then publishes its own Conscript service that relays every
///   request to that target.
/// * `conscripts` — leaf nodes that handle Conscript requests themselves.
/// * `text` — payload for the echo request.
///
/// Forwarders and conscripts run as background tasks, which are dropped once the captain
/// has finished.
///
/// # Errors
///
/// Returns any error produced by the captain. Forwarder errors are only logged as warnings.
///
/// # Panics
///
/// A leaf conscript task panics if its service loop returns an error.
async fn run_triangle_echo_test(
    client: NodeId,
    server: NodeId,
    captain: Arc<Overnet>,
    forwarders: Vec<(Arc<Overnet>, NodeId)>,
    conscripts: Vec<Arc<Overnet>>,
    text: Option<&str>,
) -> Result<(), Error> {
    let captain_node_id = captain.node_id();
    let mut background_tasks = Vec::with_capacity(forwarders.len() + conscripts.len());
    for (forwarder, target_node_id) in forwarders {
        background_tasks.push(Task::spawn(
            async move {
                // `forwarder` is already owned by this task; no extra `Arc` clone needed.
                let svc = forwarder.connect_as_service_consumer()?;
                // Wait until the forwarding target has published its Conscript service.
                loop {
                    let peers = svc.list_peers().await?;
                    log::info!(
                        "Waiting for forwarding target {:?}; got peers {:?}",
                        target_node_id,
                        peers
                    );
                    if has_peer_conscript(&peers, target_node_id) {
                        break;
                    }
                }
                let target = connect_peer(&svc, target_node_id)?;
                let own_id = forwarder.node_id();
                exec_conscript(forwarder, move |request| {
                    conscript_forward_action(own_id, target_node_id, request, target.clone())
                })
                .await
            }
            // Forwarder failures are logged rather than failing the test.
            .unwrap_or_else(|e: Error| log::warn!("{:?}", e)),
        ));
    }
    for conscript in conscripts {
        // Read the id before the node is moved into the task.
        let own_id = conscript.node_id();
        background_tasks.push(Task::spawn(async move {
            exec_conscript(conscript, move |request| conscript_leaf_action(own_id, request))
                .await
                .unwrap()
        }));
    }
    exec_captain(client, server, captain, text).await?;
    // NOTE(review): dropping a `Task` is presumed to stop it (fuchsia_async semantics) —
    // confirm. Each drop is logged individually.
    for (i, task) in background_tasks.into_iter().enumerate() {
        log::info!("{:?} drop background task {}", captain_node_id, i);
        drop(task);
    }
    log::info!("{:?} returning from test driver", captain_node_id);
    Ok(())
}