blob: 8285bb70aada9f92358a46a0a29476ebdd9c9359 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Helpers for capturing logs from Fuchsia processes.
use {
crate::errors::FdioError,
fdio::fdio_sys,
fuchsia_async as fasync, fuchsia_zircon as zx,
futures::{
io::{self, AsyncRead},
prelude::*,
ready,
task::{Context, Poll},
},
std::{cell::RefCell, pin::Pin},
thiserror::Error,
zx::HandleBased,
};
/// Error returned by this library.
#[derive(Debug, PartialEq, Eq, Error, Clone)]
pub enum LoggerError {
#[error("fdio error: {:?}", _0)]
Fdio(#[from] FdioError),
#[error("cannot create socket: {:?}", _0)]
CreateSocket(zx::Status),
#[error("invalid socket: {:?}", _0)]
InvalidSocket(zx::Status),
}
/// Logger stream to read logs from a socket
#[must_use = "futures/streams"]
pub struct LoggerStream {
socket: fasync::Socket,
}
impl Unpin for LoggerStream {}
thread_local! {
pub static BUFFER:
RefCell<[u8; 4096]> = RefCell::new([0; 4096]);
}
impl LoggerStream {
/// Creates a new `LoggerStream` for given `socket`.
pub fn new(socket: zx::Socket) -> Result<LoggerStream, LoggerError> {
let l = LoggerStream {
socket: fasync::Socket::from_socket(socket).map_err(LoggerError::InvalidSocket)?,
};
Ok(l)
}
}
fn process_log_bytes(bytes: &[u8]) -> Vec<u8> {
bytes.to_vec()
}
impl Stream for LoggerStream {
type Item = io::Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
BUFFER.with(|b| {
let mut b = b.borrow_mut();
let len = ready!(Pin::new(&mut self.socket).poll_read(cx, &mut *b)?);
if len == 0 {
return Poll::Ready(None);
}
Poll::Ready(Some(process_log_bytes(&b[0..len])).map(Ok))
})
}
}
/// Creates socket handle for stdout and stderr and hooks them to same socket.
/// It also wraps the socket into stream and returns it back.
pub fn create_log_stream() -> Result<(LoggerStream, zx::Handle, zx::Handle), LoggerError> {
let (client, log) =
zx::Socket::create(zx::SocketOpts::STREAM).map_err(LoggerError::CreateSocket)?;
let mut stdout_file_handle = zx::sys::ZX_HANDLE_INVALID;
let mut stderr_file_handle = zx::sys::ZX_HANDLE_INVALID;
unsafe {
let mut std_fd: i32 = -1;
let mut status = fdio::fdio_sys::fdio_fd_create(log.into_raw(), &mut std_fd);
if let Err(s) = zx::Status::ok(status) {
return Err(LoggerError::Fdio(FdioError::Create(s)));
}
status =
fdio_sys::fdio_fd_clone(std_fd, &mut stderr_file_handle as *mut zx::sys::zx_handle_t);
if let Err(s) = zx::Status::ok(status) {
return Err(LoggerError::Fdio(FdioError::Clone(s)));
}
status = fdio_sys::fdio_fd_transfer(
std_fd,
&mut stdout_file_handle as *mut zx::sys::zx_handle_t,
);
if let Err(s) = zx::Status::ok(status) {
return Err(LoggerError::Fdio(FdioError::Transfer(s)));
}
Ok((
LoggerStream::new(client)?,
zx::Handle::from_raw(stdout_file_handle),
zx::Handle::from_raw(stderr_file_handle),
))
}
}
/// Buffer `stdlogger` by newline and write to `log_writer`.
pub async fn buffer_and_drain_logger(
mut stdlogger: LoggerStream,
log_writer: &mut LogWriter,
) -> Result<(), LogError> {
let mut buf: Vec<u8> = vec![];
let newline = b'\n';
while let Some(bytes) = stdlogger.try_next().await.map_err(LogError::Read)? {
if bytes.is_empty() {
continue;
}
// buffer by newline, find last newline and send message till then,
// store rest in buffer.
buf.extend(bytes);
if let Some(i) = buf.iter().rposition(|&x| x == newline) {
log_writer.write(buf.drain(0..=i).as_slice()).await?;
}
}
if buf.len() > 0 {
// Flush remainder of buffer in case the last message isn't terminated with a newline.
log_writer.write(&buf).await?;
}
Ok(())
}
/// Error while reading/writing log using socket
#[derive(Debug, Error)]
pub enum LogError {
#[error("can't get logs: {:?}", _0)]
Read(std::io::Error),
#[error("can't write logs: {:?}", _0)]
Write(std::io::Error),
}
/// Collects logs in background and gives a way to collect those logs.
pub struct LogStreamReader {
fut: future::RemoteHandle<Result<Vec<u8>, std::io::Error>>,
}
impl LogStreamReader {
pub fn new(logger: LoggerStream) -> Self {
let (logger_handle, logger_fut) = logger.try_concat().remote_handle();
fasync::Task::local(logger_handle).detach();
Self { fut: logger_fut }
}
/// Retrive all logs.
pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
self.fut.await.map_err(LogError::Read)
}
}
/// Utility struct to write to socket asynchrously.
pub struct LogWriter {
logger: fasync::Socket,
}
impl LogWriter {
pub fn new(logger: fasync::Socket) -> Self {
Self { logger }
}
pub async fn write_str(&mut self, s: &str) -> Result<usize, LogError> {
self.write(s.as_bytes()).await
}
pub async fn write(&mut self, bytes: &[u8]) -> Result<usize, LogError> {
self.logger.write(bytes).await.map_err(LogError::Write)
}
}
#[cfg(test)]
mod tests {
use {super::*, fuchsia_zircon as zx, std::mem::drop};
#[fuchsia_async::run_singlethreaded(test)]
async fn log_writer_reader_work() {
let (sock1, sock2) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
let mut log_writer = LogWriter::new(fasync::Socket::from_socket(sock1).unwrap());
let reader = LoggerStream::new(sock2).unwrap();
let reader = LogStreamReader::new(reader);
log_writer.write_str("this is string one.").await.unwrap();
log_writer.write_str("this is string two.").await.unwrap();
drop(log_writer);
let actual = reader.get_logs().await.unwrap();
let actual = std::str::from_utf8(&actual).unwrap();
assert_eq!(actual, "this is string one.this is string two.".to_owned());
}
}