| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::message::{PublisherRequestMessage, VmoMessage}; |
| use anyhow::Error; |
| use fidl_fuchsia_debugdata as fdebug; |
| use fuchsia_async as fasync; |
| use fuchsia_zircon as zx; |
| use futures::{ |
| channel::mpsc, |
| stream::{Stream, StreamExt, TryStreamExt}, |
| SinkExt, TryFutureExt, |
| }; |
| use tracing::warn; |
| |
| /// Serve the |fuchsia.debugdata.Publisher| protocol for each connection received over |
| /// |request_stream|. VMOs ready to be processed are output via |vmo_sender|. |
| pub async fn serve_publisher_requests<S: Stream<Item = PublisherRequestMessage>>( |
| request_stream: S, |
| vmo_sender: mpsc::Sender<VmoMessage>, |
| ) { |
| request_stream |
| .for_each_concurrent(None, move |request| |
| // failure serving one connection shouldn't terminate all of them. |
| serve_publisher(request, vmo_sender.clone()) |
| .unwrap_or_else(|e| warn!("Error serving debug data publisher: {:?}", e))) |
| .await |
| } |
| |
| async fn serve_publisher( |
| request_message: PublisherRequestMessage, |
| vmo_sender: mpsc::Sender<VmoMessage>, |
| ) -> Result<(), Error> { |
| let PublisherRequestMessage { test_url, request } = request_message; |
| let request_stream = request.into_stream()?; |
| request_stream |
| .map_err(Into::<Error>::into) |
| .try_for_each_concurrent(None, move |request| { |
| handle_publish_request(request, test_url.clone(), vmo_sender.clone()) |
| }) |
| .await |
| } |
| |
| async fn handle_publish_request( |
| request: fdebug::PublisherRequest, |
| test_url: String, |
| mut vmo_sender: mpsc::Sender<VmoMessage>, |
| ) -> Result<(), Error> { |
| match request { |
| fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } => { |
| // Wait for the token handle to close before sending the VMO for processing. |
| // This allows the client to continue modifying the VMO after it has sent it. |
| // See |fuchsia.debugdata.Publisher| protocol for details. |
| fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_CLOSED).await?; |
| vmo_sender.send(VmoMessage { test_url, data_sink, vmo: data }).await?; |
| Ok(()) |
| } |
| } |
| } |
| |
#[cfg(test)]
mod test {
    use super::*;
    use fidl::{endpoints::create_proxy, AsHandleRef};
    use futures::FutureExt;

    /// Creates a |fuchsia.debugdata.Publisher| proxy along with the |PublisherRequestMessage|
    /// that carries the other end of the connection, labelled with |test_url|.
    fn create_proxy_and_message(
        test_url: &str,
    ) -> (fdebug::PublisherProxy, PublisherRequestMessage) {
        let (proxy, request) = create_proxy::<fdebug::PublisherMarker>().unwrap();
        (proxy, PublisherRequestMessage { test_url: test_url.to_string(), request })
    }

    /// A single published VMO is forwarded with the expected URL, data sink and koid once its
    /// token has been dropped, and the server completes after the proxy is dropped.
    #[fuchsia::test]
    async fn single_publisher_connection() {
        let (proxy, message) = create_proxy_and_message("test-url");
        let (vmo_send, vmo_recv) = mpsc::channel(5);
        let serve_fut = serve_publisher(message, vmo_send);
        let test_fut = async move {
            let (vmo_token, vmo_token_server) = zx::EventPair::create().unwrap();
            let vmo = zx::Vmo::create(1024).unwrap();
            let vmo_koid = vmo.get_koid().unwrap();
            proxy.publish("data-sink", vmo, vmo_token_server).expect("publish vmo");
            drop(vmo_token);
            drop(proxy);
            // Collecting only finishes once the server has dropped its sender.
            let vmo_messages: Vec<_> = vmo_recv.collect().await;
            assert_eq!(vmo_messages.len(), 1);
            assert_eq!(vmo_messages[0].test_url, "test-url");
            assert_eq!(vmo_messages[0].data_sink, "data-sink");
            assert_eq!(vmo_messages[0].vmo.get_koid().unwrap(), vmo_koid);
        };
        let (result, ()) = futures::future::join(serve_fut, test_fut).await;
        result.expect("serve failed");
    }

    /// Steps the server manually to check that a VMO is held back while its token is open
    /// and is forwarded once the token has been dropped.
    #[fuchsia::test]
    fn single_publisher_connection_send_vmo_when_ready() {
        let mut executor = fasync::TestExecutor::new().unwrap();

        let (proxy, message) = create_proxy_and_message("test-url");
        let (vmo_send, mut vmo_recv) = mpsc::channel(5);
        let mut serve_fut = serve_publisher(message, vmo_send).boxed();

        let (vmo_token, vmo_token_server) = zx::EventPair::create().unwrap();
        let vmo = zx::Vmo::create(1024).unwrap();
        let vmo_koid = vmo.get_koid().unwrap();

        // VMO should not be sent for processing until the vmo_token eventpair is closed.
        proxy.publish("data-sink", vmo, vmo_token_server).expect("publish vmo");
        assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
        assert!(vmo_recv.try_next().is_err());

        // After closing vmo_token, the VMO is sent for processing.
        drop(vmo_token);
        assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
        let vmo_message = vmo_recv.try_next().expect("receive vmo").unwrap();
        assert_eq!(vmo_message.test_url, "test-url");
        assert_eq!(vmo_message.data_sink, "data-sink");
        assert_eq!(vmo_message.vmo.get_koid().unwrap(), vmo_koid);

        // Dropping the proxy closes the connection, which lets the server finish.
        drop(proxy);
        match executor.run_until_stalled(&mut serve_fut) {
            futures::task::Poll::Ready(Ok(())) => (),
            other => panic!("Expected server to complete successfully but got {:?}", other),
        }
    }

    /// Several connections are served at once, and VMOs become available in the order their
    /// tokens are dropped rather than the order in which they were published.
    #[fuchsia::test]
    async fn handle_concurrent_publisher_connections() {
        const CONCURRENT_CLIENTS: usize = 10;
        let (proxies, messages): (Vec<_>, Vec<_>) = (0..CONCURRENT_CLIENTS)
            .map(|i| create_proxy_and_message(&format!("test-url-{:?}", i)))
            .unzip();

        let (vmo_send, mut vmo_recv) = mpsc::channel(5);
        let serve_fut = serve_publisher_requests(futures::stream::iter(messages), vmo_send);
        let test_fut = async move {
            // Send one VMO for each proxy.
            let first_vmo_tokens: Vec<_> = proxies
                .iter()
                .map(|proxy| {
                    let (vmo_token, vmo_token_server) = zx::EventPair::create().unwrap();
                    let vmo = zx::Vmo::create(1024).unwrap();
                    proxy.publish("data-sink-1", vmo, vmo_token_server).expect("publish vmo");
                    vmo_token
                })
                .collect();
            // Send a second VMO for each proxy.
            let second_vmo_tokens: Vec<_> = proxies
                .iter()
                .map(|proxy| {
                    let (vmo_token, vmo_token_server) = zx::EventPair::create().unwrap();
                    let vmo = zx::Vmo::create(1024).unwrap();
                    proxy.publish("data-sink-2", vmo, vmo_token_server).expect("publish vmo");
                    vmo_token
                })
                .collect();
            // After dropping the tokens for the second VMOs, only the messages for those
            // VMOs should be ready.
            drop(second_vmo_tokens);
            let ready_vmos: Vec<_> = vmo_recv.by_ref().take(CONCURRENT_CLIENTS).collect().await;
            assert!(ready_vmos.iter().all(|message| message.data_sink == "data-sink-2"));
            assert!(vmo_recv.try_next().is_err());
            // After dropping the tokens for the first VMOs, the remaining messages are ready.
            drop(first_vmo_tokens);
            let ready_vmos: Vec<_> = vmo_recv.by_ref().take(CONCURRENT_CLIENTS).collect().await;
            assert!(ready_vmos.iter().all(|message| message.data_sink == "data-sink-1"));
            assert!(vmo_recv.try_next().is_err());
            // Dropping the proxies terminates the stream.
            drop(proxies);
            assert!(vmo_recv.next().await.is_none());
        };
        let ((), ()) = futures::future::join(serve_fut, test_fut).await;
    }

    /// A VMO that is not yet ready must not block later requests on the same connection.
    #[fuchsia::test]
    async fn subsequent_requests_handled_when_vmo_not_ready() {
        // This test verifies that when a VMO is not immediately ready to be processed,
        // this does not stop subsequent requests from being processed.
        let (proxy, message) = create_proxy_and_message("test-url-2");
        let (vmo_send, mut vmo_recv) = mpsc::channel(5);
        let serve_fut = serve_publisher(message, vmo_send);
        let test_fut = async move {
            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create().unwrap();
            let vmo_1 = zx::Vmo::create(1024).unwrap();
            let vmo_1_koid = vmo_1.get_koid().unwrap();

            proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
            // VMO should not be sent for processing until the vmo_token_1 eventpair is
            // closed. Hold it open, and in the meantime, send another VMO.
            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create().unwrap();
            let vmo_2 = zx::Vmo::create(1024).unwrap();
            let vmo_2_koid = vmo_2.get_koid().unwrap();
            proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
            // The second VMO should become available for processing as soon as its token
            // is dropped.
            drop(vmo_token_2);
            let next_message = vmo_recv.next().await.unwrap();
            assert_eq!(next_message.data_sink, "data-sink-2");
            assert_eq!(next_message.vmo.get_koid().unwrap(), vmo_2_koid);

            // While the token for the first VMO is still held, it should not be sent.
            assert!(vmo_recv.next().now_or_never().is_none());
            // After dropping the first token the message becomes available.
            drop(vmo_token_1);
            let next_message = vmo_recv.next().await.unwrap();
            assert_eq!(next_message.data_sink, "data-sink-1");
            assert_eq!(next_message.vmo.get_koid().unwrap(), vmo_1_koid);
        };
        let ((), result) = futures::future::join(test_fut, serve_fut).await;
        result.expect("serve should not fail");
    }
}