blob: 289fb8c4ce3a37ee741e3bb9d4f7aff247c1a2f7 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::trace::duration;
use parking_lot::Mutex;
use std::{
io::{Error, Write},
sync::Arc,
};
/// A handle around an inner writer. This serves as a "multiplexing" writer that
/// writes bytes from multiple sources into a single serial destination, typically
/// to stdout.
/// Output sent to a handle is buffered until a newline is encountered, then the
/// buffered output is written to the inner writer.
/// The handle also supports prepending a prefix to the start of each buffer. This
/// helps preserve existing behavior where prefixes are added to the start of stdout
/// and log lines to help a developer understand what produced some output.
pub(super) struct ShellWriterHandle<W: 'static + Write + Send + Sync> {
inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
buffer: Vec<u8>,
/// Prefix, if any, to prepend to output before writing to the inner writer.
prefix: Option<Vec<u8>>,
handle_id: u32,
}
impl<W: 'static + Write + Send + Sync> ShellWriterHandle<W> {
const NEWLINE_BYTE: u8 = b'\n';
const BUFFER_CAPACITY: usize = 1024;
/// Create a new handle to a wrapped writer.
pub(super) fn new_handle(
inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
prefix: Option<String>,
) -> Self {
let mut lock = inner.lock();
let handle_id = lock.num_handles;
lock.num_handles += 1;
drop(lock);
Self {
inner,
buffer: Vec::with_capacity(Self::BUFFER_CAPACITY),
prefix: prefix.map(String::into_bytes),
handle_id,
}
}
/// Write a full line to the inner writer.
fn write_bufs(writer: &mut W, bufs: &[&[u8]]) -> Result<(), Error> {
for buf in bufs {
writer.write_all(buf)?;
}
Ok(())
}
}
/// Inner mutable state for |ShellWriterHandle|.
pub(super) struct ShellWriterHandleInner<W: 'static + Write + Send + Sync> {
/// The writer to which all content is passed.
writer: W,
/// The id of the last handle that wrote to the writer, used to conditionally
/// output a prefix only when the handle writing to the output changes.
last_writer_id: Option<u32>,
/// The number of handles that have been created. Used to assign ids to handles.
num_handles: u32,
}
impl<W: 'static + Write + Send + Sync> ShellWriterHandleInner<W> {
pub(super) fn new(writer: W) -> Self {
Self { writer, last_writer_id: None, num_handles: 0 }
}
}
/// A handle to a writer contained in a |ShellWriterHandle|. This is exposed for testing
/// purposes.
pub struct ShellWriterView<W: 'static + Write + Send + Sync>(Arc<Mutex<ShellWriterHandleInner<W>>>);
impl<W: 'static + Write + Send + Sync> ShellWriterView<W> {
pub(super) fn new(inner: Arc<Mutex<ShellWriterHandleInner<W>>>) -> Self {
Self(inner)
}
pub fn lock(&self) -> parking_lot::MappedMutexGuard<'_, W> {
parking_lot::MutexGuard::map(self.0.lock(), |handle_inner| &mut handle_inner.writer)
}
}
impl<W: 'static + Write + Send + Sync> Write for ShellWriterHandle<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
duration!("shell_write");
// find the last newline in the buffer. In case multiple lines are written as once,
// we should write once to the inner writer and add our prefix only once. This helps
// avoid spamming the output with prefixes in case many lines are present.
let newline_pos = buf
.iter()
.rev()
.position(|byte| *byte == Self::NEWLINE_BYTE)
.map(|pos_from_end| buf.len() - pos_from_end - 1);
// In case we'd exceed the buffer, just wrte everything, but append a newline to avoid
// interspersing.
let (final_byte_pos, append_newline) = match newline_pos {
// no newline, pushing all to buffer would exceed capacity
None if self.buffer.len() + buf.len() > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
None => {
self.buffer.extend_from_slice(buf);
return Ok(buf.len());
}
// newline exists, but the rest of buf would exceed capacity.
Some(pos) if buf.len() - pos > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
Some(pos) => (pos, false),
};
let mut inner = self.inner.lock();
let last_writer_id = inner.last_writer_id.replace(self.handle_id);
let mut bufs_to_write = vec![];
if let Some(prefix) = self.prefix.as_ref() {
if last_writer_id != Some(self.handle_id) {
bufs_to_write.push(prefix.as_slice());
}
}
if !self.buffer.is_empty() {
bufs_to_write.push(self.buffer.as_slice());
}
bufs_to_write.push(&buf[..final_byte_pos + 1]);
if append_newline {
bufs_to_write.push(&[Self::NEWLINE_BYTE]);
}
Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
self.buffer.clear();
self.buffer.extend_from_slice(&buf[final_byte_pos + 1..]);
Ok(buf.len())
}
fn flush(&mut self) -> Result<(), Error> {
let mut inner = self.inner.lock();
let last_writer_id = inner.last_writer_id.replace(self.handle_id);
if !self.buffer.is_empty() {
self.buffer.push(Self::NEWLINE_BYTE);
let mut bufs_to_write = vec![];
if let Some(prefix) = self.prefix.as_ref() {
if last_writer_id != Some(self.handle_id) {
bufs_to_write.push(prefix.as_slice());
}
}
bufs_to_write.push(self.buffer.as_slice());
Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
self.buffer.clear();
}
inner.writer.flush()
}
}
impl<W: 'static + Write + Send + Sync> std::ops::Drop for ShellWriterHandle<W> {
fn drop(&mut self) {
let _ = self.flush();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
fn create_writer_inner_and_view(
) -> (Arc<Mutex<ShellWriterHandleInner<Vec<u8>>>>, ShellWriterView<Vec<u8>>) {
let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(vec![])));
(inner.clone(), ShellWriterView(inner))
}
#[fuchsia::test]
fn single_handle() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
assert!(output.lock().is_empty());
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
assert_eq!(output.lock().as_slice(), b"hello world\n");
assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
write_handle.flush().unwrap();
assert_eq!(output.lock().as_slice(), b"hello world\nflushed output\n");
}
#[fuchsia::test]
fn single_handle_with_prefix() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut write_handle =
ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
assert!(output.lock().is_empty());
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
assert_eq!(output.lock().as_slice(), b"[prefix] hello world\n");
assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
write_handle.flush().unwrap();
assert_eq!(output.lock().as_slice(), b"[prefix] hello world\nflushed output\n");
}
#[fuchsia::test]
fn single_handle_multiple_line() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
assert_eq!(output.lock().as_slice(), b"This is a \nmultiline output \n");
write_handle.flush().unwrap();
assert_eq!(
output.lock().as_slice(),
b"This is a \nmultiline output \nwithout newline termination\n"
);
output.lock().clear();
const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
assert_eq!(output.lock().as_slice(), b"This is \nnewline terminated \noutput\n");
}
#[fuchsia::test]
fn single_handle_exceed_buffer_in_single_write() {
const CAPACITY: usize = ShellWriterHandle::<Vec<u8>>::BUFFER_CAPACITY;
// each case consists of a sequence of pairs, where each pair is a string to write, and
// the expected output after writing the string.
let cases = vec![
(
"exceed in one write",
vec![("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 1)))],
),
(
"exceed on second write",
vec![
("a".to_string(), "".to_string()),
("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 2))),
],
),
(
"exceed in one write, with newline",
vec![(
format!("\n{}", "a".repeat(CAPACITY + 1)),
format!("\n{}\n", "a".repeat(CAPACITY + 1)),
)],
),
(
"exceed in two writes, with newline",
vec![
("a".to_string(), "".to_string()),
(
format!("\n{}", "a".repeat(CAPACITY + 1)),
format!("a\n{}\n", "a".repeat(CAPACITY + 1)),
),
],
),
];
for (case_name, writes) in cases.into_iter() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
for (write_no, (to_write, expected)) in writes.into_iter().enumerate() {
assert_eq!(
write_handle.write(to_write.as_bytes()).unwrap(),
to_write.as_bytes().len(),
"Got wrong number of bytes written for write {:?} in case {}",
write_no,
case_name
);
assert_eq!(
String::from_utf8(output.lock().clone()).unwrap(),
expected,
"Buffer contains unexpected contents after write {:?} in case {}",
write_no,
case_name
)
}
}
}
#[fuchsia::test]
fn single_handle_with_prefix_multiple_line() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut write_handle =
ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
// Note we 'chunk' output in each write to avoid spamming the prefix, so the second
// line won't contain the prefix.
assert_eq!(output.lock().as_slice(), b"[prefix] This is a \nmultiline output \n");
write_handle.flush().unwrap();
assert_eq!(
output.lock().as_slice(),
"[prefix] This is a \nmultiline output \nwithout newline termination\n".as_bytes()
);
const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
assert_eq!(
output.lock().as_slice(),
b"[prefix] This is a \nmultiline output \n\
without newline termination\nThis is \nnewline terminated \noutput\n"
);
}
#[fuchsia::test]
fn multiple_handles() {
let (handle_inner, output) = create_writer_inner_and_view();
let mut handle_1 =
ShellWriterHandle::new_handle(handle_inner.clone(), Some("[1] ".to_string()));
let mut handle_2 = ShellWriterHandle::new_handle(handle_inner, Some("[2] ".to_string()));
write!(handle_1, "hi from 1").unwrap();
write!(handle_2, "hi from 2").unwrap();
assert!(output.lock().is_empty());
write!(handle_1, "\n").unwrap();
assert_eq!(output.lock().as_slice(), "[1] hi from 1\n".as_bytes());
write!(handle_2, "\n").unwrap();
assert_eq!(output.lock().as_slice(), "[1] hi from 1\n[2] hi from 2\n".as_bytes());
}
// The following tests verify behavior of the shell writer when the inner writer
// exhibits some allowed edge cases.
#[fuchsia::test]
fn output_written_when_inner_writer_writes_partial_buffer() {
/// A writer that writes at most 3 bytes at a time.
struct PartialOutputWriter(Vec<u8>);
impl Write for PartialOutputWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
if buf.len() >= 3 {
self.0.write(&buf[..3])
} else {
self.0.write(buf)
}
}
fn flush(&mut self) -> Result<(), Error> {
self.0.flush()
}
}
let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(PartialOutputWriter(vec![]))));
let output = ShellWriterView(inner.clone());
let mut write_handle =
ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
assert!(output.lock().0.is_empty());
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
let mut write_handle_2 =
ShellWriterHandle::new_handle(inner, Some("[prefix2] ".to_string()));
assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n");
assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
}
#[fuchsia::test]
fn output_written_when_inner_writer_returns_interrupted() {
/// A writer that returns interrupted on the first write attempt
struct InterruptWriter {
buf: Vec<u8>,
returned_interrupt: bool,
}
impl Write for InterruptWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
if !self.returned_interrupt {
self.returned_interrupt = true;
Err(ErrorKind::Interrupted.into())
} else {
self.buf.write(buf)
}
}
fn flush(&mut self) -> Result<(), Error> {
self.buf.flush()
}
}
let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(InterruptWriter {
buf: vec![],
returned_interrupt: false,
})));
let output = ShellWriterView(inner.clone());
let mut write_handle =
ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
assert!(output.lock().buf.is_empty());
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
let mut write_handle_2 =
ShellWriterHandle::new_handle(inner.clone(), Some("[prefix2] ".to_string()));
assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n");
assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
}
}