| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! This crate provides helper functions to collect test diagnostics. |
| |
| use fuchsia_async as fasync; |
| use fuchsia_sync::Mutex; |
| use futures::channel::mpsc; |
| use futures::prelude::*; |
| use futures::ready; |
| use futures::task::{Context, Poll}; |
| use std::cell::RefCell; |
| use std::io::Write; |
| use std::pin::Pin; |
| use std::sync::Arc; |
| |
| pub use crate::diagnostics::LogStream; |
| |
| mod diagnostics; |
| |
| thread_local! { |
| static BUFFER: RefCell<[u8; 1024*48]> = RefCell::new([0; 1024*48]); |
| } |
| |
| /// Future that executes a function when bytes are available on a socket. |
| pub struct SocketReadFut<'a, T, F> |
| where |
| F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin, |
| { |
| socket: &'a mut fidl::AsyncSocket, |
| on_read_fn: F, |
| } |
| |
| impl<'a, T, F> SocketReadFut<'a, T, F> |
| where |
| F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin, |
| { |
| pub fn new(socket: &'a mut fidl::AsyncSocket, on_read_fn: F) -> Self { |
| Self { socket, on_read_fn } |
| } |
| } |
| |
| impl<'a, T, F> Future for SocketReadFut<'a, T, F> |
| where |
| F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin, |
| { |
| type Output = Result<T, std::io::Error>; |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| BUFFER.with(|b| { |
| let mut b = b.borrow_mut(); |
| let mut_self = self.get_mut(); |
| let len = ready!(Pin::new(&mut mut_self.socket).poll_read(cx, &mut *b)?); |
| match len { |
| 0 => Poll::Ready((mut_self.on_read_fn)(None)), |
| l => Poll::Ready((mut_self.on_read_fn)(Some(&b[..l]))), |
| } |
| }) |
| } |
| } |
| |
| pub async fn collect_string_from_socket(socket: fidl::Socket) -> Result<String, anyhow::Error> { |
| let (s, mut r) = mpsc::channel(1024); |
| let task = fasync::Task::spawn(collect_and_send_string_output(socket, s)); |
| let mut ret = String::new(); |
| while let Some(content) = r.next().await { |
| ret.push_str(&content); |
| } |
| task.await?; |
| Ok(ret) |
| } |
| |
| pub async fn collect_and_send_string_output( |
| socket: fidl::Socket, |
| mut sender: mpsc::Sender<String>, |
| ) -> Result<(), anyhow::Error> { |
| let mut async_socket = fidl::AsyncSocket::from_socket(socket); |
| loop { |
| let maybe_string = SocketReadFut::new(&mut async_socket, |maybe_buf| { |
| Ok(maybe_buf.map(|buf| String::from_utf8_lossy(buf).into())) |
| }) |
| .await?; |
| match maybe_string { |
| Some(string) => sender.send(string).await?, |
| None => return Ok(()), |
| } |
| } |
| } |
| |
| /// A writer that buffers content in memory for some duration before flushing the contents to |
| /// an inner writer. After the duration elapses, any new bytes are written immediately to the |
| /// inner writer. Calling flush() also immediately flushes the contents. |
| /// Errors that occur when flushing on timeout are returned at the next write() or flush() |
| /// call. Therefore, callers should make sure to call flush before the StdoutBuffer goes out of |
| /// scope. |
| pub struct StdoutBuffer<W: Write + Send + 'static> { |
| inner: Arc<Mutex<StdoutBufferInner<W>>>, |
| _timer: fuchsia_async::Task<()>, |
| } |
| |
| impl<W: Write + Send + 'static> StdoutBuffer<W> { |
| /// Crates new StdoutBuffer and starts the timer on log buffering. |
| /// `duration`: Buffers log for this duration or till done() is called. |
| /// `sender`: Channel to send logs on. |
| /// `max_capacity`: Flush log if buffer size exceeds this value. This will not cancel the timer |
| /// and all the logs would be flushed once timer expires. |
| pub fn new(duration: std::time::Duration, writer: W, max_capacity: usize) -> Self { |
| let (inner, timer) = StdoutBufferInner::new(duration, writer, max_capacity); |
| Self { inner, _timer: timer } |
| } |
| } |
| |
| impl<W: Write + Send + 'static> Write for StdoutBuffer<W> { |
| fn flush(&mut self) -> Result<(), std::io::Error> { |
| self.inner.lock().flush() |
| } |
| |
| fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> { |
| self.inner.lock().write(bytes) |
| } |
| } |
| |
| struct StdoutBufferInner<W: Write + Send + 'static> { |
| writer: W, |
| /// Whether to buffer logs or not. |
| buffer: Option<Vec<u8>>, |
| stop_buffer_error: Option<std::io::Error>, |
| max_capacity: usize, |
| } |
| |
| impl<W: Write + Send + 'static> StdoutBufferInner<W> { |
| fn new( |
| duration: std::time::Duration, |
| writer: W, |
| max_capacity: usize, |
| ) -> (Arc<Mutex<Self>>, fuchsia_async::Task<()>) { |
| let new_self = Arc::new(Mutex::new(StdoutBufferInner { |
| writer, |
| buffer: Some(Vec::with_capacity(max_capacity)), |
| stop_buffer_error: None, |
| max_capacity, |
| })); |
| |
| let timer = fuchsia_async::Timer::new(duration); |
| let log_buffer = Arc::downgrade(&new_self); |
| let f = async move { |
| timer.await; |
| if let Some(log_buffer) = log_buffer.upgrade() { |
| log_buffer.lock().stop_buffering(); |
| } |
| }; |
| |
| (new_self, fuchsia_async::Task::spawn(f)) |
| } |
| |
| fn stop_buffering(&mut self) { |
| if let Some(buf) = self.buffer.take() { |
| if let Err(e) = self.writer.write_all(&buf) { |
| self.stop_buffer_error = Some(e); |
| } |
| } |
| } |
| } |
| |
| impl<W: Write + Send + 'static> Write for StdoutBufferInner<W> { |
| fn flush(&mut self) -> Result<(), std::io::Error> { |
| self.stop_buffering(); |
| match self.stop_buffer_error.take() { |
| Some(e) => Err(e), |
| None => self.writer.flush(), |
| } |
| } |
| |
| fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> { |
| if let Some(e) = self.stop_buffer_error.take() { |
| return Err(e); |
| } |
| match self.buffer.as_mut() { |
| None => self.writer.write(bytes), |
| Some(buf) if buf.len() + bytes.len() > self.max_capacity => { |
| self.writer.write_all(&buf)?; |
| buf.truncate(0); |
| self.writer.write(bytes) |
| } |
| Some(buf) => Write::write(buf, bytes), |
| } |
| } |
| } |
| |
| impl<W: Write + Send + 'static> Drop for StdoutBufferInner<W> { |
| fn drop(&mut self) { |
| let _ = self.flush(); |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use fidl::HandleBased; |
| use futures::StreamExt; |
| use pretty_assertions::assert_eq; |
| |
| async fn collect_until_eq<S: Stream<Item = String> + Unpin>(mut stream: S, target: &str) { |
| let mut accumulator = "".to_string(); |
| while accumulator.len() < target.len() { |
| match stream.next().await { |
| Some(string) => accumulator.push_str(&string), |
| None => panic!( |
| "Expected string '{}' but stream terminated after '{}'", |
| target, accumulator |
| ), |
| } |
| } |
| assert_eq!(target, accumulator); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn collect_test_stdout() { |
| let (sock_server, sock_client) = fidl::Socket::create_stream(); |
| |
| let (sender, mut recv) = mpsc::channel(1); |
| |
| let fut = |
| fuchsia_async::Task::spawn(collect_and_send_string_output(sock_client, sender.into())); |
| |
| sock_server.write(b"test message 1").expect("Can't write msg to socket"); |
| sock_server.write(b"test message 2").expect("Can't write msg to socket"); |
| sock_server.write(b"test message 3").expect("Can't write msg to socket"); |
| |
| collect_until_eq(&mut recv, "test message 1test message 2test message 3").await; |
| |
| // can receive messages multiple times |
| sock_server.write(b"test message 4").expect("Can't write msg to socket"); |
| collect_until_eq(&mut recv, "test message 4").await; |
| |
| // messages can be read after socket server is closed. |
| sock_server.write(b"test message 5").expect("Can't write msg to socket"); |
| sock_server.into_handle(); // this will drop this handle and close it. |
| fut.await.expect("log collection should not fail"); |
| collect_until_eq(&mut recv, "test message 5").await; |
| |
| // socket was closed, this should return None |
| let msg = recv.next().await; |
| assert_eq!(msg, None); |
| } |
| |
| /// Host side executor doesn't have a fake timer, so these tests only run on device for now. |
| #[cfg(target_os = "fuchsia")] |
| mod stdout { |
| use super::*; |
| use fuchsia_async::TestExecutor; |
| use fuchsia_zircon::DurationNum; |
| use pretty_assertions::assert_eq; |
| use std::ops::Add; |
| |
| struct MutexBytes(Arc<Mutex<Vec<u8>>>); |
| |
| impl Write for MutexBytes { |
| fn flush(&mut self) -> Result<(), std::io::Error> { |
| Write::flush(&mut *self.0.lock()) |
| } |
| |
| fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> { |
| Write::write(&mut *self.0.lock(), bytes) |
| } |
| } |
| |
| #[test] |
| fn log_buffer_without_timeout() { |
| let mut executor = TestExecutor::new_with_fake_time(); |
| let output = Arc::new(Mutex::new(vec![])); |
| let writer = MutexBytes(output.clone()); |
| let (log_buffer, mut timeout_task) = |
| StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100); |
| |
| write!(log_buffer.lock(), "message1").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| write!(log_buffer.lock(), "message2").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| |
| assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending); |
| assert_eq!(*output.lock(), b""); |
| |
| log_buffer.lock().flush().expect("flush buffer"); |
| assert_eq!(*output.lock(), b"message1message2"); |
| } |
| |
| #[test] |
| fn log_buffer_flush_on_drop() { |
| let mut executor = TestExecutor::new_with_fake_time(); |
| let output = Arc::new(Mutex::new(vec![])); |
| let writer = MutexBytes(output.clone()); |
| let (log_buffer, mut timeout_task) = |
| StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100); |
| |
| write!(log_buffer.lock(), "message1").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| write!(log_buffer.lock(), "message2").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| |
| assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending); |
| assert_eq!(*output.lock(), b""); |
| |
| drop(log_buffer); |
| assert_eq!(*output.lock(), b"message1message2"); |
| } |
| |
| #[test] |
| fn log_buffer_with_timeout() { |
| let mut executor = TestExecutor::new_with_fake_time(); |
| let output = Arc::new(Mutex::new(vec![])); |
| let writer = MutexBytes(output.clone()); |
| let (log_buffer, mut timeout_task) = |
| StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100); |
| |
| write!(log_buffer.lock(), "message1").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| write!(log_buffer.lock(), "message2").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| |
| assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending); |
| assert_eq!(*output.lock(), b""); |
| |
| executor.set_fake_time(executor.now().add(6.seconds())); |
| executor.wake_next_timer(); |
| assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Ready(())); |
| assert_eq!(*output.lock(), b"message1message2"); |
| } |
| |
| #[test] |
| fn log_buffer_capacity_reached() { |
| let _executor = TestExecutor::new_with_fake_time(); |
| let output = Arc::new(Mutex::new(vec![])); |
| let writer = MutexBytes(output.clone()); |
| let (log_buffer, _timeout_task) = |
| StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 10); |
| |
| write!(log_buffer.lock(), "message1").expect("write message"); |
| assert_eq!(*output.lock(), b""); |
| write!(log_buffer.lock(), "message2").expect("write message"); |
| assert_eq!(*output.lock(), b"message1message2"); |
| |
| // capacity was reached but buffering is still on, so next msg should buffer |
| write!(log_buffer.lock(), "message1").expect("write message"); |
| assert_eq!(*output.lock(), b"message1message2"); |
| write!(log_buffer.lock(), "message2").expect("write message"); |
| assert_eq!(*output.lock(), b"message1message2message1message2"); |
| } |
| } |
| } |