blob: f7a4e088ddff914002c1a3bc44bc0f7ff041324c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context, Result},
async_fs::{create_dir_all, read_dir, remove_dir_all, remove_file, File, OpenOptions},
async_lock::{RwLock, RwLockWriteGuard},
async_trait::async_trait,
diagnostics_data::Timestamp,
ffx_config::get,
ffx_log_data::{LogData, LogEntry},
fidl_fuchsia_developer_ffx::StreamMode,
futures::{
channel::oneshot,
future::{LocalBoxFuture, Shared},
FutureExt,
},
futures_lite::io::{BufReader, Lines},
futures_lite::stream::{Stream, StreamExt},
futures_lite::{AsyncBufReadExt, AsyncWriteExt},
std::{
collections::HashMap, convert::TryInto, fmt, io::ErrorKind, iter::Iterator, path::PathBuf,
pin::Pin, sync::Arc, task::Poll, time::Duration,
},
};
const CACHE_DIRECTORY_CONFIG: &str = "proactive_log.cache_directory";
const MAX_LOG_SIZE_CONFIG: &str = "proactive_log.max_log_size_bytes";
const MAX_SESSION_SIZE_CONFIG: &str = "proactive_log.max_session_size_bytes";
const MAX_SESSIONS_CONFIG: &str = "proactive_log.max_sessions_per_target";
const READ_STREAM_BUFFER_SIZE: usize = 1000;
struct LogFileEntries {
lines: Lines<BufReader<Box<File>>>,
}
impl LogFileEntries {
fn new(lines: Lines<BufReader<Box<File>>>) -> Self {
Self { lines }
}
}
impl Stream for LogFileEntries {
type Item = Result<LogEntry>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut futures::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.lines).poll_next(cx).map(|line_opt| {
line_opt.map(|line| match line {
Ok(line) => serde_json::from_str(&line).map_err(|e| anyhow!(e)),
Err(e) => Err(anyhow!(e)),
})
})
}
}
async fn sort_directory(parent: &PathBuf) -> Result<Vec<PathBuf>> {
let mut reader = read_dir(parent.clone()).await?;
let mut result = vec![];
while let Some(ent) = reader.try_next().await? {
let fname_num: u64 = match String::from(ent.file_name().to_string_lossy()).parse() {
Ok(name) => name,
Err(_) => continue,
};
result.push((fname_num, ent));
}
result.sort_by_key(|tup| tup.0);
Ok(result.iter().map(|tup| tup.1.path()).collect())
}
#[derive(Debug)]
struct LogFile {
path: PathBuf,
// TODO(fxbug.dev/84729)
#[allow(unused)]
parent: TargetSessionDirectory,
file: Option<Box<File>>,
}
impl LogFile {
fn new(path: PathBuf, parent: TargetSessionDirectory) -> Self {
Self { path, parent, file: None }
}
fn from_file(path: PathBuf, parent: TargetSessionDirectory, file: Box<File>) -> Self {
Self { path, parent, file: Some(file) }
}
async fn create(parent: TargetSessionDirectory) -> Result<Self> {
let fname = match parent.latest_file().await? {
Some(f) => f.parsed_name()? + 1,
None => 1,
};
let mut file_path = parent.to_path_buf();
file_path.push(fname.to_string());
let mut options = OpenOptions::new();
options.create(true).write(true);
let f = options
.open(file_path.clone())
.await
.context("opening output file")
.map(|f| Box::new(f))?;
Ok(Self::from_file(file_path, parent.clone(), f))
}
fn parsed_name(&self) -> Result<u64> {
Ok(String::from(
self.path
.file_name()
.ok_or(anyhow!("invalid path name {:?}", &self.path.to_str()))?
.to_string_lossy(),
)
.parse()?)
}
async fn get_file(&mut self) -> Result<&mut Box<File>> {
if self.file.is_none() {
let mut options = OpenOptions::new();
options.append(true);
self.file = Some(
options
.open(self.path.clone())
.await
.context("opening output file")
.map(|f| Box::new(f))?,
);
}
Ok(self.file.as_mut().unwrap())
}
/// Utility function for immediately opening the underlying file
/// for this LogFile.
pub async fn force_open(&mut self) -> Result<()> {
self.get_file().await.map(|_| ())
}
async fn bytes_in_file(&mut self) -> Result<usize> {
Ok(self.get_file().await?.metadata().await?.len().try_into()?)
}
async fn remove(&mut self) -> Result<()> {
remove_file(self.path.clone()).await?;
self.file = None;
Ok(())
}
async fn write_entries(&mut self, entries: Vec<&'_ Vec<u8>>) -> Result<()> {
let f = self.get_file().await?;
for entry in entries.iter() {
f.write_all(entry.as_slice()).await?;
}
f.flush().await?;
f.sync_data().await?;
Ok(())
}
async fn stream_entries(&self) -> Result<LogFileEntries> {
Ok(LogFileEntries::new(
BufReader::new(Box::new(File::open(self.path.clone()).await?)).lines(),
))
}
}
impl fmt::Display for &LogFile {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "<Log file wrapping '{}'>", &self.path.to_string_lossy())
}
}
#[derive(Clone, Debug)]
pub struct TargetLogDirectory {
root: PathBuf,
target_identifier: String,
}
impl TargetLogDirectory {
pub async fn new(target_identifier: String) -> Result<Self> {
let mut root: PathBuf = get(CACHE_DIRECTORY_CONFIG).await?;
root.push(target_identifier.clone());
Ok(Self { target_identifier, root })
}
#[cfg(test)]
fn new_with_root(root_dir: PathBuf, target_identifier: String) -> Self {
let mut root = root_dir.clone();
root.push(target_identifier.clone());
Self { root, target_identifier }
}
pub fn target_identifier(&self) -> String {
self.target_identifier.clone()
}
pub fn exists(&self) -> bool {
self.root.exists()
}
pub async fn all_targets() -> Result<Vec<Self>> {
let root: PathBuf = get(CACHE_DIRECTORY_CONFIG).await?;
let mut iterator = read_dir(root).await?;
let mut result = vec![];
while let Some(entry) = iterator.try_next().await? {
result.push(Self {
root: entry.path(),
target_identifier: entry.file_name().to_string_lossy().to_string(),
});
}
Ok(result)
}
pub async fn list_sessions(&self) -> Result<Vec<TargetSessionDirectory>> {
Ok(sort_directory(&self.root)
.await?
.into_iter()
.filter_map(|p| {
p.file_name().map(|n| n.to_str().map(|s| s.parse().ok())).flatten().flatten()
})
.map(|t| self.with_session(t))
.collect())
}
fn with_session(&self, timestamp_millis: usize) -> TargetSessionDirectory {
TargetSessionDirectory::new(self.clone(), timestamp_millis)
}
// Note that we need a different implementation here due to fxbug.dev/72876#c5.
// TODO(jwing): revisit this with a longer-term solution.
async fn sort_sessions(&self) -> Result<Vec<PathBuf>> {
let mut reader = read_dir(self.root.as_path()).await?;
let mut result: Vec<(i64, _)> = vec![];
while let Some(dir_ent) = reader.try_next().await? {
let first_chunk_path = dir_ent.path().join("1");
if first_chunk_path.exists() {
let f = LogFile::new(
first_chunk_path.clone(),
TargetSessionDirectory::new(
self.clone(),
dir_ent
.path()
.file_name()
.unwrap()
.to_string_lossy()
.to_string()
.parse()?,
),
);
let log_entry = f.stream_entries().await?.next().await;
if let Some(ent) = log_entry {
result.push((ent?.timestamp.into(), dir_ent));
}
}
}
result.sort_by_key(|tup| tup.0);
Ok(result.iter().map(|tup| tup.1.path()).collect())
}
pub async fn clean_sessions(&self, max_sessions: usize) -> Result<()> {
let entries = self.sort_sessions().await?;
if entries.len() > max_sessions {
let end = entries.len() - max_sessions;
let entries_to_remove = &entries[..end];
for path in entries_to_remove.iter() {
log::info!("[logger] garbage collecting log directory: {:?}", &path);
remove_dir_all(path).await?;
}
}
Ok(())
}
fn to_path_buf(&self) -> PathBuf {
self.root.clone()
}
}
#[derive(Clone, Debug)]
pub struct TargetSessionDirectory {
target_root: TargetLogDirectory,
full_path: PathBuf,
timestamp_millis: usize,
}
impl TargetSessionDirectory {
fn new(target_root: TargetLogDirectory, timestamp_millis: usize) -> Self {
let mut pb = target_root.to_path_buf();
pb.push(timestamp_millis.to_string());
Self { target_root, full_path: pb, timestamp_millis }
}
async fn sort_entries(&self) -> Result<Vec<LogFile>> {
Ok(sort_directory(&self.full_path)
.await?
.iter()
.map(|p| LogFile::new(p.clone(), self.clone()))
.collect())
}
fn parent(&self) -> TargetLogDirectory {
self.target_root.clone()
}
async fn latest_file(&self) -> Result<Option<LogFile>> {
self.sort_entries().await.map(|v| v.into_iter().rev().next())
}
async fn create_file(&self) -> Result<LogFile> {
LogFile::create(self.clone()).await
}
fn to_path_buf(&self) -> PathBuf {
self.full_path.clone()
}
pub fn timestamp_nanos(&self) -> i64 {
Duration::from_millis(self.timestamp_millis as u64).as_nanos() as i64
}
}
struct CachedSessionStream {
file_iter: Option<LogFileEntries>,
chunks: Vec<LogFile>,
index: usize,
start_target_timestamp: Option<Timestamp>,
end_timestamp: Option<Timestamp>,
finished: bool,
}
impl CachedSessionStream {
async fn new(
session_dir: TargetSessionDirectory,
start_target_timestamp: Option<Timestamp>,
end_timestamp: Option<Timestamp>,
stream_mode: StreamMode,
) -> Result<Self> {
let mut entries = session_dir.sort_entries().await?;
if stream_mode == StreamMode::SnapshotRecentThenSubscribe && entries.len() > 4 {
entries.drain(0..(entries.len() - 4));
}
if let Some(start_ts) = start_target_timestamp {
// We try to optimize the read by choosing the first log file that
// could possibly contain the provided start timestamp
let mut i = entries.len();
for file in entries.iter().rev() {
i -= 1;
let stream = match file.stream_entries().await {
Ok(s) => s,
Err(e) => {
// If this fails, just bail out on the optimization and continue on.
log::info!(
"non-critical failure to stream entries from log file '{}': {}",
file,
e
);
break;
}
};
let earliest_chunk_ts = stream
.filter_map(|e| e.ok())
.filter_map(|e| match e.data {
LogData::TargetLog(data) | LogData::SymbolizedTargetLog(data, _) => {
Some(data.metadata.timestamp)
}
_ => None,
})
.next()
.await;
if let Some(earliest_ts) = earliest_chunk_ts {
if earliest_ts < *start_ts {
entries.drain(0..i);
break;
}
}
}
}
// Force every LogFile to cache its underlying FS file. This allows to continue
// streaming data even if the files get garbage collected during iteration.
futures::future::join_all(entries.iter_mut().map(|e| e.force_open())).await;
Ok(Self {
chunks: entries,
index: 0,
file_iter: None,
finished: false,
start_target_timestamp,
end_timestamp,
})
}
pub async fn iter(&mut self) -> Result<Option<Result<LogEntry>>> {
if self.finished {
return Ok(None);
}
loop {
if self.file_iter.is_some() {
let val = self.file_iter.as_mut().unwrap().next().await;
if val.is_some() {
let entry = val.unwrap();
if !entry.is_ok() {
return Ok(Some(entry));
}
let entry = entry.unwrap();
if let Some(ts) = self.end_timestamp {
if entry.timestamp > ts {
self.finished = true;
return Ok(None);
}
}
match &entry.data {
LogData::TargetLog(data) | LogData::SymbolizedTargetLog(data, _) => {
if let Some(min_ts) = self.start_target_timestamp {
if data.metadata.timestamp < *min_ts {
continue;
}
}
}
_ => {
if self.start_target_timestamp.is_some() {
continue;
}
}
}
return Ok(Some(Ok(entry)));
} else {
self.file_iter = None;
self.index += 1;
}
}
if let Some(chunk) = self.chunks.get(self.index) {
let iterator = chunk.stream_entries().await?;
self.file_iter = Some(iterator);
continue;
} else {
self.finished = true;
return Ok(None);
}
}
}
}
pub struct SessionStream {
cache_stream: CachedSessionStream,
cache_read_finished: bool,
// We hold onto the Sender in order to create the receiver only at the point
// that we've finished iterating through the on-disk cache. This prevents
// duplicating any log entries.
read_stream: Arc<mpmc::Sender<LogEntry>>,
read_receiver: Option<mpmc::Receiver<LogEntry>>,
stream_mode: StreamMode,
}
impl SessionStream {
async fn new(
session_dir: TargetSessionDirectory,
start_target_timestamp: Option<Timestamp>,
end_timestamp: Option<Timestamp>,
stream_mode: StreamMode,
read_stream: Arc<mpmc::Sender<LogEntry>>,
) -> Result<Self> {
// In the case of a Subscribe only stream_mode, we can create the receiver immediately
// because we have no risk of duplicating on-disk log entries.
let receiver = if stream_mode == StreamMode::Subscribe {
Some(read_stream.new_receiver())
} else {
None
};
Ok(Self {
cache_stream: CachedSessionStream::new(
session_dir,
start_target_timestamp,
end_timestamp,
stream_mode,
)
.await?,
cache_read_finished: false,
read_receiver: receiver,
read_stream,
stream_mode: stream_mode,
})
}
pub async fn iter(&mut self) -> Result<Option<Result<LogEntry>>> {
// Initialize a mpmc listener if we will eventually subscribe, else some messages may be missed.
match self.stream_mode {
StreamMode::Subscribe
| StreamMode::SnapshotAllThenSubscribe
| StreamMode::SnapshotRecentThenSubscribe => {
if self.read_receiver.is_none() {
self.read_receiver = Some(self.read_stream.new_receiver());
}
}
_ => {}
}
if self.stream_mode != StreamMode::Subscribe && !self.cache_read_finished {
let res = self.cache_stream.iter().await;
if let Ok(opt) = res {
if opt.is_some() {
return Ok(opt);
} else {
self.cache_read_finished = true;
}
} else {
return res;
}
}
if self.stream_mode != StreamMode::SnapshotAll {
let res = self
.read_receiver
.as_mut()
.unwrap()
.next()
.map(|log_opt| Ok(log_opt.map(|l| Ok(l))))
.await;
return res;
} else {
return Ok(None);
}
}
}
/// A bit of explanation for the setup_* fields: they exist to facilitate the
/// wait_for_setup method on DiagnosticsStreamer.
///
/// `setup_fut` is a future polling the result of the Receiver end of
/// `setup_notifier`. Because it is `Shared`, it has two properties:
/// 1. It can be cloned by individual calls to `wait_for_setup`.
/// 2. It transparently resolves immediately if `setup_stream` has already
/// been called, even if `wait_for_setup` has already been called.
///
/// This all could likely be replaced by a condvar, but sadly these are
/// unavailable in the present version of async-std.
struct DiagnosticsStreamerInner<'a> {
output_dir: Option<TargetSessionDirectory>,
setup_notifier: Option<oneshot::Sender<()>>,
setup_fut: Shared<LocalBoxFuture<'a, Result<(), oneshot::Canceled>>>,
current_file: Option<LogFile>,
max_file_size_bytes: usize,
max_session_size_bytes: usize,
max_num_sessions: usize,
read_stream: Arc<mpmc::Sender<LogEntry>>,
}
impl Clone for DiagnosticsStreamerInner<'_> {
fn clone(&self) -> Self {
let (tx, rx) = oneshot::channel::<()>();
Self {
output_dir: self.output_dir.clone(),
setup_notifier: Some(tx),
setup_fut: rx.boxed_local().shared(),
current_file: None,
max_file_size_bytes: self.max_file_size_bytes,
max_session_size_bytes: self.max_session_size_bytes,
max_num_sessions: self.max_num_sessions,
read_stream: self.read_stream.clone(),
}
}
}
impl DiagnosticsStreamerInner<'_> {
fn new() -> Self {
let (tx, rx) = oneshot::channel::<()>();
Self {
output_dir: None,
setup_notifier: Some(tx),
setup_fut: rx.boxed_local().shared(),
current_file: None,
max_file_size_bytes: 0,
max_session_size_bytes: 0,
max_num_sessions: 0,
read_stream: Arc::new(mpmc::Sender::with_buffer_size(READ_STREAM_BUFFER_SIZE)),
}
}
}
pub struct DiagnosticsStreamer<'a> {
inner: RwLock<DiagnosticsStreamerInner<'a>>,
}
impl Clone for DiagnosticsStreamer<'_> {
fn clone(&self) -> Self {
let inner = futures::executor::block_on(self.inner.read());
Self { inner: RwLock::new(inner.clone()) }
}
}
impl Default for DiagnosticsStreamer<'_> {
fn default() -> Self {
Self { inner: RwLock::new(DiagnosticsStreamerInner::new()) }
}
}
impl DiagnosticsStreamer<'_> {
pub async fn session_timestamp_nanos(&self) -> Option<i64> {
let inner = self.inner.read().await;
inner.output_dir.as_ref().map(|d| d.timestamp_nanos())
}
pub async fn list_sessions(
target_identifier: Option<String>,
) -> Result<HashMap<String, Vec<DiagnosticsStreamer<'static>>>> {
let targets = match target_identifier {
Some(s) => {
let target_dir = TargetLogDirectory::new(s.to_string()).await?;
if !target_dir.exists() {
return Ok(HashMap::default());
}
vec![target_dir]
}
None => TargetLogDirectory::all_targets().await?,
};
let mut result = HashMap::new();
for target_dir in targets.iter() {
let sessions = target_dir.list_sessions().await?;
let mut target_results = vec![];
for session in sessions.iter() {
let streamer = DiagnosticsStreamer::default();
streamer
.setup_stream(target_dir.target_identifier(), session.timestamp_nanos())
.await?;
target_results.push(streamer)
}
result.insert(target_dir.target_identifier(), target_results);
}
Ok(result)
}
}
#[async_trait(?Send)]
pub trait GenericDiagnosticsStreamer {
async fn setup_stream(
&self,
target_nodename: String,
session_timestamp_nanos: i64,
) -> Result<()>;
async fn append_logs(&self, entries: Vec<LogEntry>) -> Result<()>;
async fn read_most_recent_target_timestamp(&self) -> Result<Option<Timestamp>>;
async fn read_most_recent_entry_timestamp(&self) -> Result<Option<Timestamp>>;
async fn clean_sessions_for_target(&self) -> Result<()>;
async fn stream_entries(
&self,
stream_mode: StreamMode,
min_target_timestamp: Option<Timestamp>,
) -> Result<SessionStream>;
}
#[async_trait::async_trait(?Send)]
impl GenericDiagnosticsStreamer for DiagnosticsStreamer<'_> {
async fn setup_stream(
&self,
target_nodename: String,
session_timestamp_nanos: i64,
) -> Result<()> {
self.setup_stream_with_config(
TargetLogDirectory::new(target_nodename).await?,
session_timestamp_nanos,
get(MAX_LOG_SIZE_CONFIG).await?,
get(MAX_SESSION_SIZE_CONFIG).await?,
get(MAX_SESSIONS_CONFIG).await?,
)
.await
}
async fn append_logs(&self, raw_entries: Vec<LogEntry>) -> Result<()> {
let entries = raw_entries
.iter()
.filter_map(|entry| match serde_json::to_string(entry) {
Ok(s) => {
let mut owned = s.clone();
owned.push('\n');
Some(owned.into_bytes())
}
Err(e) => {
log::warn!("failed to serialize LogEntry: {:?}. Log was: {:?}", e, &entry);
None
}
})
.collect::<Vec<_>>();
let mut inner = self.inner.write().await;
let max_file_size = inner.max_file_size_bytes;
let parent = inner.output_dir.as_ref().context("no stream setup")?.clone();
let mut file = match inner.current_file.take() {
Some(f) => f,
None => {
// Continue appending to the latest file for this session if one exists.
let latest_file = parent.latest_file().await?;
match latest_file {
Some(f) => f,
None => parent.create_file().await?,
}
}
};
let mut buf_bytes = file.bytes_in_file().await?;
let mut buf: Vec<&Vec<u8>> = vec![];
for log_bytes in entries.as_slice() {
// We flush the buffer to disk if this log entry would push us over the max file size
// or if this log entry is, alone, larger the max file size (rather than discard the log entry)
if !buf.is_empty()
&& (buf_bytes + log_bytes.len() > max_file_size || log_bytes.len() > max_file_size)
{
file.write_entries(buf).await?;
buf_bytes = 0;
buf = vec![];
file = parent.create_file().await?;
}
buf_bytes += log_bytes.len();
buf.push(log_bytes);
}
if !buf.is_empty() {
// We don't need to create a new file here in the special case that
// the buffer exceeds the max file size, but hasn't been
// flushed yet (i.e. we got a single log entry > max_file_size)
if buf_bytes > max_file_size && file.bytes_in_file().await? != 0 {
file = parent.create_file().await?;
}
file.write_entries(buf).await?;
}
inner.current_file = Some(file);
for entry in raw_entries.iter() {
inner.read_stream.send(entry.clone()).await;
}
self.cleanup_logs(inner).await.unwrap_or_else(|e| log::warn!("log cleanup failed: {}", e));
Ok(())
}
/// Reads the most recent target-timestamp (a monotonic ID) from a target log
async fn read_most_recent_target_timestamp(&self) -> Result<Option<Timestamp>> {
let inner = self.inner.read().await;
let output_dir = inner.output_dir.as_ref().context("stream not setup")?;
let files = output_dir.sort_entries().await?;
for file in files.iter().rev() {
let entry = file
.stream_entries()
.await?
.filter_map(|l| l.ok())
.filter_map(|l| match l.data {
LogData::TargetLog(data) => Some(data.metadata.timestamp),
_ => None,
})
.last()
.await;
if entry.is_some() {
log::trace!("read of most recent target timestamps finished with entry found");
return Ok(entry.map(|t| t.into()));
}
}
log::trace!("read of most recent target timestamps finished with no entry found");
Ok(None)
}
/// Reads the entry timestamp of the most recent log (of any type)
async fn read_most_recent_entry_timestamp(&self) -> Result<Option<Timestamp>> {
log::trace!("beginning read of most recent entry timestamps");
let inner = self.inner.read().await;
let output_dir = inner.output_dir.as_ref().context("stream not setup")?;
let files = output_dir.sort_entries().await?;
for file in files.iter().rev() {
let entry = file
.stream_entries()
.await?
.filter_map(|l| l.ok())
.map(|l| l.timestamp)
.last()
.await;
if entry.is_some() {
log::trace!("read of most recent entry timestamps finished with entry found");
return Ok(entry);
}
}
log::trace!("read of most recent entry timestamps finished with no entry found");
Ok(None)
}
async fn clean_sessions_for_target(&self) -> Result<()> {
log::trace!("beginning to clean sessions for target.");
let inner = self.inner.read().await;
let result = inner
.output_dir
.as_ref()
.context("missing output directory")?
.parent()
.clean_sessions(inner.max_num_sessions)
.await;
log::trace!("clean sessions for target finished.");
result
}
async fn stream_entries(
&self,
stream_mode: StreamMode,
min_target_timestamp: Option<Timestamp>,
) -> Result<SessionStream> {
let ts = Some(self.read_most_recent_entry_timestamp().await?.unwrap_or(Timestamp::from(0)));
let (output_dir, read_stream) = {
let inner = self.inner.read().await;
(
inner.output_dir.as_ref().context("stream not setup")?.clone(),
inner.read_stream.clone(),
)
};
SessionStream::new(output_dir, min_target_timestamp, ts, stream_mode, read_stream).await
}
}
impl DiagnosticsStreamer<'_> {
// This should only be called by tests.
pub(crate) async fn setup_stream_with_config(
&self,
target_root_dir: TargetLogDirectory,
session_timestamp_nanos: i64,
max_file_size_bytes: usize,
max_session_size_bytes: usize,
max_num_sessions: usize,
) -> Result<()> {
// The ticks=>time conversion isn't accurate enough to use units smaller than milliseconds here.
let t = session_timestamp_nanos / 1_000_000;
let session_dir = target_root_dir.with_session(t as usize);
create_dir_all(session_dir.to_path_buf()).await.or_else(|e| match e.kind() {
ErrorKind::AlreadyExists => Ok(()),
_ => Err(e),
})?;
let mut sender;
{
let mut inner = self.inner.write().await;
inner.output_dir.replace(session_dir);
inner.max_file_size_bytes = max_file_size_bytes;
inner.max_session_size_bytes = max_session_size_bytes;
inner.max_num_sessions = max_num_sessions;
sender = inner.setup_notifier.take();
}
if let Some(s) = sender.take() {
let _ = s.send(());
}
Ok(())
}
/// Blocks until this Streamer has been setup (i.e. `setup_stream` has been called
/// at least once).
pub async fn wait_for_setup(&self) -> Result<()> {
let f = {
let inner = self.inner.read().await;
inner.setup_fut.clone()
};
f.await.map_err(|e| anyhow!(e))
}
async fn cleanup_logs<'a>(
&self,
inner: RwLockWriteGuard<'_, DiagnosticsStreamerInner<'a>>,
) -> Result<()> {
log::trace!("beginning logger clean up run");
let output_dir = inner.output_dir.as_ref().context("no stream setup")?;
let mut entries = output_dir.sort_entries().await?;
// We approximate the need to garbage collect by multiplying the number of files by
// the max file size, *excluding* the most recent file.
if entries.len() > 1
&& (entries.len() - 1) * inner.max_file_size_bytes > inner.max_session_size_bytes
{
let to_remove = entries.first_mut().unwrap();
log::info!("logger: garbage collecting log file: {:?}", to_remove);
to_remove.remove().await.context(format!("removing log dir {:?}", to_remove))?;
}
log::trace!("logger cleanup run finished.");
Ok(())
}
}
#[cfg(test)]
mod test {
use {
super::*,
diagnostics_data::LogsData,
ffx_log_data::LogData,
fuchsia_async::TimeoutExt,
std::collections::HashMap,
std::time::Duration,
tempfile::{tempdir, TempDir},
timeout::timeout,
};
const FAKE_DIR_NAME: &str = "fake_logs";
const SMALL_MAX_LOG_SIZE: usize = 10;
const LARGE_MAX_LOG_SIZE: usize = 1_000_000;
const SMALL_MAX_SESSION_SIZE: usize = 11;
const LARGE_MAX_SESSION_SIZE: usize = 1_000_001;
const DEFAULT_MAX_SESSIONS: usize = 1;
const NODENAME: &str = "my-cool-node";
const BOOT_TIME_NANOS: i64 = 123456789000000000;
const BOOT_TIME_MILLIS: u64 = 123456789000;
const TIMESTAMP: i64 = 987654321;
const READ_TIMEOUT_MILLIS: u64 = 500;
async fn collect_logs(path: PathBuf) -> Result<HashMap<String, String>> {
let mut result = HashMap::new();
let mut ent_stream = read_dir(path).await?;
while let Some(f) = ent_stream.next().await {
let f = f?;
result.insert(
String::from(f.file_name().to_string_lossy()),
std::fs::read_to_string(String::from(f.path().to_str().unwrap()))?,
);
}
Ok(result)
}
async fn collect_default_logs(dir_path: &PathBuf) -> Result<HashMap<String, String>> {
collect_default_logs_for_session(dir_path, BOOT_TIME_MILLIS).await
}
async fn collect_default_logs_for_session(
dir_path: &PathBuf,
boot_time_millis: u64,
) -> Result<HashMap<String, String>> {
let mut root = dir_path.clone();
root.push(FAKE_DIR_NAME);
root.push(NODENAME);
root.push(boot_time_millis.to_string());
collect_logs(root).await
}
fn make_target_log(ts: i64, msg: String) -> LogsData {
diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
timestamp_nanos: ts.into(),
component_url: Some(String::default()),
moniker: String::default(),
severity: diagnostics_data::Severity::Info,
})
.set_message(&msg)
.build()
}
fn make_malformed_log(ts: i64) -> LogEntry {
LogEntry {
data: LogData::MalformedTargetLog("fake log data".to_string()),
version: 1,
timestamp: ts.into(),
}
}
fn make_valid_log(ts: i64, msg: String) -> LogEntry {
LogEntry {
data: LogData::TargetLog(make_target_log(ts, msg)),
version: 1,
timestamp: Timestamp::from(ts),
}
}
async fn setup_default_streamer_with_temp_and_boot_time(
temp_parent: &TempDir,
boot_time_nanos: i64,
max_log_size: usize,
max_session_size: usize,
max_sessions: usize,
) -> Result<DiagnosticsStreamer<'static>> {
let mut root: PathBuf = temp_parent.path().to_path_buf().into();
root.push(FAKE_DIR_NAME.to_string());
let streamer = DiagnosticsStreamer::default();
let target_dir = TargetLogDirectory::new_with_root(root.clone(), NODENAME.to_string());
streamer
.setup_stream_with_config(
target_dir,
boot_time_nanos,
max_log_size,
max_session_size,
max_sessions,
)
.await?;
Ok(streamer)
}
async fn setup_default_streamer_with_temp(
temp_parent: &TempDir,
max_log_size: usize,
max_session_size: usize,
max_sessions: usize,
) -> Result<DiagnosticsStreamer<'static>> {
setup_default_streamer_with_temp_and_boot_time(
temp_parent,
BOOT_TIME_NANOS,
max_log_size,
max_session_size,
max_sessions,
)
.await
}
async fn setup_default_streamer(
max_log_size: usize,
max_session_size: usize,
max_sessions: usize,
) -> Result<(TempDir, DiagnosticsStreamer<'static>)> {
let temp_parent = tempdir()?;
let streamer = setup_default_streamer_with_temp(
&temp_parent,
max_log_size,
max_session_size,
max_sessions,
)
.await?;
Ok((temp_parent, streamer))
}
async fn verify_logs(mut reader: SessionStream, logs: Vec<LogEntry>) {
for log in logs.iter() {
let item = reader.iter().await.unwrap();
assert!(
item.is_some(),
"expected log {:?} has no corresponding entry in log file",
log
);
assert_eq!(item.unwrap().unwrap(), *log);
}
assert!(reader.iter().await.unwrap().is_none());
}
async fn verify_log(reader: &mut SessionStream, expected: LogEntry) -> Result<()> {
let item = reader.iter().await?.expect("missing log entry")?;
assert_eq!(item, expected);
Ok(())
}
async fn verify_times_out(reader: &mut SessionStream) {
let res = timeout(Duration::from_millis(READ_TIMEOUT_MILLIS), reader.iter()).await;
assert!(res.is_err(), "expected timeout, got {:?}", res);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_single_log_exceeds_max_size() -> Result<()> {
let (temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log".to_string());
streamer.append_logs(vec![log.clone()]).await?;
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 1, "{:?}", results);
assert_eq!(
serde_json::from_str::<LogEntry>(results.values().next().unwrap()).unwrap(),
log
);
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log]).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_two_logs_exceeds_max_size() -> Result<()> {
let (temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP, "log2".to_string());
streamer.append_logs(vec![log.clone(), log2.clone()]).await?;
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 2, "{:?}", results);
let mut values = results.values().collect::<Vec<_>>();
values.sort();
assert_eq!(serde_json::from_str::<LogEntry>(values.get(0).unwrap()).unwrap(), log);
assert_eq!(serde_json::from_str::<LogEntry>(values.get(1).unwrap()).unwrap(), log2);
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log, log2])
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_min_timestamp_across_chunks() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP + 2, "log3".to_string());
streamer.append_logs(vec![log.clone(), log2.clone(), log3.clone()]).await?;
verify_logs(
streamer
.stream_entries(StreamMode::SnapshotAll, Some(Timestamp::from(TIMESTAMP + 1)))
.await?,
vec![log2, log3],
)
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_min_timestamp_single_chunks() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP + 2, "log3".to_string());
streamer.append_logs(vec![log.clone(), log2.clone(), log3.clone()]).await?;
verify_logs(
streamer
.stream_entries(StreamMode::SnapshotAll, Some(Timestamp::from(TIMESTAMP + 1)))
.await?,
vec![log2, log3],
)
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_two_logs_with_two_calls_exceeds_max_size() -> Result<()> {
let (temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
SMALL_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP, "log2".to_string());
streamer.append_logs(vec![log.clone()]).await?;
streamer.append_logs(vec![log2.clone()]).await?;
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 2, "{:?}", results);
let mut values = results.values().collect::<Vec<_>>();
values.sort();
assert_eq!(serde_json::from_str::<LogEntry>(values.get(0).unwrap()).unwrap(), log);
assert_eq!(serde_json::from_str::<LogEntry>(values.get(1).unwrap()).unwrap(), log2);
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log, log2])
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_garbage_collection_doesnt_count_latest_file() -> Result<()> {
let (temp, streamer) =
setup_default_streamer(SMALL_MAX_LOG_SIZE, SMALL_MAX_LOG_SIZE, DEFAULT_MAX_SESSIONS)
.await?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP, "log2".to_string());
streamer.append_logs(vec![log.clone()]).await?;
streamer.append_logs(vec![log2.clone()]).await?;
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 2, "{:?}", results);
let mut values = results.values().collect::<Vec<_>>();
values.sort();
assert_eq!(serde_json::from_str::<LogEntry>(values.get(0).unwrap()).unwrap(), log);
assert_eq!(serde_json::from_str::<LogEntry>(values.get(1).unwrap()).unwrap(), log2);
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log, log2])
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_two_logs_with_two_calls_exceeds_max_session_size() -> Result<()> {
let (temp, streamer) =
setup_default_streamer(SMALL_MAX_LOG_SIZE, SMALL_MAX_LOG_SIZE, DEFAULT_MAX_SESSIONS)
.await?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log".to_string());
let log2 = make_valid_log(TIMESTAMP, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP, "log3".to_string());
streamer.append_logs(vec![log.clone()]).await?;
streamer.append_logs(vec![log2.clone()]).await?;
// This call should delete the first log file.
streamer.append_logs(vec![log3.clone()]).await?;
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 2, "{:?}", results);
let mut values = results.values().collect::<Vec<_>>();
values.sort();
assert_eq!(serde_json::from_str::<LogEntry>(values.get(0).unwrap()).unwrap(), log2);
assert_eq!(serde_json::from_str::<LogEntry>(values.get(1).unwrap()).unwrap(), log3);
verify_logs(
streamer.stream_entries(StreamMode::SnapshotAll, None).await?,
vec![log2, log3],
)
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_new_streamer_appends_to_existing_file() -> Result<()> {
let temp = tempdir()?;
let root: PathBuf = temp.path().to_path_buf().into();
let log = make_valid_log(TIMESTAMP, "log1".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
{
let streamer = setup_default_streamer_with_temp(
&temp,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await
.unwrap();
streamer.append_logs(vec![log.clone()]).await.unwrap();
}
let streamer = setup_default_streamer_with_temp(
&temp,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await
.unwrap();
streamer.append_logs(vec![log2.clone()]).await.unwrap();
let results = collect_default_logs(&root).await.unwrap();
assert_eq!(results.len(), 1, "{:?}", results);
let value = results.values().next().unwrap();
let results = value.split("\n").collect::<Vec<_>>();
assert_eq!(results.len(), 3, "{:?}", results);
assert_eq!(serde_json::from_str::<LogEntry>(results.get(0).unwrap()).unwrap(), log);
assert_eq!(serde_json::from_str::<LogEntry>(results.get(1).unwrap()).unwrap(), log2);
assert!(results.get(2).unwrap().is_empty());
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log, log2])
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_snapshot_does_not_include_later_writes() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log = make_valid_log(TIMESTAMP, "log1".to_string());
streamer.append_logs(vec![log.clone()]).await?;
let iterator = streamer.stream_entries(StreamMode::SnapshotAll, None).await?;
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
streamer.append_logs(vec![log2.clone()]).await?;
verify_logs(iterator, vec![log.clone()]).await;
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log, log2])
.await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_snapshot_returns_nothing_if_no_logs_at_write_time() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let mut iterator = streamer.stream_entries(StreamMode::SnapshotAll, None).await?;
let log = make_valid_log(TIMESTAMP, "log1".to_string());
streamer.append_logs(vec![log.clone()]).await?;
assert!(iterator.iter().await?.is_none());
verify_logs(streamer.stream_entries(StreamMode::SnapshotAll, None).await?, vec![log]).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_snapshot_subscribe_intermingled_writes_all_disk() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log1 = make_valid_log(TIMESTAMP, "log1".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP + 2, "log3".to_string());
let log4 = make_valid_log(TIMESTAMP + 3, "log4".to_string());
streamer.append_logs(vec![log1.clone(), log2.clone()]).await?;
let mut iterator =
streamer.stream_entries(StreamMode::SnapshotAllThenSubscribe, None).await?;
verify_log(&mut iterator, log1.clone()).await.context(format!("{:?}", log1)).unwrap();
streamer.append_logs(vec![log3.clone()]).await?;
verify_log(&mut iterator, log2.clone()).await.context(format!("{:?}", log2)).unwrap();
verify_log(&mut iterator, log3.clone()).await.context(format!("{:?}", log3)).unwrap();
streamer.append_logs(vec![log4.clone()]).await?;
verify_log(&mut iterator, log4.clone()).await.context(format!("{:?}", log4)).unwrap();
verify_times_out(&mut iterator).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_snapshot_subscribe_intermingled_writes() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log1 = make_valid_log(TIMESTAMP, "log1".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP + 2, "log3".to_string());
let log4 = make_valid_log(TIMESTAMP + 3, "log4".to_string());
streamer.append_logs(vec![log1.clone(), log2.clone()]).await?;
let mut iterator =
streamer.stream_entries(StreamMode::SnapshotAllThenSubscribe, None).await?;
verify_log(&mut iterator, log1.clone()).await.context(format!("{:?}", log1)).unwrap();
verify_log(&mut iterator, log2.clone()).await.context(format!("{:?}", log2)).unwrap();
verify_times_out(&mut iterator).await;
streamer.append_logs(vec![log3.clone()]).await?;
verify_log(&mut iterator, log3.clone()).await.context(format!("{:?}", log3)).unwrap();
streamer.append_logs(vec![log4.clone()]).await?;
verify_log(&mut iterator, log4.clone()).await.context(format!("{:?}", log4)).unwrap();
verify_times_out(&mut iterator).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_snapshot_recent_includes_only_recent() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log = make_valid_log(TIMESTAMP, "should not be read".to_string());
let log2 = make_valid_log(TIMESTAMP, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP, "log3".to_string());
let log4 = make_valid_log(TIMESTAMP, "log4".to_string());
let log5 = make_valid_log(TIMESTAMP, "log5".to_string());
let log6 = make_valid_log(TIMESTAMP, "log6".to_string());
streamer
.append_logs(vec![
log.clone(),
log.clone(),
log2.clone(),
log3.clone(),
log4.clone(),
log5.clone(),
])
.await?;
// SnapshotRecent will include only the most recent four chunks
let mut iterator =
streamer.stream_entries(StreamMode::SnapshotRecentThenSubscribe, None).await?;
verify_log(&mut iterator, log2.clone()).await.context(format!("{:?}", log2)).unwrap();
verify_log(&mut iterator, log3.clone()).await.context(format!("{:?}", log3)).unwrap();
verify_log(&mut iterator, log4.clone()).await.context(format!("{:?}", log4)).unwrap();
verify_log(&mut iterator, log5.clone()).await.context(format!("{:?}", log5)).unwrap();
verify_times_out(&mut iterator).await;
streamer.append_logs(vec![log6.clone()]).await?;
verify_log(&mut iterator, log6.clone()).await.context(format!("{:?}", log6)).unwrap();
verify_times_out(&mut iterator).await;
let mut iterator =
streamer.stream_entries(StreamMode::SnapshotRecentThenSubscribe, None).await?;
verify_log(&mut iterator, log3.clone()).await.context(format!("{:?}", log3)).unwrap();
verify_log(&mut iterator, log4.clone()).await.context(format!("{:?}", log4)).unwrap();
verify_log(&mut iterator, log5.clone()).await.context(format!("{:?}", log5)).unwrap();
verify_log(&mut iterator, log6.clone()).await.context(format!("{:?}", log6)).unwrap();
verify_times_out(&mut iterator).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_subscribe_ignores_on_disk_logs() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log1 = make_valid_log(TIMESTAMP, "log1".to_string());
let log2 = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let log3 = make_valid_log(TIMESTAMP + 2, "log3".to_string());
let log4 = make_valid_log(TIMESTAMP + 3, "log4".to_string());
streamer.append_logs(vec![log1.clone(), log2.clone()]).await?;
let mut iterator = streamer.stream_entries(StreamMode::Subscribe, None).await?;
streamer.append_logs(vec![log3.clone()]).await?;
verify_log(&mut iterator, log3.clone()).await.context(format!("{:?}", log3)).unwrap();
streamer.append_logs(vec![log4.clone()]).await?;
verify_log(&mut iterator, log4.clone()).await.context(format!("{:?}", log4)).unwrap();
verify_times_out(&mut iterator).await;
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_write_no_setup_call_errors() -> Result<()> {
let streamer = DiagnosticsStreamer::default();
let log = make_malformed_log(TIMESTAMP);
assert!(streamer.append_logs(vec![log]).await.is_err());
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_read_no_setup_call_errors() -> Result<()> {
let streamer = DiagnosticsStreamer::default();
assert!(streamer.stream_entries(StreamMode::SnapshotAll, None).await.is_err());
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_wait_for_setup() -> Result<()> {
let streamer = Arc::new(DiagnosticsStreamer::default());
assert!(streamer.stream_entries(StreamMode::SnapshotAll, None).await.is_err());
let streamer_clone = streamer.clone();
let f = streamer_clone.wait_for_setup();
// Unfortunately we can't reliably assert that this future *never* finishes, but this
// check will at least show up as a flake.
match timeout(Duration::from_millis(100), f).await {
Ok(_) => panic!("wait_for_setup should not complete before setup is completed"),
Err(_) => {}
};
let f = streamer_clone.wait_for_setup();
let target_dir =
TargetLogDirectory::new_with_root(PathBuf::from("/tmp"), NODENAME.to_string());
streamer
.setup_stream_with_config(
target_dir,
BOOT_TIME_NANOS,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
f.on_timeout(Duration::from_millis(100), || {
panic!("wait_for_setup should have completed after setup is completed")
})
.await
.unwrap();
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_simple_read() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let early_log = make_valid_log(TIMESTAMP - 1, String::default());
let log = make_valid_log(TIMESTAMP, String::default());
streamer.append_logs(vec![early_log, log]).await?;
assert_eq!(streamer.read_most_recent_target_timestamp().await?.unwrap(), TIMESTAMP.into());
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_read_message_contains_newline() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
let log = make_valid_log(TIMESTAMP, String::from("hello\nworld!"));
streamer.append_logs(vec![log.clone()]).await?;
let mut iterator = streamer.stream_entries(StreamMode::SnapshotAll, None).await?;
verify_log(&mut iterator, log.clone()).await.context(format!("{:?}", log)).unwrap();
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_read_timestamp_returns_none_if_no_logs() -> Result<()> {
let (_temp, streamer) = setup_default_streamer(
SMALL_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await?;
assert!(streamer.read_most_recent_target_timestamp().await?.is_none());
Ok(())
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_cleans_old_sessions_ignoring_boot_time() -> Result<()> {
let temp = tempdir()?;
let root: PathBuf = temp.path().to_path_buf().into();
let oldest_log = make_valid_log(TIMESTAMP, "log".to_string());
let second_oldest_log = make_valid_log(TIMESTAMP + 1, "log2".to_string());
let newest_log = make_valid_log(TIMESTAMP + 2, "log3".to_string());
{
let streamer = setup_default_streamer_with_temp_and_boot_time(
&temp,
BOOT_TIME_NANOS,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await
.unwrap();
streamer.append_logs(vec![oldest_log.clone()]).await.unwrap();
}
{
let streamer = setup_default_streamer_with_temp_and_boot_time(
&temp,
BOOT_TIME_NANOS - 2_000_000,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await
.unwrap();
streamer.append_logs(vec![second_oldest_log.clone()]).await.unwrap();
}
let streamer = setup_default_streamer_with_temp_and_boot_time(
&temp,
BOOT_TIME_NANOS - 3_000_000,
LARGE_MAX_LOG_SIZE,
LARGE_MAX_SESSION_SIZE,
DEFAULT_MAX_SESSIONS,
)
.await
.unwrap();
streamer.append_logs(vec![newest_log.clone()]).await.unwrap();
streamer.clean_sessions_for_target().await?;
let mut session1 = root.clone();
session1.push(FAKE_DIR_NAME);
session1.push(NODENAME);
session1.push(BOOT_TIME_MILLIS.to_string());
assert!(!session1.exists());
let mut session2 = root.clone();
session2.push(FAKE_DIR_NAME);
session2.push(NODENAME);
session2.push((BOOT_TIME_MILLIS - 2).to_string());
assert!(!session2.exists());
let results = collect_default_logs_for_session(&root, BOOT_TIME_MILLIS - 3).await.unwrap();
assert_eq!(results.len(), 1, "{:?}", results);
let mut values = results.values().collect::<Vec<_>>();
values.sort();
assert_eq!(
serde_json::from_str::<LogEntry>(values.get(0).unwrap()).unwrap(),
newest_log,
"{:?}",
results
);
verify_logs(
streamer.stream_entries(StreamMode::SnapshotAll, None).await?,
vec![newest_log],
)
.await;
Ok(())
}
}