| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| debug_data_server, |
| run_events::{RunEvent, SuiteEvents}, |
| }, |
| anyhow::Error, |
| fidl::endpoints::create_endpoints, |
| fidl_fuchsia_debugdata as fdebug, fidl_fuchsia_io as fio, |
| fidl_fuchsia_test_debug as ftest_debug, |
| fidl_fuchsia_test_manager::LaunchError, |
| fuchsia_async as fasync, |
| fuchsia_component::{client::connect_to_protocol, server::ServiceFs}, |
| fuchsia_component_test::LocalComponentHandles, |
| fuchsia_fs::{directory::open_channel_in_namespace, OpenFlags}, |
| fuchsia_zircon as zx, |
| futures::{ |
| channel::mpsc, |
| future::FutureExt, |
| pin_mut, select_biased, |
| stream::{FuturesUnordered, StreamExt, TryStreamExt}, |
| SinkExt, |
| }, |
| tracing::info, |
| }; |
| |
| /// Processor that collects debug data and serves the iterator sending data back to a test |
| /// executor. |
| pub(crate) struct DebugDataProcessor { |
| directory: DebugDataDirectory, |
| receiver: mpsc::Receiver<ftest_debug::DebugVmo>, |
| proxy_init_fn: Box<dyn FnOnce() -> Result<ftest_debug::DebugDataProcessorProxy, Error>>, |
| } |
| |
| /// Sender used to pass VMOs back to |DebugDataProcessor|. |
| #[derive(Clone)] |
| pub(crate) struct DebugDataSender { |
| sender: mpsc::Sender<ftest_debug::DebugVmo>, |
| } |
| |
/// Directory used to store collected debug data.
///
/// The value is a plain pair of static strings, so it is `Copy`: the same configuration can be
/// handed to several |DebugDataProcessor|s and compared in tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DebugDataDirectory {
    /// An isolated directory is owned purely by the |DebugDataProcessor| it is given to, and will
    /// be torn down when the |DebugDataProcessor| is terminated. The directory is created as a
    /// temporary directory inside `parent`.
    Isolated { parent: &'static str },
    /// An accumulating directory may be shared between multiple |DebugDataProcessor|s. Contents
    /// will not be torn down.
    Accumulating { dir: &'static str },
}
| |
| impl DebugDataProcessor { |
| const MAX_SENT_VMOS: usize = 10; |
| /// Create a new |DebugDataProcessor| for processing VMOs, and |DebugDataSender| for passing |
| /// it VMOs. |
| pub fn new(directory: DebugDataDirectory) -> (Self, DebugDataSender) { |
| let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS); |
| ( |
| Self { |
| directory, |
| receiver, |
| proxy_init_fn: Box::new(|| { |
| connect_to_protocol::<ftest_debug::DebugDataProcessorMarker>() |
| .map_err(Error::from) |
| }), |
| }, |
| DebugDataSender { sender }, |
| ) |
| } |
| |
| /// Create a new |DebugDataProcessor| for processing VMOs, |DebugDataSender| for passing |
| /// it VMOs, and the |fuchsia.test.debug.DebugDataProcessor| stream to which the processor |
| /// will connect. |
| #[cfg(test)] |
| pub(crate) fn new_for_test(directory: DebugDataDirectory) -> DebugDataForTestResult { |
| let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS); |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>() |
| .expect("create stream"); |
| let maybe_proxy = std::sync::Mutex::new(Some(proxy)); |
| DebugDataForTestResult { |
| processor: Self { |
| directory, |
| receiver, |
| proxy_init_fn: Box::new(move || Ok(maybe_proxy.lock().unwrap().take().unwrap())), |
| }, |
| sender: DebugDataSender { sender }, |
| stream: stream, |
| } |
| } |
| |
| /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting |
| /// data. In case debug data is produced, sends the event over |run_event_sender|. |
| pub async fn collect_and_serve( |
| self, |
| run_event_sender: mpsc::Sender<RunEvent>, |
| ) -> Result<(), Error> { |
| let Self { directory, receiver, proxy_init_fn } = self; |
| |
| // Avoid setting up resources in the common case where no debug data is produced. |
| let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable(); |
| pin_mut!(peekable_reciever); |
| if peekable_reciever.as_mut().peek().await.is_none() { |
| return Ok(()); |
| } |
| |
| enum MaybeOwnedDirectory { |
| Owned(tempfile::TempDir), |
| Unowned(&'static str), |
| } |
| let debug_directory = match directory { |
| DebugDataDirectory::Isolated { parent } => { |
| MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?) |
| } |
| DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir), |
| }; |
| let debug_directory_path = match &debug_directory { |
| MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(), |
| MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir), |
| }; |
| |
| let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>(); |
| open_channel_in_namespace( |
| &debug_directory_path, |
| OpenFlags::RIGHT_READABLE | OpenFlags::RIGHT_WRITABLE, |
| server_end, |
| )?; |
| |
| let proxy = proxy_init_fn()?; |
| proxy.set_directory(directory_proxy)?; |
| while let Some(chunk) = peekable_reciever.next().await { |
| proxy.add_debug_vmos(chunk).await?; |
| } |
| proxy.finish().await?; |
| |
| debug_data_server::serve_directory(&debug_directory_path, run_event_sender).await?; |
| |
| if let MaybeOwnedDirectory::Owned(tmp) = debug_directory { |
| tmp.close()?; |
| } |
| Ok(()) |
| } |
| |
| /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting |
| /// data. In case debug data is produced, sends the event over |suite_event_sender|. |
| pub async fn collect_and_serve_for_suite( |
| self, |
| suite_event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>, |
| ) -> Result<(), Error> { |
| let Self { directory, receiver, proxy_init_fn } = self; |
| |
| // Avoid setting up resources in the common case where no debug data is produced. |
| let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable(); |
| pin_mut!(peekable_reciever); |
| if peekable_reciever.as_mut().peek().await.is_none() { |
| return Ok(()); |
| } |
| |
| enum MaybeOwnedDirectory { |
| Owned(tempfile::TempDir), |
| Unowned(&'static str), |
| } |
| let debug_directory = match directory { |
| DebugDataDirectory::Isolated { parent } => { |
| MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?) |
| } |
| DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir), |
| }; |
| let debug_directory_path = match &debug_directory { |
| MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(), |
| MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir), |
| }; |
| |
| let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>(); |
| open_channel_in_namespace( |
| &debug_directory_path, |
| OpenFlags::RIGHT_READABLE | OpenFlags::RIGHT_WRITABLE, |
| server_end, |
| )?; |
| |
| let proxy = proxy_init_fn()?; |
| proxy.set_directory(directory_proxy)?; |
| while let Some(chunk) = peekable_reciever.next().await { |
| proxy.add_debug_vmos(chunk).await?; |
| } |
| proxy.finish().await?; |
| |
| debug_data_server::serve_directory_for_suite(&debug_directory_path, suite_event_sender) |
| .await?; |
| |
| if let MaybeOwnedDirectory::Owned(tmp) = debug_directory { |
| tmp.close()?; |
| } |
| Ok(()) |
| } |
| } |
| |
/// Everything |DebugDataProcessor::new_for_test| hands back to a test.
#[cfg(test)]
pub(crate) struct DebugDataForTestResult {
    /// The processor under test.
    pub processor: DebugDataProcessor,
    /// Sender that feeds VMOs to `processor`.
    pub sender: DebugDataSender,
    /// Request stream paired with the proxy that `processor` will use.
    pub stream: ftest_debug::DebugDataProcessorRequestStream,
}
| |
| /// Serve |fuchsia.debugdata.Publisher| as a RealmBuilder mock. Collected VMOs are sent over |
| /// |debug_data_sender| for processing. |started_event| is signalled once the mock is ready |
| /// to serve requests. |
| // TODO(https://fxbug.dev/42056523): |started_event| is added as part of a synchronization mechanism to |
| // work around cases when a component is destroyed before starting, even though there is a |
| // request. Remove when no longer needed. |
| pub(crate) async fn serve_debug_data_publisher( |
| handles: LocalComponentHandles, |
| test_url: String, |
| debug_data_sender: DebugDataSender, |
| started_event: async_utils::event::Event, |
| ) -> Result<(), Error> { |
| let mut fs = ServiceFs::new(); |
| // Register a notifier so that this mock isn't immediately killed - it needs to drain |
| // debug data. |
| let stop_recv = handles.register_stop_notifier().await; |
| started_event.signal(); |
| |
| fs.dir("svc").add_fidl_service(|stream: fdebug::PublisherRequestStream| stream); |
| fs.serve_connection(handles.outgoing_dir)?; |
| |
| let mut drain_tasks = FuturesUnordered::new(); |
| drain_tasks.push(fasync::Task::spawn(async move { |
| let _ = stop_recv.await; |
| Ok(()) |
| })); |
| |
| let mut got_requests = false; |
| |
| loop { |
| select_biased! { |
| maybe_stream = fs.next().fuse() => match maybe_stream { |
| None => { |
| if !got_requests { |
| info!("Got no debug data requests for {}", test_url); |
| } |
| return drain_tasks.try_collect::<()>().await; |
| }, |
| Some(stream) => { |
| got_requests = true; |
| let sender_clone = debug_data_sender.clone(); |
| let url_clone = test_url.clone(); |
| drain_tasks.push(fasync::Task::spawn(async move { |
| serve_publisher(stream, &url_clone, sender_clone).await?; |
| Ok(()) |
| })); |
| } |
| }, |
| // Poll for completion of both stop_recv and any futures serving the publisher |
| // together. This allows us to accept any new serve requests even if stop is |
| // called, so long as at least one other request is still being served. |
| maybe_result = drain_tasks.next() => match maybe_result { |
| Some(result) => { |
| result?; |
| }, |
| None => { |
| if !got_requests { |
| info!("Got no debug data requests for {}", test_url); |
| } |
| return Ok(()); |
| } |
| }, |
| }; |
| } |
| } |
| |
| async fn serve_publisher( |
| stream: fdebug::PublisherRequestStream, |
| test_url: &str, |
| debug_data_sender: DebugDataSender, |
| ) -> Result<(), Error> { |
| stream |
| .map(Ok) |
| .try_for_each_concurrent(None, |req| { |
| let test_url = test_url.to_string(); |
| let mut sender_clone = debug_data_sender.clone(); |
| async move { |
| let fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } = req?; |
| // Wait for the token handle to close before sending the VMO for processing. |
| // This allows the client to continue modifying the VMO after it has sent it. |
| // See |fuchsia.debugdata.Publisher| protocol for details. |
| fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_PEER_CLOSED).await?; |
| let _ = sender_clone |
| .sender |
| .send(ftest_debug::DebugVmo { test_url, data_sink, vmo: data }) |
| .await; |
| Ok(()) |
| } |
| }) |
| .await |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use { |
| super::*, |
| crate::{run_events::RunEventPayload, run_events::SuiteEventPayload, utilities::stream_fn}, |
| fidl::endpoints::create_proxy_and_stream, |
| fuchsia_component_test::{ |
| Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route, |
| }, |
| futures::TryFutureExt, |
| maplit::hashset, |
| std::{collections::HashSet, task::Poll}, |
| test_diagnostics::collect_string_from_socket, |
| }; |
| |
/// Size passed to `zx::Vmo::create` for every VMO the tests publish.
const VMO_SIZE: u64 = 4 * 1024;
| |
| /// Runs a fake test processor implementation that, for each VMO received, creates |
| /// a new file called "data_sink" and writes the test_url inside it. |
| /// |debug_vmo_recevied_sender| is a synchronization hack that sends one message for |
| /// each vmo received. It is a workaround for the |
| /// Started/Destroyed/CapabilityRequested events being delivered out of order. |
| /// See https://fxbug.dev/42156498. |
| async fn run_test_processor( |
| mut stream: ftest_debug::DebugDataProcessorRequestStream, |
| mut debug_vmo_recevied_sender: mpsc::Sender<()>, |
| ) { |
| let req = stream.try_next().await.expect("get first request").unwrap(); |
| let dir = match req { |
| ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => { |
| directory.into_proxy().expect("convert to proxy") |
| } |
| other => panic!("First request should be SetDirectory but got {:?}", other), |
| }; |
| |
| let mut collected_vmos = vec![]; |
| let mut finish_responder = None; |
| while let Some(req) = stream.try_next().await.expect("get request") { |
| match req { |
| ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => { |
| panic!("Set directory called twice") |
| } |
| ftest_debug::DebugDataProcessorRequest::AddDebugVmos { |
| mut vmos, |
| responder, |
| .. |
| } => { |
| let num_vmos = vmos.len(); |
| collected_vmos.append(&mut vmos); |
| let _ = responder.send(); |
| for _ in 0..num_vmos { |
| let _ = debug_vmo_recevied_sender.send(()).await; |
| } |
| } |
| ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => { |
| finish_responder = Some(responder); |
| break; |
| } |
| } |
| } |
| |
| for ftest_debug::DebugVmo { data_sink, test_url, .. } in collected_vmos { |
| let file = fuchsia_fs::directory::open_file_no_describe( |
| &dir, |
| &data_sink, |
| OpenFlags::CREATE | OpenFlags::RIGHT_WRITABLE, |
| ) |
| .expect("open file"); |
| fuchsia_fs::file::write(&file, &test_url).await.expect("write file"); |
| } |
| finish_responder.unwrap().send().unwrap(); |
| } |
| |
| async fn construct_test_realm( |
| sender: DebugDataSender, |
| test_url: &'static str, |
| ) -> Result<RealmInstance, Error> { |
| let builder = RealmBuilder::new().await?; |
| |
| let processor = builder |
| .add_local_child( |
| "processor", |
| move |handles| { |
| Box::pin(serve_debug_data_publisher( |
| handles, |
| test_url.to_string(), |
| sender.clone(), |
| async_utils::event::Event::new(), |
| )) |
| }, |
| ChildOptions::new().eager(), |
| ) |
| .await?; |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<fdebug::PublisherMarker>()) |
| .from(&processor) |
| .to(Ref::parent()), |
| ) |
| .await?; |
| |
| let instance = builder.build().await?; |
| Ok(instance) |
| } |
| |
| fn isolated_dir() -> DebugDataDirectory { |
| DebugDataDirectory::Isolated { parent: "/tmp" } |
| } |
| |
| #[fuchsia::test] |
| async fn serve_no_requests() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| test_realm.destroy().await.expect("destroy test realm"); |
| |
| let (event_sender, event_recv) = mpsc::channel(1); |
| processor.collect_and_serve(event_sender).await.unwrap(); |
| |
| assert!(stream.collect::<Vec<_>>().await.is_empty()); |
| assert!(event_recv.collect::<Vec<_>>().await.is_empty()); |
| } |
| |
| #[fuchsia::test] |
| async fn serve_for_suite_no_requests() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| test_realm.destroy().await.expect("destroy test realm"); |
| |
| let (event_sender, event_recv) = mpsc::channel(1); |
| processor.collect_and_serve_for_suite(event_sender).await.unwrap(); |
| |
| assert!(stream.collect::<Vec<_>>().await.is_empty()); |
| assert!(event_recv.collect::<Vec<_>>().await.is_empty()); |
| } |
| |
| #[fuchsia::test] |
| async fn serve_single_client() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| |
| let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2); |
| // Future running fuchsia.test.debug.DebugDataProcessor. |
| let processor_server_fut = run_test_processor(stream, vmo_request_received_send); |
| // Future running the 'test' (client of fuchsia.debugdata.Publisher) |
| let test_fut = async move { |
| let proxy = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create(); |
| proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo"); |
| drop(vmo_token_1); |
| let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create(); |
| proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo"); |
| drop(vmo_token_2); |
| drop(proxy); |
| |
| vmo_request_received_recv.take(1).collect::<()>().await; |
| test_realm.destroy().await.expect("destroy test realm"); |
| }; |
| |
| let (event_sender, event_recv) = mpsc::channel(10); |
| // Future that collects VMOs from the test realm and forwards |
| // them to fuchsia.debugdata.Publisher |
| let processor_fut = processor |
| .collect_and_serve(event_sender) |
| .unwrap_or_else(|e| panic!("processor failed: {:?}", e)); |
| // Future that collects produced debug artifact and asserts on contents. |
| let assertion_fut = async move { |
| let mut events: Vec<_> = event_recv.collect().await; |
| assert_eq!(events.len(), 1); |
| let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload(); |
| let iterator_proxy = iterator.into_proxy().unwrap(); |
| let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next()) |
| .and_then(|debug_data| async move { |
| Ok(( |
| debug_data.name.unwrap(), |
| collect_string_from_socket(debug_data.socket.unwrap()) |
| .await |
| .expect("Cannot read socket"), |
| )) |
| }) |
| .try_collect() |
| .await |
| .expect("file collection"); |
| let expected = hashset! { |
| ("data-sink-1".to_string(), TEST_URL.to_string()), |
| ("data-sink-2".to_string(), TEST_URL.to_string()), |
| }; |
| assert_eq!(files, expected); |
| }; |
| |
| futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await; |
| } |
| |
| #[fuchsia::test] |
| async fn serve_for_suite_single_client() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| |
| let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2); |
| // Future running fuchsia.test.debug.DebugDataProcessor. |
| let processor_server_fut = run_test_processor(stream, vmo_request_received_send); |
| // Future running the 'test' (client of fuchsia.debugdata.Publisher) |
| let test_fut = async move { |
| let proxy = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create(); |
| proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo"); |
| drop(vmo_token_1); |
| let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create(); |
| proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo"); |
| drop(vmo_token_2); |
| drop(proxy); |
| |
| vmo_request_received_recv.take(1).collect::<()>().await; |
| test_realm.destroy().await.expect("destroy test realm"); |
| }; |
| |
| let (event_sender, event_recv) = mpsc::channel(10); |
| // Future that collects VMOs from the test realm and forwards |
| // them to fuchsia.debugdata.Publisher |
| let processor_fut = processor |
| .collect_and_serve_for_suite(event_sender) |
| .unwrap_or_else(|e| panic!("processor failed: {:?}", e)); |
| // Future that collects produced debug artifact and asserts on contents. |
| let assertion_fut = async move { |
| let mut events: Vec<_> = event_recv.collect().await; |
| assert_eq!(events.len(), 1); |
| if let SuiteEventPayload::DebugData(iterator) = |
| events.pop().unwrap().unwrap().into_payload() |
| { |
| let iterator_proxy = iterator.into_proxy().unwrap(); |
| let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next()) |
| .and_then(|debug_data| async move { |
| Ok(( |
| debug_data.name.unwrap(), |
| collect_string_from_socket(debug_data.socket.unwrap()) |
| .await |
| .expect("Cannot read socket"), |
| )) |
| }) |
| .try_collect() |
| .await |
| .expect("file collection"); |
| let expected = hashset! { |
| ("data-sink-1".to_string(), TEST_URL.to_string()), |
| ("data-sink-2".to_string(), TEST_URL.to_string()), |
| }; |
| assert_eq!(files, expected); |
| } else { |
| assert!(false); // Event payload was not DebugData |
| } |
| }; |
| |
| futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await; |
| } |
| |
| #[fuchsia::test] |
| async fn serve_multiple_client() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| |
| let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2); |
| // Future running fuchsia.test.debug.DebugDataProcessor. |
| let processor_server_fut = run_test_processor(stream, vmo_request_received_send); |
| // Future running the 'test' (client of fuchsia.debugdata.Publisher) |
| let test_fut = async move { |
| let proxy_1 = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create(); |
| proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo"); |
| drop(vmo_token_1); |
| let proxy_2 = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create(); |
| proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo"); |
| drop(vmo_token_2); |
| drop(proxy_1); |
| drop(proxy_2); |
| |
| vmo_request_received_recv.take(2).collect::<()>().await; |
| test_realm.destroy().await.expect("destroy test realm"); |
| }; |
| |
| let (event_sender, event_recv) = mpsc::channel(10); |
| // Future that collects VMOs from the test realm and forwards |
| // them to fuchsia.debugdata.Publisher |
| let processor_fut = processor |
| .collect_and_serve(event_sender) |
| .unwrap_or_else(|e| panic!("processor failed: {:?}", e)); |
| // Future that collects produced debug artifact and asserts on contents. |
| let assertion_fut = async move { |
| let mut events: Vec<_> = event_recv.collect().await; |
| assert_eq!(events.len(), 1); |
| let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload(); |
| let iterator_proxy = iterator.into_proxy().unwrap(); |
| let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next()) |
| .and_then(|debug_data| async move { |
| Ok(( |
| debug_data.name.unwrap(), |
| collect_string_from_socket(debug_data.socket.unwrap()) |
| .await |
| .expect("read socket"), |
| )) |
| }) |
| .try_collect() |
| .await |
| .expect("file collection"); |
| let expected = hashset! { |
| ("data-sink-1".to_string(), TEST_URL.to_string()), |
| ("data-sink-2".to_string(), TEST_URL.to_string()), |
| }; |
| assert_eq!(files, expected); |
| }; |
| |
| futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await; |
| } |
| |
| #[fuchsia::test] |
| async fn serve_for_suite_multiple_client() { |
| const TEST_URL: &str = "test-url"; |
| let DebugDataForTestResult { processor, sender, stream } = |
| DebugDataProcessor::new_for_test(isolated_dir()); |
| let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm"); |
| |
| let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2); |
| // Future running fuchsia.test.debug.DebugDataProcessor. |
| let processor_server_fut = run_test_processor(stream, vmo_request_received_send); |
| // Future running the 'test' (client of fuchsia.debugdata.Publisher) |
| let test_fut = async move { |
| let proxy_1 = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create(); |
| proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo"); |
| drop(vmo_token_1); |
| let proxy_2 = test_realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>() |
| .expect("connect to publisher"); |
| let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create(); |
| proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo"); |
| drop(vmo_token_2); |
| drop(proxy_1); |
| drop(proxy_2); |
| |
| vmo_request_received_recv.take(2).collect::<()>().await; |
| test_realm.destroy().await.expect("destroy test realm"); |
| }; |
| |
| let (event_sender, event_recv) = mpsc::channel(10); |
| // Future that collects VMOs from the test realm and forwards |
| // them to fuchsia.debugdata.Publisher |
| let processor_fut = processor |
| .collect_and_serve_for_suite(event_sender) |
| .unwrap_or_else(|e| panic!("processor failed: {:?}", e)); |
| // Future that collects produced debug artifact and asserts on contents. |
| let assertion_fut = async move { |
| let mut events: Vec<_> = event_recv.collect().await; |
| assert_eq!(events.len(), 1); |
| if let SuiteEventPayload::DebugData(iterator) = |
| events.pop().unwrap().unwrap().into_payload() |
| { |
| let iterator_proxy = iterator.into_proxy().unwrap(); |
| let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next()) |
| .and_then(|debug_data| async move { |
| Ok(( |
| debug_data.name.unwrap(), |
| collect_string_from_socket(debug_data.socket.unwrap()) |
| .await |
| .expect("read socket"), |
| )) |
| }) |
| .try_collect() |
| .await |
| .expect("file collection"); |
| let expected = hashset! { |
| ("data-sink-1".to_string(), TEST_URL.to_string()), |
| ("data-sink-2".to_string(), TEST_URL.to_string()), |
| }; |
| assert_eq!(files, expected); |
| } else { |
| assert!(false); // Event payload was not DebugData |
| } |
| }; |
| |
| futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await; |
| } |
| |
| #[fuchsia::test] |
| fn single_publisher_connection_send_vmo_when_ready() { |
| const TEST_URL: &str = "test-url"; |
| let mut executor = fasync::TestExecutor::new(); |
| |
| let (vmo_send, vmo_recv) = mpsc::channel(5); |
| let mut vmo_chunk_stream = vmo_recv.ready_chunks(5).boxed(); |
| let (publisher_proxy, publisher_stream) = |
| create_proxy_and_stream::<fdebug::PublisherMarker>().unwrap(); |
| let mut serve_fut = |
| serve_publisher(publisher_stream, TEST_URL, DebugDataSender { sender: vmo_send }) |
| .boxed(); |
| |
| let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create(); |
| let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap(); |
| let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create(); |
| |
| publisher_proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo"); |
| publisher_proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo"); |
| drop(vmo_token_1); |
| |
| // After this point vmo 1 should be ready for processing and passed on to processor, but |
| // vmo 2 should not. |
| |
| assert!(executor.run_until_stalled(&mut serve_fut).is_pending()); |
| let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) |
| { |
| Poll::Pending => panic!("vmos should be ready"), |
| Poll::Ready(Some(vmos)) => vmos, |
| Poll::Ready(None) => panic!("stream closed prematurely"), |
| }; |
| assert_eq!(ready_vmos.len(), 1); |
| let ready_vmo = ready_vmos.pop().unwrap(); |
| assert_eq!(ready_vmo.test_url.as_str(), TEST_URL); |
| assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-1"); |
| |
| // After dropping vmo token 2 it should be passed to the processor. |
| drop(vmo_token_2); |
| drop(publisher_proxy); |
| match executor.run_until_stalled(&mut serve_fut) { |
| futures::task::Poll::Ready(Ok(())) => (), |
| other => panic!("Expected poll to be ready but was {:?}", other), |
| } |
| |
| let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) |
| { |
| Poll::Pending => panic!("vmos should be ready"), |
| Poll::Ready(Some(vmos)) => vmos, |
| Poll::Ready(None) => panic!("stream closed prematurely"), |
| }; |
| assert_eq!(ready_vmos.len(), 1); |
| let ready_vmo = ready_vmos.pop().unwrap(); |
| assert_eq!(ready_vmo.test_url.as_str(), TEST_URL); |
| assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-2"); |
| |
| match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) { |
| Poll::Pending => panic!("vmos should be ready"), |
| Poll::Ready(None) => (), |
| Poll::Ready(Some(vmos)) => panic!("Expected stream to terminate but got {:?}", vmos), |
| }; |
| } |
| } |