[run-test-suite] Move shell writer to new file

Bug: 98222

Change-Id: Iad4e5d4e52a8b1ea0da57a0f95bff28267c4abeb
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/693830
Fuchsia-Auto-Submit: Satsuki Ueno <satsukiu@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: Ankur Mittal <anmittal@google.com>
diff --git a/src/sys/run_test_suite/BUILD.gn b/src/sys/run_test_suite/BUILD.gn
index 8aaa742..e829765 100644
--- a/src/sys/run_test_suite/BUILD.gn
+++ b/src/sys/run_test_suite/BUILD.gn
@@ -74,7 +74,8 @@
     "src/output/mod.rs",
     "src/output/mux.rs",
     "src/output/noop.rs",
-    "src/output/shell.rs",
+    "src/output/shell/mod.rs",
+    "src/output/shell/writer.rs",
     "src/params.rs",
     "src/stream_util.rs",
     "src/trace.rs",
diff --git a/src/sys/run_test_suite/src/output/shell.rs b/src/sys/run_test_suite/src/output/shell/mod.rs
similarity index 61%
rename from src/sys/run_test_suite/src/output/shell.rs
rename to src/sys/run_test_suite/src/output/shell/mod.rs
index 8654e74..ae5bf8c 100644
--- a/src/sys/run_test_suite/src/output/shell.rs
+++ b/src/sys/run_test_suite/src/output/shell/mod.rs
@@ -6,7 +6,6 @@
     noop::NoopDirectoryWriter, ArtifactType, DirectoryArtifactType, DynArtifact,
     DynDirectoryArtifact, EntityId, EntityInfo, ReportedOutcome, Reporter, Timestamp,
 };
-use crate::trace::duration;
 use async_trait::async_trait;
 use fuchsia_async as fasync;
 use log::error;
@@ -18,154 +17,13 @@
     time::Duration,
 };
 
+mod writer;
+pub use writer::ShellWriterView;
+use writer::{ShellWriterHandle, ShellWriterHandleInner};
+
 /// Duration after which to emit an excessive duration log.
 const EXCESSIVE_DURATION: Duration = Duration::from_secs(60);
 
-/// A handle around an inner writer. This serves as a "multiplexing" writer that
-/// writes bytes from multiple sources into a single serial destination, typically
-/// to stdout.
-/// Output sent to a handle is buffered until a newline is encountered, then the
-/// buffered output is written to the inner writer.
-/// The handle also supports prepending a prefix to the start of each buffer. This
-/// helps preserve existing behavior where prefixes are added to the start of stdout
-/// and log lines to help a developer understand what produced some output.
-struct ShellWriterHandle<W: 'static + Write + Send + Sync> {
-    inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
-    buffer: Vec<u8>,
-    /// Prefix, if any, to prepend to output before writing to the inner writer.
-    prefix: Option<Vec<u8>>,
-    handle_id: u32,
-}
-
-impl<W: 'static + Write + Send + Sync> ShellWriterHandle<W> {
-    const NEWLINE_BYTE: u8 = b'\n';
-    const BUFFER_CAPACITY: usize = 1024;
-
-    /// Create a new handle to a wrapped writer.
-    fn new_handle(inner: Arc<Mutex<ShellWriterHandleInner<W>>>, prefix: Option<String>) -> Self {
-        let mut lock = inner.lock();
-        let handle_id = lock.num_handles;
-        lock.num_handles += 1;
-        drop(lock);
-        Self {
-            inner,
-            buffer: Vec::with_capacity(Self::BUFFER_CAPACITY),
-            prefix: prefix.map(String::into_bytes),
-            handle_id,
-        }
-    }
-
-    /// Write a full line to the inner writer.
-    fn write_bufs(writer: &mut W, bufs: &[&[u8]]) -> Result<(), Error> {
-        for buf in bufs {
-            writer.write_all(buf)?;
-        }
-        Ok(())
-    }
-}
-
-/// Inner mutable state for |ShellWriterHandle|.
-struct ShellWriterHandleInner<W: 'static + Write + Send + Sync> {
-    /// The writer to which all content is passed.
-    writer: W,
-    /// The id of the last handle that wrote to the writer, used to conditionally
-    /// output a prefix only when the handle writing to the output changes.
-    last_writer_id: Option<u32>,
-    /// The number of handles that have been created. Used to assign ids to handles.
-    num_handles: u32,
-}
-
-impl<W: 'static + Write + Send + Sync> ShellWriterHandleInner<W> {
-    fn new(writer: W) -> Self {
-        Self { writer, last_writer_id: None, num_handles: 0 }
-    }
-}
-
-/// A handle to a writer contained in a |ShellWriterHandle|. This is exposed for testing
-/// purposes.
-pub struct ShellWriterView<W: 'static + Write + Send + Sync>(Arc<Mutex<ShellWriterHandleInner<W>>>);
-
-impl<W: 'static + Write + Send + Sync> ShellWriterView<W> {
-    pub fn lock(&self) -> parking_lot::MappedMutexGuard<'_, W> {
-        parking_lot::MutexGuard::map(self.0.lock(), |handle_inner| &mut handle_inner.writer)
-    }
-}
-
-impl<W: 'static + Write + Send + Sync> Write for ShellWriterHandle<W> {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
-        duration!("shell_write");
-        // find the last newline in the buffer. In case multiple lines are written as once,
-        // we should write once to the inner writer and add our prefix only once. This helps
-        // avoid spamming the output with prefixes in case many lines are present.
-        let newline_pos = buf
-            .iter()
-            .rev()
-            .position(|byte| *byte == Self::NEWLINE_BYTE)
-            .map(|pos_from_end| buf.len() - pos_from_end - 1);
-        // In case we'd exceed the buffer, just wrte everything, but append a newline to avoid
-        // interspersing.
-        let (final_byte_pos, append_newline) = match newline_pos {
-            // no newline, pushing all to buffer would exceed capacity
-            None if self.buffer.len() + buf.len() > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
-            None => {
-                self.buffer.extend_from_slice(buf);
-                return Ok(buf.len());
-            }
-            // newline exists, but the rest of buf would exceed capacity.
-            Some(pos) if buf.len() - pos > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
-            Some(pos) => (pos, false),
-        };
-
-        let mut inner = self.inner.lock();
-        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
-
-        let mut bufs_to_write = vec![];
-        if let Some(prefix) = self.prefix.as_ref() {
-            if last_writer_id != Some(self.handle_id) {
-                bufs_to_write.push(prefix.as_slice());
-            }
-        }
-        if !self.buffer.is_empty() {
-            bufs_to_write.push(self.buffer.as_slice());
-        }
-        bufs_to_write.push(&buf[..final_byte_pos + 1]);
-        if append_newline {
-            bufs_to_write.push(&[Self::NEWLINE_BYTE]);
-        }
-
-        Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
-
-        self.buffer.clear();
-        self.buffer.extend_from_slice(&buf[final_byte_pos + 1..]);
-        Ok(buf.len())
-    }
-
-    fn flush(&mut self) -> Result<(), Error> {
-        let mut inner = self.inner.lock();
-        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
-        if !self.buffer.is_empty() {
-            self.buffer.push(Self::NEWLINE_BYTE);
-            let mut bufs_to_write = vec![];
-            if let Some(prefix) = self.prefix.as_ref() {
-                if last_writer_id != Some(self.handle_id) {
-                    bufs_to_write.push(prefix.as_slice());
-                }
-            }
-            bufs_to_write.push(self.buffer.as_slice());
-
-            Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
-            self.buffer.clear();
-        }
-        inner.writer.flush()
-    }
-}
-
-impl<W: 'static + Write + Send + Sync> std::ops::Drop for ShellWriterHandle<W> {
-    fn drop(&mut self) {
-        let _ = self.flush();
-    }
-}
-
 /// A reporter that outputs results and artifacts to a single stream, usually stdout.
 /// This reporter is intended to provide "live" updates to a developer watching while
 /// tests are executed.
@@ -220,7 +78,7 @@
                 entity_state_map: Mutex::new(entity_state_map),
                 completed_suites: AtomicU32::new(0),
             },
-            ShellWriterView(inner),
+            ShellWriterView::new(inner),
         )
     }
 }
@@ -434,7 +292,7 @@
             ArtifactType::RestrictedLog => {
                 // Restricted logs are saved for reporting when the entity completes.
                 let log_buffer = Arc::new(Mutex::new(ShellWriterHandleInner::new(vec![])));
-                entity.restricted_logs = Some(ShellWriterView(log_buffer.clone()));
+                entity.restricted_logs = Some(ShellWriterView::new(log_buffer.clone()));
                 Box::new(ShellWriterHandle::new_handle(log_buffer, None))
             }
         })
@@ -454,253 +312,10 @@
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::output::{CaseId, RunReporter, SuiteId};
-    use std::io::ErrorKind;
-
-    fn create_writer_inner_and_view(
-    ) -> (Arc<Mutex<ShellWriterHandleInner<Vec<u8>>>>, ShellWriterView<Vec<u8>>) {
-        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(vec![])));
-        (inner.clone(), ShellWriterView(inner))
-    }
-
-    #[fuchsia::test]
-    fn single_handle() {
-        let (handle_inner, output) = create_writer_inner_and_view();
-        let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
-
-        assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
-        assert!(output.lock().is_empty());
-
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
-        assert_eq!(output.lock().as_slice(), b"hello world\n");
-
-        assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
-        write_handle.flush().unwrap();
-        assert_eq!(output.lock().as_slice(), b"hello world\nflushed output\n");
-    }
-
-    #[fuchsia::test]
-    fn single_handle_with_prefix() {
-        let (handle_inner, output) = create_writer_inner_and_view();
-        let mut write_handle =
-            ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
-
-        assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
-        assert!(output.lock().is_empty());
-
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
-        assert_eq!(output.lock().as_slice(), b"[prefix] hello world\n");
-
-        assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
-        write_handle.flush().unwrap();
-        assert_eq!(output.lock().as_slice(), b"[prefix] hello world\nflushed output\n");
-    }
-
-    #[fuchsia::test]
-    fn single_handle_multiple_line() {
-        let (handle_inner, output) = create_writer_inner_and_view();
-        let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
-        const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
-        assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
-        assert_eq!(output.lock().as_slice(), b"This is a \nmultiline output \n");
-        write_handle.flush().unwrap();
-        assert_eq!(
-            output.lock().as_slice(),
-            b"This is a \nmultiline output \nwithout newline termination\n"
-        );
-        output.lock().clear();
-
-        const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
-        assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
-        assert_eq!(output.lock().as_slice(), b"This is \nnewline terminated \noutput\n");
-    }
-
-    #[fuchsia::test]
-    fn single_handle_exceed_buffer_in_single_write() {
-        const CAPACITY: usize = ShellWriterHandle::<Vec<u8>>::BUFFER_CAPACITY;
-        // each case consists of a sequence of pairs, where each pair is a string to write, and
-        // the expected output after writing the string.
-        let cases = vec![
-            (
-                "exceed in one write",
-                vec![("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 1)))],
-            ),
-            (
-                "exceed on second write",
-                vec![
-                    ("a".to_string(), "".to_string()),
-                    ("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 2))),
-                ],
-            ),
-            (
-                "exceed in one write, with newline",
-                vec![(
-                    format!("\n{}", "a".repeat(CAPACITY + 1)),
-                    format!("\n{}\n", "a".repeat(CAPACITY + 1)),
-                )],
-            ),
-            (
-                "exceed in two writes, with newline",
-                vec![
-                    ("a".to_string(), "".to_string()),
-                    (
-                        format!("\n{}", "a".repeat(CAPACITY + 1)),
-                        format!("a\n{}\n", "a".repeat(CAPACITY + 1)),
-                    ),
-                ],
-            ),
-        ];
-
-        for (case_name, writes) in cases.into_iter() {
-            let (handle_inner, output) = create_writer_inner_and_view();
-            let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
-            for (write_no, (to_write, expected)) in writes.into_iter().enumerate() {
-                assert_eq!(
-                    write_handle.write(to_write.as_bytes()).unwrap(),
-                    to_write.as_bytes().len(),
-                    "Got wrong number of bytes written for write {:?} in case {}",
-                    write_no,
-                    case_name
-                );
-                assert_eq!(
-                    String::from_utf8(output.lock().clone()).unwrap(),
-                    expected,
-                    "Buffer contains unexpected contents after write {:?} in case {}",
-                    write_no,
-                    case_name
-                )
-            }
-        }
-    }
-
-    #[fuchsia::test]
-    fn single_handle_with_prefix_multiple_line() {
-        let (handle_inner, output) = create_writer_inner_and_view();
-        let mut write_handle =
-            ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
-        const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
-        assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
-        // Note we 'chunk' output in each write to avoid spamming the prefix, so the second
-        // line won't contain the prefix.
-        assert_eq!(output.lock().as_slice(), b"[prefix] This is a \nmultiline output \n");
-        write_handle.flush().unwrap();
-        assert_eq!(
-            output.lock().as_slice(),
-            "[prefix] This is a \nmultiline output \nwithout newline termination\n".as_bytes()
-        );
-
-        const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
-        assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
-        assert_eq!(
-            output.lock().as_slice(),
-            b"[prefix] This is a \nmultiline output \n\
-            without newline termination\nThis is \nnewline terminated \noutput\n"
-        );
-    }
-
-    #[fuchsia::test]
-    fn multiple_handles() {
-        let (handle_inner, output) = create_writer_inner_and_view();
-        let mut handle_1 =
-            ShellWriterHandle::new_handle(handle_inner.clone(), Some("[1] ".to_string()));
-        let mut handle_2 = ShellWriterHandle::new_handle(handle_inner, Some("[2] ".to_string()));
-
-        write!(handle_1, "hi from 1").unwrap();
-        write!(handle_2, "hi from 2").unwrap();
-        assert!(output.lock().is_empty());
-        write!(handle_1, "\n").unwrap();
-        assert_eq!(output.lock().as_slice(), "[1] hi from 1\n".as_bytes());
-        write!(handle_2, "\n").unwrap();
-        assert_eq!(output.lock().as_slice(), "[1] hi from 1\n[2] hi from 2\n".as_bytes());
-    }
-
-    // The following tests verify behavior of the shell writer when the inner writer
-    // exhibits some allowed edge cases.
-
-    #[fuchsia::test]
-    fn output_written_when_inner_writer_writes_partial_buffer() {
-        /// A writer that writes at most 3 bytes at a time.
-        struct PartialOutputWriter(Vec<u8>);
-        impl Write for PartialOutputWriter {
-            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
-                if buf.len() >= 3 {
-                    self.0.write(&buf[..3])
-                } else {
-                    self.0.write(buf)
-                }
-            }
-
-            fn flush(&mut self) -> Result<(), Error> {
-                self.0.flush()
-            }
-        }
-
-        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(PartialOutputWriter(vec![]))));
-        let output = ShellWriterView(inner.clone());
-        let mut write_handle =
-            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
-        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
-        assert!(output.lock().0.is_empty());
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
-
-        let mut write_handle_2 =
-            ShellWriterHandle::new_handle(inner, Some("[prefix2] ".to_string()));
-
-        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
-        assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
-        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n");
-        assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
-    }
-
-    #[fuchsia::test]
-    fn output_written_when_inner_writer_returns_interrupted() {
-        /// A writer that returns interrupted on the first write attempt
-        struct InterruptWriter {
-            buf: Vec<u8>,
-            returned_interrupt: bool,
-        }
-        impl Write for InterruptWriter {
-            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
-                if !self.returned_interrupt {
-                    self.returned_interrupt = true;
-                    Err(ErrorKind::Interrupted.into())
-                } else {
-                    self.buf.write(buf)
-                }
-            }
-
-            fn flush(&mut self) -> Result<(), Error> {
-                self.buf.flush()
-            }
-        }
-
-        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(InterruptWriter {
-            buf: vec![],
-            returned_interrupt: false,
-        })));
-        let output = ShellWriterView(inner.clone());
-        let mut write_handle =
-            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
-        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
-        assert!(output.lock().buf.is_empty());
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
-
-        let mut write_handle_2 =
-            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix2] ".to_string()));
-
-        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
-        assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
-        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
-        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n");
-        assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
-        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
-    }
+    use {
+        crate::output::{CaseId, RunReporter, SuiteId},
+        std::io::Write,
+    };
 
     #[fuchsia::test]
     async fn report_case_events() {
diff --git a/src/sys/run_test_suite/src/output/shell/writer.rs b/src/sys/run_test_suite/src/output/shell/writer.rs
new file mode 100644
index 0000000..289fb8c
--- /dev/null
+++ b/src/sys/run_test_suite/src/output/shell/writer.rs
@@ -0,0 +1,413 @@
+// Copyright 2022 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::trace::duration;
+use parking_lot::Mutex;
+use std::{
+    io::{Error, Write},
+    sync::Arc,
+};
+
+/// A handle around an inner writer. This serves as a "multiplexing" writer that
+/// writes bytes from multiple sources into a single serial destination, typically
+/// to stdout.
+/// Output sent to a handle is buffered until a newline is encountered, then the
+/// buffered output is written to the inner writer.
+/// The handle also supports prepending a prefix to the start of each buffer. This
+/// helps preserve existing behavior where prefixes are added to the start of stdout
+/// and log lines to help a developer understand what produced some output.
+pub(super) struct ShellWriterHandle<W: 'static + Write + Send + Sync> {
+    inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
+    buffer: Vec<u8>,
+    /// Prefix, if any, to prepend to output before writing to the inner writer.
+    prefix: Option<Vec<u8>>,
+    handle_id: u32,
+}
+
+impl<W: 'static + Write + Send + Sync> ShellWriterHandle<W> {
+    const NEWLINE_BYTE: u8 = b'\n';
+    const BUFFER_CAPACITY: usize = 1024;
+
+    /// Create a new handle to a wrapped writer.
+    pub(super) fn new_handle(
+        inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
+        prefix: Option<String>,
+    ) -> Self {
+        let mut lock = inner.lock();
+        let handle_id = lock.num_handles;
+        lock.num_handles += 1;
+        drop(lock);
+        Self {
+            inner,
+            buffer: Vec::with_capacity(Self::BUFFER_CAPACITY),
+            prefix: prefix.map(String::into_bytes),
+            handle_id,
+        }
+    }
+
+    /// Write a full line to the inner writer.
+    fn write_bufs(writer: &mut W, bufs: &[&[u8]]) -> Result<(), Error> {
+        for buf in bufs {
+            writer.write_all(buf)?;
+        }
+        Ok(())
+    }
+}
+
+/// Inner mutable state for |ShellWriterHandle|.
+pub(super) struct ShellWriterHandleInner<W: 'static + Write + Send + Sync> {
+    /// The writer to which all content is passed.
+    writer: W,
+    /// The id of the last handle that wrote to the writer, used to conditionally
+    /// output a prefix only when the handle writing to the output changes.
+    last_writer_id: Option<u32>,
+    /// The number of handles that have been created. Used to assign ids to handles.
+    num_handles: u32,
+}
+
+impl<W: 'static + Write + Send + Sync> ShellWriterHandleInner<W> {
+    pub(super) fn new(writer: W) -> Self {
+        Self { writer, last_writer_id: None, num_handles: 0 }
+    }
+}
+
+/// A handle to a writer contained in a |ShellWriterHandle|. This is exposed for testing
+/// purposes.
+pub struct ShellWriterView<W: 'static + Write + Send + Sync>(Arc<Mutex<ShellWriterHandleInner<W>>>);
+
+impl<W: 'static + Write + Send + Sync> ShellWriterView<W> {
+    pub(super) fn new(inner: Arc<Mutex<ShellWriterHandleInner<W>>>) -> Self {
+        Self(inner)
+    }
+
+    pub fn lock(&self) -> parking_lot::MappedMutexGuard<'_, W> {
+        parking_lot::MutexGuard::map(self.0.lock(), |handle_inner| &mut handle_inner.writer)
+    }
+}
+
+impl<W: 'static + Write + Send + Sync> Write for ShellWriterHandle<W> {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
+        duration!("shell_write");
+        // find the last newline in the buffer. In case multiple lines are written as once,
+        // we should write once to the inner writer and add our prefix only once. This helps
+        // avoid spamming the output with prefixes in case many lines are present.
+        let newline_pos = buf
+            .iter()
+            .rev()
+            .position(|byte| *byte == Self::NEWLINE_BYTE)
+            .map(|pos_from_end| buf.len() - pos_from_end - 1);
+        // In case we'd exceed the buffer, just wrte everything, but append a newline to avoid
+        // interspersing.
+        let (final_byte_pos, append_newline) = match newline_pos {
+            // no newline, pushing all to buffer would exceed capacity
+            None if self.buffer.len() + buf.len() > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
+            None => {
+                self.buffer.extend_from_slice(buf);
+                return Ok(buf.len());
+            }
+            // newline exists, but the rest of buf would exceed capacity.
+            Some(pos) if buf.len() - pos > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
+            Some(pos) => (pos, false),
+        };
+
+        let mut inner = self.inner.lock();
+        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
+
+        let mut bufs_to_write = vec![];
+        if let Some(prefix) = self.prefix.as_ref() {
+            if last_writer_id != Some(self.handle_id) {
+                bufs_to_write.push(prefix.as_slice());
+            }
+        }
+        if !self.buffer.is_empty() {
+            bufs_to_write.push(self.buffer.as_slice());
+        }
+        bufs_to_write.push(&buf[..final_byte_pos + 1]);
+        if append_newline {
+            bufs_to_write.push(&[Self::NEWLINE_BYTE]);
+        }
+
+        Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
+
+        self.buffer.clear();
+        self.buffer.extend_from_slice(&buf[final_byte_pos + 1..]);
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> Result<(), Error> {
+        let mut inner = self.inner.lock();
+        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
+        if !self.buffer.is_empty() {
+            self.buffer.push(Self::NEWLINE_BYTE);
+            let mut bufs_to_write = vec![];
+            if let Some(prefix) = self.prefix.as_ref() {
+                if last_writer_id != Some(self.handle_id) {
+                    bufs_to_write.push(prefix.as_slice());
+                }
+            }
+            bufs_to_write.push(self.buffer.as_slice());
+
+            Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
+            self.buffer.clear();
+        }
+        inner.writer.flush()
+    }
+}
+
+impl<W: 'static + Write + Send + Sync> std::ops::Drop for ShellWriterHandle<W> {
+    fn drop(&mut self) {
+        let _ = self.flush();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::ErrorKind;
+
+    fn create_writer_inner_and_view(
+    ) -> (Arc<Mutex<ShellWriterHandleInner<Vec<u8>>>>, ShellWriterView<Vec<u8>>) {
+        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(vec![])));
+        (inner.clone(), ShellWriterView(inner))
+    }
+
+    #[fuchsia::test]
+    fn single_handle() {
+        let (handle_inner, output) = create_writer_inner_and_view();
+        let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
+
+        assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
+        assert!(output.lock().is_empty());
+
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
+        assert_eq!(output.lock().as_slice(), b"hello world\n");
+
+        assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
+        write_handle.flush().unwrap();
+        assert_eq!(output.lock().as_slice(), b"hello world\nflushed output\n");
+    }
+
+    #[fuchsia::test]
+    fn single_handle_with_prefix() {
+        let (handle_inner, output) = create_writer_inner_and_view();
+        let mut write_handle =
+            ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
+
+        assert_eq!(write_handle.write(b"hello world").unwrap(), b"hello world".len(),);
+        assert!(output.lock().is_empty());
+
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len(),);
+        assert_eq!(output.lock().as_slice(), b"[prefix] hello world\n");
+
+        assert_eq!(write_handle.write(b"flushed output").unwrap(), b"flushed output".len(),);
+        write_handle.flush().unwrap();
+        assert_eq!(output.lock().as_slice(), b"[prefix] hello world\nflushed output\n");
+    }
+
+    #[fuchsia::test]
+    fn single_handle_multiple_line() {
+        let (handle_inner, output) = create_writer_inner_and_view();
+        let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
+        const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
+        assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
+        assert_eq!(output.lock().as_slice(), b"This is a \nmultiline output \n");
+        write_handle.flush().unwrap();
+        assert_eq!(
+            output.lock().as_slice(),
+            b"This is a \nmultiline output \nwithout newline termination\n"
+        );
+        output.lock().clear();
+
+        const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
+        assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
+        assert_eq!(output.lock().as_slice(), b"This is \nnewline terminated \noutput\n");
+    }
+
+    #[fuchsia::test]
+    fn single_handle_exceed_buffer_in_single_write() {
+        const CAPACITY: usize = ShellWriterHandle::<Vec<u8>>::BUFFER_CAPACITY;
+        // each case consists of a sequence of pairs, where each pair is a string to write, and
+        // the expected output after writing the string.
+        let cases = vec![
+            (
+                "exceed in one write",
+                vec![("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 1)))],
+            ),
+            (
+                "exceed on second write",
+                vec![
+                    ("a".to_string(), "".to_string()),
+                    ("a".repeat(CAPACITY + 1), format!("{}\n", "a".repeat(CAPACITY + 2))),
+                ],
+            ),
+            (
+                "exceed in one write, with newline",
+                vec![(
+                    format!("\n{}", "a".repeat(CAPACITY + 1)),
+                    format!("\n{}\n", "a".repeat(CAPACITY + 1)),
+                )],
+            ),
+            (
+                "exceed in two writes, with newline",
+                vec![
+                    ("a".to_string(), "".to_string()),
+                    (
+                        format!("\n{}", "a".repeat(CAPACITY + 1)),
+                        format!("a\n{}\n", "a".repeat(CAPACITY + 1)),
+                    ),
+                ],
+            ),
+        ];
+
+        for (case_name, writes) in cases.into_iter() {
+            let (handle_inner, output) = create_writer_inner_and_view();
+            let mut write_handle = ShellWriterHandle::new_handle(handle_inner, None);
+            for (write_no, (to_write, expected)) in writes.into_iter().enumerate() {
+                assert_eq!(
+                    write_handle.write(to_write.as_bytes()).unwrap(),
+                    to_write.as_bytes().len(),
+                    "Got wrong number of bytes written for write {:?} in case {}",
+                    write_no,
+                    case_name
+                );
+                assert_eq!(
+                    String::from_utf8(output.lock().clone()).unwrap(),
+                    expected,
+                    "Buffer contains unexpected contents after write {:?} in case {}",
+                    write_no,
+                    case_name
+                )
+            }
+        }
+    }
+
+    #[fuchsia::test]
+    fn single_handle_with_prefix_multiple_line() {
+        let (handle_inner, output) = create_writer_inner_and_view();
+        let mut write_handle =
+            ShellWriterHandle::new_handle(handle_inner, Some("[prefix] ".to_string()));
+        const WRITE_BYTES: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
+        assert_eq!(write_handle.write(WRITE_BYTES).unwrap(), WRITE_BYTES.len(),);
+        // Note we 'chunk' output in each write to avoid spamming the prefix, so the second
+        // line won't contain the prefix.
+        assert_eq!(output.lock().as_slice(), b"[prefix] This is a \nmultiline output \n");
+        write_handle.flush().unwrap();
+        assert_eq!(
+            output.lock().as_slice(),
+            "[prefix] This is a \nmultiline output \nwithout newline termination\n".as_bytes()
+        );
+
+        const TERMINATED_BYTES: &[u8] = b"This is \nnewline terminated \noutput\n";
+        assert_eq!(write_handle.write(TERMINATED_BYTES).unwrap(), TERMINATED_BYTES.len(),);
+        assert_eq!(
+            output.lock().as_slice(),
+            b"[prefix] This is a \nmultiline output \n\
+            without newline termination\nThis is \nnewline terminated \noutput\n"
+        );
+    }
+
+    #[fuchsia::test]
+    fn multiple_handles() {
+        let (handle_inner, output) = create_writer_inner_and_view();
+        let mut handle_1 =
+            ShellWriterHandle::new_handle(handle_inner.clone(), Some("[1] ".to_string()));
+        let mut handle_2 = ShellWriterHandle::new_handle(handle_inner, Some("[2] ".to_string()));
+
+        write!(handle_1, "hi from 1").unwrap();
+        write!(handle_2, "hi from 2").unwrap();
+        assert!(output.lock().is_empty());
+        write!(handle_1, "\n").unwrap();
+        assert_eq!(output.lock().as_slice(), "[1] hi from 1\n".as_bytes());
+        write!(handle_2, "\n").unwrap();
+        assert_eq!(output.lock().as_slice(), "[1] hi from 1\n[2] hi from 2\n".as_bytes());
+    }
+
+    // The following tests verify behavior of the shell writer when the inner writer
+    // exhibits some allowed edge cases.
+
+    #[fuchsia::test]
+    fn output_written_when_inner_writer_writes_partial_buffer() {
+        /// A writer that writes at most 3 bytes at a time.
+        struct PartialOutputWriter(Vec<u8>);
+        impl Write for PartialOutputWriter {
+            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
+                if buf.len() >= 3 {
+                    self.0.write(&buf[..3])
+                } else {
+                    self.0.write(buf)
+                }
+            }
+
+            fn flush(&mut self) -> Result<(), Error> {
+                self.0.flush()
+            }
+        }
+
+        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(PartialOutputWriter(vec![]))));
+        let output = ShellWriterView(inner.clone());
+        let mut write_handle =
+            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
+        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
+        assert!(output.lock().0.is_empty());
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
+
+        let mut write_handle_2 =
+            ShellWriterHandle::new_handle(inner, Some("[prefix2] ".to_string()));
+
+        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
+        assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
+        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\n");
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n");
+        assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().0.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
+    }
+
+    #[fuchsia::test]
+    fn output_written_when_inner_writer_returns_interrupted() {
+        /// A writer that returns interrupted on the first write attempt
+        struct InterruptWriter {
+            buf: Vec<u8>,
+            returned_interrupt: bool,
+        }
+        impl Write for InterruptWriter {
+            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
+                if !self.returned_interrupt {
+                    self.returned_interrupt = true;
+                    Err(ErrorKind::Interrupted.into())
+                } else {
+                    self.buf.write(buf)
+                }
+            }
+
+            fn flush(&mut self) -> Result<(), Error> {
+                self.buf.flush()
+            }
+        }
+
+        let inner = Arc::new(Mutex::new(ShellWriterHandleInner::new(InterruptWriter {
+            buf: vec![],
+            returned_interrupt: false,
+        })));
+        let output = ShellWriterView(inner.clone());
+        let mut write_handle =
+            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix] ".to_string()));
+        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
+        assert!(output.lock().buf.is_empty());
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
+
+        let mut write_handle_2 =
+            ShellWriterHandle::new_handle(inner.clone(), Some("[prefix2] ".to_string()));
+
+        assert_eq!(write_handle.write(b"hello").unwrap(), b"hello".len());
+        assert_eq!(write_handle_2.write(b"world").unwrap(), b"world".len());
+        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\n");
+        assert_eq!(write_handle.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n");
+        assert_eq!(write_handle_2.write(b"\n").unwrap(), b"\n".len());
+        assert_eq!(output.lock().buf.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
+    }
+}