| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::constants::TEST_ROOT_REALM_NAME, |
| anyhow::Error, |
| async_trait::async_trait, |
| diagnostics_bridge::ArchiveReaderManager, |
| diagnostics_data::{Data, LogsData}, |
| diagnostics_reader as reader, |
| fidl::endpoints::{RequestStream, ServerEnd}, |
| fidl_fuchsia_developer_remotecontrol::StreamError, |
| fidl_fuchsia_diagnostics::{ |
| ArchiveAccessorProxy, ArchiveAccessorRequest, ArchiveAccessorRequestStream, |
| BatchIteratorMarker, BatchIteratorProxy, BatchIteratorRequest, BatchIteratorRequestStream, |
| ClientSelectorConfiguration, ComponentSelector, DataType, Format, FormattedContent, |
| Selector, SelectorArgument, StreamMode, StreamParameters, StringSelector, |
| }, |
| fidl_fuchsia_mem as fmem, fuchsia_async as fasync, fuchsia_zircon as zx, |
| futures::{future::Either, stream::FusedStream, FutureExt, TryStreamExt}, |
| serde_json::{self, Value as JsonValue}, |
| std::ops::Deref, |
| tracing::{error, warn}, |
| }; |
| |
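| /// Wraps an `ArchiveAccessor` connection and implements `ArchiveReaderManager` so |
| /// that isolated logs can be streamed from it. |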
| pub struct IsolatedLogsProvider { |
| accessor: ArchiveAccessorProxy, |
| } |
| |
| impl IsolatedLogsProvider { |
| pub fn new(accessor: ArchiveAccessorProxy) -> Self { |
| Self { accessor } |
| } |
| |
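| /// Requests a log stream over the given iterator: a snapshot of existing logs |
| /// followed by a subscription to new ones, formatted as JSON, with all components |
| /// selected. |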
| pub fn start_streaming_logs( |
| &self, |
| iterator: ServerEnd<BatchIteratorMarker>, |
| ) -> Result<(), StreamError> { |
| let stream_parameters = StreamParameters { |
| stream_mode: Some(StreamMode::SnapshotThenSubscribe), |
| data_type: Some(DataType::Logs), |
| format: Some(Format::Json), |
| client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll(true)), |
| ..StreamParameters::EMPTY |
| }; |
| self.accessor.stream_diagnostics(stream_parameters, iterator).map_err(|err| { |
| warn!(%err, "Failed to subscribe to isolated logs"); |
| StreamError::SetupSubscriptionFailed |
| })?; |
| Ok(()) |
| } |
| } |
| |
| impl Deref for IsolatedLogsProvider { |
| type Target = ArchiveAccessorProxy; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.accessor |
| } |
| } |
| |
| #[async_trait] |
| impl ArchiveReaderManager for IsolatedLogsProvider { |
| type Error = reader::Error; |
| |
| async fn snapshot<D: diagnostics_data::DiagnosticsData + 'static>( |
| &self, |
| ) -> Result<Vec<Data<D>>, StreamError> { |
| unimplemented!("This functionality is not yet needed."); |
| } |
| |
| fn start_log_stream( |
| &mut self, |
| ) -> Result< |
| Box<dyn FusedStream<Item = Result<LogsData, Self::Error>> + Unpin + Send>, |
| StreamError, |
| > { |
| let (proxy, batch_iterator_server) = fidl::endpoints::create_proxy::<BatchIteratorMarker>() |
| .map_err(|err| { |
| warn!(%err, "Fidl error while creating proxy"); |
| StreamError::GenericError |
| })?; |
| self.start_streaming_logs(batch_iterator_server)?; |
| let subscription = reader::Subscription::new(proxy); |
| Ok(Box::new(subscription)) |
| } |
| } |
| |
| /// Runs an ArchiveAccessor to which test components connect. |
| /// This will prepend the test realm name to all selectors coming from the component. |
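| /// For example, the raw selector `foo/bar/component` sent by a test component is |
| /// forwarded to the embedded accessor as `test_root/foo/bar/component`. |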
| pub async fn run_intermediary_archive_accessor( |
| embedded_archive_accessor: ArchiveAccessorProxy, |
| mut stream: ArchiveAccessorRequestStream, |
| ) -> Result<(), Error> { |
| while let Some(ArchiveAccessorRequest::StreamDiagnostics { |
| result_stream, |
| stream_parameters, |
| control_handle: _, |
| }) = stream.try_next().await? |
| { |
| let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>()?; |
| let stream_parameters = scope_stream_parameters(stream_parameters); |
| embedded_archive_accessor.stream_diagnostics(stream_parameters, server_end)?; |
| |
| fasync::Task::spawn(async move { |
| interpose_batch_iterator_responses(iterator, result_stream).await.unwrap_or_else(|e| { |
| error!("Failed running batch iterator: {:?}", e); |
| }) |
| }) |
| .detach(); |
| } |
| Ok(()) |
| } |
| |
| /// Forwards BatchIterator#GetNext requests to the actual Archivist and removes the |
| /// `test_root` prefixes from the monikers in the response. |
| async fn interpose_batch_iterator_responses( |
| iterator: BatchIteratorProxy, |
| client_server_end: ServerEnd<BatchIteratorMarker>, |
| ) -> Result<(), Error> { |
| let request_stream = client_server_end.into_stream()?; |
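| // Temporarily unwrap the request stream to reach its underlying channel: we need to |
| // observe CHANNEL_PEER_CLOSED so the loop below can exit even while a get_next call |
| // to the real Archivist is still pending; otherwise this task could keep the |
| // Archivist connection alive after the client has gone away. |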
| let (serve_inner, terminated) = request_stream.into_inner(); |
| let serve_inner_clone = serve_inner.clone(); |
| let mut channel_closed_fut = |
| fasync::OnSignals::new(serve_inner_clone.channel(), zx::Signals::CHANNEL_PEER_CLOSED) |
| .fuse(); |
| let mut request_stream = BatchIteratorRequestStream::from_inner(serve_inner, terminated); |
| |
| while let Some(BatchIteratorRequest::GetNext { responder }) = request_stream.try_next().await? { |
| let result = |
| match futures::future::select(iterator.get_next(), &mut channel_closed_fut).await { |
| Either::Left((result, _)) => result?, |
| Either::Right(_) => break, |
| }; |
| match result { |
| Err(e) => responder.send(&mut Err(e))?, |
| Ok(batch) => { |
| let batch = batch |
| .into_iter() |
| .map(scope_formatted_content) |
| .collect::<Result<Vec<_>, _>>()?; |
| responder.send(&mut Ok(batch))?; |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
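| /// Removes the test root prefix from the monikers contained in a single |
| /// `FormattedContent` response. |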
| fn scope_formatted_content(content: FormattedContent) -> Result<FormattedContent, Error> { |
| match content { |
| FormattedContent::Json(data) => { |
| let json_value = load_json_value(data)?; |
| let value = match json_value { |
| value @ JsonValue::Object(_) => scope_formatted_content_json(value), |
| JsonValue::Array(objects) => { |
| let objects = objects |
| .into_iter() |
| .map(scope_formatted_content_json) |
| .collect::<Vec<_>>(); |
| JsonValue::Array(objects) |
| } |
| _ => unreachable!("ArchiveAccessor only returns top-level objects and arrays"), |
| }; |
| let buffer = write_json_value(value)?; |
| Ok(FormattedContent::Json(buffer)) |
| } |
| // This should never be reached, as the Archivist does not serve Text at the moment. |
| // When it does, we can decide how to parse it for scoping; until then, pass it |
| // through unscoped. |
| data @ FormattedContent::Text(_) => Ok(data), |
| other => Ok(other), |
| } |
| } |
| |
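| /// Strips the leading test realm segment from the `moniker` field of one JSON |
| /// object, e.g. `test_root/foo/bar` becomes `foo/bar`. |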
| fn scope_formatted_content_json(mut object: JsonValue) -> JsonValue { |
| if let Some(moniker) = object.get_mut("moniker") { |
| match moniker { |
| JsonValue::String(ref mut moniker) => { |
| if let Some(updated) = moniker.strip_prefix(&format!("{}/", TEST_ROOT_REALM_NAME)) { |
| *moniker = updated.to_string(); |
| } |
| } |
| _ => unreachable!("ArchiveAccessor always returns a moniker in the payload"), |
| } |
| } |
| object |
| } |
| |
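| /// Reads and parses a JSON value from a VMO-backed buffer. |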
| fn load_json_value(data: fmem::Buffer) -> Result<JsonValue, Error> { |
| let mut buf = vec![0; data.size as usize]; |
| data.vmo.read(&mut buf, 0)?; |
| let hierarchy_json = std::str::from_utf8(&buf)?; |
| let result = serde_json::from_str(&hierarchy_json)?; |
| Ok(result) |
| } |
| |
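| /// Serializes a JSON value into a newly created VMO-backed buffer. |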
| fn write_json_value(value: JsonValue) -> Result<fmem::Buffer, Error> { |
| let content = value.to_string(); |
| let size = content.len() as u64; |
| let vmo = zx::Vmo::create(size)?; |
| vmo.write(content.as_bytes(), 0)?; |
| Ok(fmem::Buffer { vmo, size }) |
| } |
| |
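| /// Scopes every selector in the client selector configuration to the test root |
| /// realm; all other stream parameters are passed through unchanged. |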
| fn scope_stream_parameters(stream_parameters: StreamParameters) -> StreamParameters { |
| StreamParameters { |
| client_selector_configuration: stream_parameters.client_selector_configuration.map( |
| |config| match config { |
| ClientSelectorConfiguration::Selectors(selectors) => { |
| ClientSelectorConfiguration::Selectors( |
| selectors |
| .into_iter() |
| .map(scope_selector_argument) |
| .collect::<Vec<_>>(), |
| ) |
| } |
| other => other, |
| }, |
| ), |
| ..stream_parameters |
| } |
| } |
| |
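| /// Prepends the test realm to one selector: structured selectors gain an |
| /// exact-match moniker segment, and raw selectors are prefixed with the realm name. |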
| fn scope_selector_argument(selector_argument: SelectorArgument) -> SelectorArgument { |
| match selector_argument { |
| SelectorArgument::StructuredSelector(selector) => { |
| SelectorArgument::StructuredSelector(Selector { |
| tree_selector: selector.tree_selector, |
| component_selector: selector.component_selector.map(|component_selector| { |
| ComponentSelector { |
| moniker_segments: component_selector.moniker_segments.map( |
| |mut segments| { |
| let mut moniker_segments = vec![StringSelector::ExactMatch( |
| TEST_ROOT_REALM_NAME.to_string(), |
| )]; |
| moniker_segments.append(&mut segments); |
| moniker_segments |
| }, |
| ), |
| ..component_selector |
| } |
| }), |
| ..selector |
| }) |
| } |
| SelectorArgument::RawSelector(selector) => { |
| SelectorArgument::RawSelector(format!("{}/{}", TEST_ROOT_REALM_NAME, selector)) |
| } |
| other => other, |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| diagnostics_data::Data, |
| diagnostics_hierarchy::hierarchy, |
| fidl_fuchsia_diagnostics::ArchiveAccessorMarker, |
| futures::{channel::mpsc, FutureExt, SinkExt, StreamExt}, |
| }; |
| |
| #[fuchsia::test] |
| async fn verify_archive_accessor_server_scopes_monikers() { |
| let (sender, receiver) = mpsc::channel(1); |
| let embedded_accessor = spawn_fake_archive_accessor(sender); |
| let (test_accessor, stream) = |
| fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>() |
| .expect("create our archive accessor proxy"); |
| fasync::Task::spawn(async move { |
| run_intermediary_archive_accessor(embedded_accessor, stream) |
| .await |
| .expect("ran proxied archive accessor"); |
| }) |
| .detach(); |
| |
| let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>() |
| .expect("create batch iterator proxy"); |
| |
| let stream_parameters = StreamParameters { |
| client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(vec![ |
| SelectorArgument::RawSelector("foo/bar/component".to_string()), |
| SelectorArgument::StructuredSelector(Selector { |
| component_selector: Some(ComponentSelector { |
| moniker_segments: Some(vec![StringSelector::StringPattern( |
| "foo".to_string(), |
| )]), |
| ..ComponentSelector::EMPTY |
| }), |
| ..Selector::EMPTY |
| }), |
| ])), |
| ..StreamParameters::EMPTY |
| }; |
| test_accessor |
| .stream_diagnostics(stream_parameters, server_end) |
| .expect("stream diagnostics ok"); |
| |
| // Verify that the selectors received by the embedded Archivist are scoped to the test root. |
| let mut params_stream = receiver.boxed(); |
| let params = params_stream.next().await.expect("got params"); |
| assert_eq!( |
| params, |
| StreamParameters { |
| client_selector_configuration: Some(ClientSelectorConfiguration::Selectors(vec![ |
| SelectorArgument::RawSelector("test_root/foo/bar/component".to_string()), |
| SelectorArgument::StructuredSelector(Selector { |
| component_selector: Some(ComponentSelector { |
| moniker_segments: Some(vec![ |
| StringSelector::ExactMatch("test_root".to_string()), |
| StringSelector::StringPattern("foo".to_string()), |
| ]), |
| ..ComponentSelector::EMPTY |
| }), |
| ..Selector::EMPTY |
| }), |
| ])), |
| ..StreamParameters::EMPTY |
| } |
| ); |
| |
| // Verify that none of the monikers received from the batch contain the `test_root` prefix. |
| let batch = iterator.get_next().await.expect("got batch").expect("batch is not an error"); |
| let batch = batch |
| .into_iter() |
| .map(|content| match content { |
| FormattedContent::Json(data) => { |
| let json_value = load_json_value(data).expect("got json value"); |
| json_value.get("moniker").unwrap().as_str().unwrap().to_string() |
| } |
| _ => unreachable!("our fake accessor just sends json"), |
| }) |
| .collect::<Vec<_>>(); |
| |
| assert_eq!( |
| batch, |
| vec!["foo/bar/component".to_string(), "baz/qux/other_component".to_string(),] |
| ); |
| } |
| |
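| /// Serves a fake `ArchiveAccessor` that forwards the stream parameters it receives |
| /// through `seen_stream_parameters` and answers every `GetNext` with two canned |
| /// results, one with the `test_root` prefix and one without. |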
| fn spawn_fake_archive_accessor( |
| mut seen_stream_parameters: mpsc::Sender<StreamParameters>, |
| ) -> ArchiveAccessorProxy { |
| let (proxy, mut stream) = |
| fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>() |
| .expect("create proxy"); |
| fasync::Task::spawn(async move { |
| while let Some(ArchiveAccessorRequest::StreamDiagnostics { |
| stream_parameters, |
| result_stream, |
| .. |
| }) = stream.try_next().await.expect("stream request") |
| { |
| seen_stream_parameters.send(stream_parameters).await.expect("send seen parameters"); |
| fasync::Task::spawn(async move { |
| let mut stream = result_stream.into_stream().expect("into stream"); |
| while let Some(BatchIteratorRequest::GetNext { responder }) = |
| stream.try_next().await.expect("stream request") |
| { |
| let results = vec![ |
| make_result("test_root/foo/bar/component"), |
| make_result("baz/qux/other_component"), |
| ]; |
| responder.send(&mut Ok(results)).expect("send response"); |
| } |
| }) |
| .detach(); |
| } |
| }) |
| .detach(); |
| |
| proxy |
| } |
| |
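| /// Builds a `FormattedContent::Json` result carrying a minimal inspect hierarchy |
| /// for the given moniker. |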
| fn make_result(moniker: &str) -> FormattedContent { |
| let result = Data::for_inspect( |
| moniker, |
| Some(hierarchy! { |
| root: { |
| x: 1u64, |
| } |
| }), |
| 0, |
| "http://component", |
| "fuchsia.inspect.Tree", |
| vec![], |
| ); |
| let json_value = serde_json::to_value(result).expect("data to json"); |
| let buffer = write_json_value(json_value).expect("json value to vmo buffer"); |
| FormattedContent::Json(buffer) |
| } |
| |
| #[test] |
| fn verify_channel_closure_propagated() { |
| // This test verifies that when the client of the mock closes its channel, the |
| // channel to the Archivist also closes, even when some other future might be in |
| // flight. If the closure were not propagated, the mock could remain alive and keep |
| // the Archivist alive even though it is no longer serving a client. |
| |
| let mut executor = fasync::TestExecutor::new().expect("create executor"); |
| |
| let (client_proxy, client_server) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| let (interpose_client, interpose_server) = |
| fidl::endpoints::create_proxy::<BatchIteratorMarker>().unwrap(); |
| |
| let interpose_fut = interpose_batch_iterator_responses(interpose_client, client_server); |
| // The "Archivist" server just waits for the channel to close. This simulates |
| // GetNext not having any new data to return. |
| let server_fut = |
| fasync::OnSignals::new(&interpose_server, zx::Signals::CHANNEL_PEER_CLOSED); |
| let mut client_fut = client_proxy.get_next().boxed(); |
| |
| let mut join_fut = futures::future::join(interpose_fut, server_fut).boxed(); |
| |
| // First, poll the client and server futures to ensure the client request is received. |
| assert!(executor.run_until_stalled(&mut client_fut).is_pending()); |
| assert!(executor.run_until_stalled(&mut join_fut).is_pending()); |
| assert!(executor.run_until_stalled(&mut client_fut).is_pending()); |
| |
| // Close the channel on the client side. |
| drop(client_fut); |
| drop(client_proxy); |
| |
| // The server futures should now complete. |
| assert!(executor.run_until_stalled(&mut join_fut).is_ready()); |
| } |
| } |