blob: 1d088b39c6c62770449da27d6a461916b28cbe10 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Helpers for capturing logs from Fuchsia processes.
use {
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::{future, AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _},
std::num::NonZeroUsize,
thiserror::Error,
zx::HandleBased as _,
};
/// Buffer size for socket read calls to `LoggerStream::buffer_and_drain`.
const SOCKET_BUFFER_SIZE: usize = 2048;
const NEWLINE: u8 = b'\n';
/// Error returned by this library.
#[derive(Debug, PartialEq, Eq, Error, Clone)]
pub enum LoggerError {
#[error("cannot create socket: {:?}", _0)]
CreateSocket(zx::Status),
#[error("cannot duplicate socket: {:?}", _0)]
DuplicateSocket(zx::Status),
#[error("invalid socket: {:?}", _0)]
InvalidSocket(zx::Status),
}
/// Error returned from draining LoggerStream or writing to LogWriter.
#[derive(Debug, Error)]
pub enum LogError {
/// Error encountered when draining LoggerStream.
#[error("can't get logs: {:?}", _0)]
Read(std::io::Error),
/// Error encountered when writing to LogWriter.
#[error("can't write logs: {:?}", _0)]
Write(std::io::Error),
}
/// Creates a combined socket handle for stdout and stderr and hooks them to same socket.
/// It also wraps the socket into stream and returns it back.
pub fn create_std_combined_log_stream(
) -> Result<(LoggerStream, zx::Handle, zx::Handle), LoggerError> {
let (client, log) =
zx::Socket::create(zx::SocketOpts::STREAM).map_err(LoggerError::CreateSocket)?;
let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
let clone =
log.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(LoggerError::DuplicateSocket)?;
Ok((stream, log.into_handle(), clone.into_handle()))
}
/// Creates a socket handle for stdout/stderr and hooks it to a file handle.
/// It also wraps the socket into stream and returns it back.
pub fn create_log_stream() -> Result<(LoggerStream, zx::Handle), LoggerError> {
let (client, log) =
zx::Socket::create(zx::SocketOpts::STREAM).map_err(LoggerError::CreateSocket)?;
let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
Ok((stream, log.into_handle()))
}
/// Collects logs in background and gives a way to collect those logs.
pub struct LogStreamReader {
fut: future::RemoteHandle<Result<Vec<u8>, LogError>>,
}
impl LogStreamReader {
pub fn new(logger: LoggerStream) -> Self {
let (logger_handle, logger_fut) = logger.read_to_end().remote_handle();
fasync::Task::spawn(logger_handle).detach();
Self { fut: logger_fut }
}
/// Retrieve all logs.
pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
self.fut.await
}
}
/// A stream bound to a socket where a source stream is captured.
/// For example, stdout and stderr streams can be redirected to the contained
/// socket and captured.
pub struct LoggerStream {
socket: fasync::Socket,
}
impl Unpin for LoggerStream {}
impl LoggerStream {
/// Create a LoggerStream from the provided zx::Socket. The `socket` object
/// should be bound to its intented source stream (e.g. "stdout").
pub fn new(socket: zx::Socket) -> Result<LoggerStream, zx::Status> {
let l = LoggerStream { socket: fasync::Socket::from_socket(socket)? };
Ok(l)
}
/// Reads all bytes from socket.
pub async fn read_to_end(mut self) -> Result<Vec<u8>, LogError> {
let mut buffer: Vec<u8> = Vec::new();
let _bytes_read = self.socket.read_to_end(&mut buffer).await.map_err(LogError::Read)?;
Ok(buffer)
}
/// Drain the `stream` and write all of its contents to `writer`. Bytes are
/// delimited by newline and each line will be passed to `writer.write`.
pub async fn buffer_and_drain(mut self, writer: &mut SocketLogWriter) -> Result<(), LogError> {
let mut message_buffer: Vec<u8> = Vec::new();
let mut socket_buffer: Vec<u8> = vec![0; SOCKET_BUFFER_SIZE];
while let Some(bytes_read) = NonZeroUsize::new(
self.socket.read(&mut socket_buffer[..]).await.map_err(LogError::Read)?,
) {
let bytes_read = bytes_read.get();
message_buffer.extend(&socket_buffer[..bytes_read]);
if let Some(last_newline_pos) = message_buffer.iter().rposition(|&x| x == NEWLINE) {
let () = writer.write(message_buffer.drain(..=last_newline_pos).as_slice()).await?;
}
while message_buffer.len() >= SOCKET_BUFFER_SIZE {
let () = writer.write(message_buffer.drain(..).as_slice()).await?;
}
}
if !message_buffer.is_empty() {
let () = writer.write(&message_buffer[..]).await?;
}
Ok(())
}
/// Take the underlying socket of this object.
pub fn take_socket(self) -> fasync::Socket {
self.socket
}
}
/// Utility struct to write to socket asynchrously.
pub struct SocketLogWriter {
logger: fasync::Socket,
}
impl SocketLogWriter {
pub fn new(logger: fasync::Socket) -> Self {
Self { logger }
}
pub async fn write_str(&mut self, s: &str) -> Result<(), LogError> {
self.write(s.as_bytes()).await
}
pub async fn write(&mut self, bytes: &[u8]) -> Result<(), LogError> {
self.logger.write_all(bytes).await.map_err(LogError::Write)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
anyhow::{format_err, Context as _, Error},
assert_matches::assert_matches,
futures::{try_join, TryStreamExt as _},
rand::{
distributions::{Alphanumeric, DistString as _},
thread_rng,
},
std::mem::drop,
test_case::test_case,
};
#[fuchsia_async::run_singlethreaded(test)]
async fn log_writer_reader_work() {
let (sock1, sock2) = zx::Socket::create(zx::SocketOpts::STREAM).unwrap();
let mut log_writer = SocketLogWriter::new(fasync::Socket::from_socket(sock1).unwrap());
let reader = LoggerStream::new(sock2).unwrap();
let reader = LogStreamReader::new(reader);
log_writer.write_str("this is string one.").await.unwrap();
log_writer.write_str("this is string two.").await.unwrap();
drop(log_writer);
let actual = reader.get_logs().await.unwrap();
let actual = std::str::from_utf8(&actual).unwrap();
assert_eq!(actual, "this is string one.this is string two.".to_owned());
}
#[test_case(String::from("Hello World!") ; "consumes_simple_msg")]
#[test_case(get_random_string(10000) ; "consumes_large_msg")]
#[fasync::run_singlethreaded(test)]
async fn logger_stream_read_to_end(msg: String) -> Result<(), Error> {
let (stream, tx) = create_logger_stream()?;
let () = take_and_write_to_socket(tx, &msg)?;
let result = stream.read_to_end().await.context("Failed to read from socket")?;
let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
assert_eq!(actual, msg);
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn logger_stream_read_to_end_consumes_concat_msgs() -> Result<(), Error> {
let (stream, tx) = create_logger_stream()?;
let msgs =
vec!["Hello World!".to_owned(), "Hola Mundo!".to_owned(), "你好,世界!".to_owned()];
for msg in msgs.iter() {
let () = write_to_socket(&tx, &msg)?;
}
std::mem::drop(tx);
let result = stream.read_to_end().await.context("Failed to read from socket")?;
let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
assert_eq!(actual, msgs.join(""));
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn buffer_and_drain_reads_message_until_last_newline() -> Result<(), Error> {
let (stream, tx) = create_logger_stream()?;
let (mut logger, rx) = create_datagram_logger()?;
let msg = "Hello World\nHola Mundo!\n你好,世界!";
let () = take_and_write_to_socket(tx, msg)?;
let (actual, ()) = try_join!(read_all_messages(rx), async move {
stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream")
},)?;
assert_eq!(
actual,
vec![String::from("Hello World\nHola Mundo!\n"), String::from("你好,世界!")],
);
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn buffer_and_drain_dumps_full_buffer_if_no_newline_seen() -> Result<(), Error> {
let (stream, tx) = create_logger_stream()?;
let (mut logger, rx) = create_datagram_logger()?;
let ((), ()) = try_join!(
async move {
let msg = get_random_string(SOCKET_BUFFER_SIZE);
// First write up to (SOCKET_BUFFER_SIZE - 1) so that we can
// assert that buffer isn't drained prematurely.
let () = write_to_socket(&tx, &msg[..SOCKET_BUFFER_SIZE - 1])?;
// Temporarily convert fasync::Socket back to zx::Socket so that
// we can use non-blocking `read` call.
let rx = rx.into_zx_socket();
let mut buffer = vec![0u8; SOCKET_BUFFER_SIZE];
let maybe_bytes_read = rx.read(&mut buffer);
assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));
// Write last byte and convert zx::Socket back to fasync::Socket.
let () = write_to_socket(&tx, &msg[SOCKET_BUFFER_SIZE - 1..SOCKET_BUFFER_SIZE])?;
let mut rx = fasync::Socket::from_socket(rx)
.context("Failed to convert to fasync::Socket")?;
let bytes_read =
rx.read(&mut buffer).await.context("Failed to read from socket")?;
let msg_written = std::str::from_utf8(&buffer).context("Failed to parse bytes")?;
assert_eq!(bytes_read, SOCKET_BUFFER_SIZE);
assert_eq!(msg_written, msg);
Ok(())
},
async move { stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream") },
)?;
Ok(())
}
#[fasync::run_singlethreaded(test)]
async fn buffer_and_drain_return_error_if_stream_polls_err() -> Result<(), Error> {
let (tx, rx) =
zx::Socket::create(zx::SocketOpts::STREAM).context("Failed to create socket")?;
// A closed socket should yield an error when stream is polled.
let () = rx.half_close()?;
let () = tx.half_close()?;
let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
let (mut logger, _rx) = create_datagram_logger()?;
let result = stream.buffer_and_drain(&mut logger).await;
assert_matches!(result, Err(LogError::Read(_)));
Ok(())
}
async fn read_all_messages(socket: fasync::Socket) -> Result<Vec<String>, Error> {
let mut results = Vec::new();
let mut stream = socket.into_datagram_stream();
while let Some(bytes) = stream.try_next().await.context("Failed to read socket stream")? {
results.push(
std::str::from_utf8(&bytes).context("Failed to parse bytes into utf8")?.to_owned(),
);
}
Ok(results)
}
fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
write_to_socket(&socket, &message)
}
fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
let bytes_written =
socket.write(message.as_bytes()).context("Failed to write to socket")?;
match bytes_written == message.len() {
true => Ok(()),
false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
}
}
fn create_datagram_logger() -> Result<(SocketLogWriter, fasync::Socket), Error> {
let (tx, rx) =
zx::Socket::create(zx::SocketOpts::DATAGRAM).context("Failed to create zx::Socket")?;
let logger = SocketLogWriter::new(
fasync::Socket::from_socket(tx).context("Failed to create fasync::Socket")?,
);
let rx = fasync::Socket::from_socket(rx).context("Failed to create fasync::Socket")?;
Ok((logger, rx))
}
fn create_logger_stream() -> Result<(LoggerStream, zx::Socket), Error> {
let (tx, rx) =
zx::Socket::create(zx::SocketOpts::STREAM).context("Failed to create socket")?;
let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
Ok((stream, tx))
}
fn get_random_string(size: usize) -> String {
Alphanumeric.sample_string(&mut thread_rng(), size)
}
}