blob: c4cf0ca397bfe47d1357abdd86f06ba2f71c18e9 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::test_topology;
use diagnostics_message::fx_log_packet_t;
use fidl_fuchsia_logger::{LogLevelFilter, LogMarker, LogMessage, LogSinkMarker};
use fuchsia_async as fasync;
use fuchsia_component_test::RealmInstance;
use fuchsia_syslog::levels::INFO;
use fuchsia_syslog_listener::{run_log_listener_with_proxy, LogProcessor};
use fuchsia_zircon as zx;
use futures::{channel::mpsc, StreamExt};
#[fuchsia::test]
async fn timestamp_sorting_for_batches() {
// launch archivist
let (builder, _test_realm) = test_topology::create(test_topology::Options::default())
.await
.expect("create base topology");
let instance = builder.build().await.expect("create instance");
let message_times = [1_000, 5_000, 10_000, 15_000];
let hare_times = (0, 2);
let tort_times = (1, 3);
let packets = message_times
.iter()
.map(|t| {
let mut packet = fx_log_packet_t::default();
packet.metadata.time = *t;
packet.metadata.pid = 1000;
packet.metadata.tid = 2000;
packet.metadata.severity = LogLevelFilter::Info.into_primitive().into();
packet.add_data(1, "timing log".as_bytes());
packet
})
.collect::<Vec<_>>();
let messages = packets
.iter()
.map(|p| LogMessage {
severity: i32::from(INFO),
time: p.metadata.time,
dropped_logs: 0,
msg: "timing log".to_owned(),
tags: vec![format!("realm_builder:{}", instance.root.child_name())],
pid: p.metadata.pid,
tid: p.metadata.tid,
})
.collect::<Vec<_>>();
{
// there are two writers in this test, a "tortoise" and a "hare"
// the hare's messages are always timestamped earlier but arrive later
let (send_tort, recv_tort) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
let (send_hare, recv_hare) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
// put a message in each socket
send_tort.write(packets[tort_times.0].as_bytes()).unwrap();
send_hare.write(packets[hare_times.0].as_bytes()).unwrap();
// connect to log_sink and make sure we have a clean slate
let mut early_listener = listen_to_archivist(&instance);
let log_sink = instance.root.connect_to_protocol_at_exposed_dir::<LogSinkMarker>().unwrap();
// connect the tortoise's socket
log_sink.connect(recv_tort).unwrap();
let tort_expected = messages[tort_times.0].clone();
let mut expected_dump = vec![tort_expected.clone()];
assert_eq!(&early_listener.next().await.unwrap(), &tort_expected);
assert_eq!(&dump_from_archivist(&instance).await, &expected_dump);
// connect hare's socket
log_sink.connect(recv_hare).unwrap();
let hare_expected = messages[hare_times.0].clone();
expected_dump.push(hare_expected.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(&early_listener.next().await.unwrap(), &hare_expected);
assert_eq!(&dump_from_archivist(&instance).await, &expected_dump);
// start a new listener and make sure it gets backlog reversed from early listener
let mut middle_listener = listen_to_archivist(&instance);
assert_eq!(&middle_listener.next().await.unwrap(), &hare_expected);
assert_eq!(&middle_listener.next().await.unwrap(), &tort_expected);
// send the second tortoise message and assert it's seen
send_tort.write(packets[tort_times.1].as_bytes()).unwrap();
let tort_expected2 = messages[tort_times.1].clone();
expected_dump.push(tort_expected2.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(&early_listener.next().await.unwrap(), &tort_expected2);
assert_eq!(&middle_listener.next().await.unwrap(), &tort_expected2);
assert_eq!(&dump_from_archivist(&instance).await, &expected_dump);
// send the second hare message and assert it's seen
send_tort.write(packets[hare_times.1].as_bytes()).unwrap();
let hare_expected2 = messages[hare_times.1].clone();
expected_dump.push(hare_expected2.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(&early_listener.next().await.unwrap(), &hare_expected2);
assert_eq!(&middle_listener.next().await.unwrap(), &hare_expected2);
assert_eq!(&dump_from_archivist(&instance).await, &expected_dump);
// listening after all messages were seen by archivist-for-embedding.cmx should be time-ordered
let mut final_listener = listen_to_archivist(&instance);
assert_eq!(&final_listener.next().await.unwrap(), &hare_expected);
assert_eq!(&final_listener.next().await.unwrap(), &tort_expected);
assert_eq!(&final_listener.next().await.unwrap(), &hare_expected2);
assert_eq!(&final_listener.next().await.unwrap(), &tort_expected2);
}
}
struct Listener {
send_logs: mpsc::UnboundedSender<LogMessage>,
}
impl LogProcessor for Listener {
fn log(&mut self, message: LogMessage) {
self.send_logs.unbounded_send(message).unwrap();
}
fn done(&mut self) {
panic!("this should not be called");
}
}
async fn dump_from_archivist(instance: &RealmInstance) -> Vec<LogMessage> {
let log_proxy = instance.root.connect_to_protocol_at_exposed_dir::<LogMarker>().unwrap();
let (send_logs, recv_logs) = mpsc::unbounded();
fasync::Task::spawn(async move {
run_log_listener_with_proxy(&log_proxy, send_logs, None, true, None).await.unwrap();
})
.detach();
recv_logs.collect::<Vec<_>>().await
}
fn listen_to_archivist(instance: &RealmInstance) -> mpsc::UnboundedReceiver<LogMessage> {
let log_proxy = instance.root.connect_to_protocol_at_exposed_dir::<LogMarker>().unwrap();
let (send_logs, recv_logs) = mpsc::unbounded();
fasync::Task::spawn(async move {
run_log_listener_with_proxy(&log_proxy, send_logs, None, false, None).await.unwrap();
})
.detach();
recv_logs
}