blob: 9dd9258b8f0ce1b4773e35b4b7188c31ca2ec0c4 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::test_topology;
use diagnostics_message::fx_log_packet_t;
use fidl_fuchsia_diagnostics as fdiagnostics;
use fidl_fuchsia_logger::{LogLevelFilter, LogMarker, LogMessage, LogSinkMarker};
use fuchsia_async as fasync;
use fuchsia_component_test::{Capability, ChildOptions, RealmInstance, Ref, Route};
use fuchsia_syslog_listener::run_log_listener_with_proxy;
use fuchsia_zircon as zx;
use futures::{channel::mpsc, FutureExt, StreamExt};
use std::sync::{Arc, Mutex};
#[fuchsia::test]
async fn timestamp_sorting_for_batches() {
let (builder, test_realm) = test_topology::create(test_topology::Options::default())
.await
.expect("create base topology");
// TODO(b/297211132): this is a workaround to a current bug due to which unattributed
// connections don't currently work. In an ideal world, we'd be able to connect ot
// fuchsia.logger.LogSink directly without requiring a local child.
let (connect_socket_snd, connect_socket_rcv) = mpsc::unbounded();
let rcv_arc = Arc::new(Mutex::new(Some(connect_socket_rcv)));
let logger = test_realm
.add_local_child(
"logger",
move |handles| {
// Note: ok to unwrap, for the test purposes, this will be called once.
let rcv = rcv_arc.clone();
async move {
let log_sink = handles
.connect_to_protocol::<LogSinkMarker>()
.expect("logsink is available");
// Unwrap is fine, we call this once.
let mut rcv = rcv.lock().unwrap().take().unwrap();
while let Some(socket) = rcv.next().await {
log_sink.connect(socket).unwrap();
}
Ok(())
}
.boxed()
},
ChildOptions::new().eager(),
)
.await
.expect("add logger");
test_realm
.add_route(
Route::new()
.capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
.from(Ref::child("archivist"))
.to(&logger),
)
.await
.expect("route logsink to logger");
let instance = builder.build().await.expect("create instance");
let message_times = [1_000, 5_000, 10_000, 15_000];
let hare_times = (0, 2);
let tort_times = (1, 3);
let packets = message_times
.iter()
.map(|t| {
let mut packet = fx_log_packet_t::default();
packet.metadata.time = *t;
packet.metadata.pid = 1000;
packet.metadata.tid = 2000;
packet.metadata.severity = LogLevelFilter::Info.into_primitive().into();
packet.add_data(1, "timing log".as_bytes());
packet
})
.collect::<Vec<_>>();
let messages = packets
.iter()
.map(|p| LogMessage {
severity: fdiagnostics::Severity::Info.into_primitive() as i32,
time: p.metadata.time,
dropped_logs: 0,
msg: "timing log".to_owned(),
tags: vec!["logger".into()],
pid: p.metadata.pid,
tid: p.metadata.tid,
})
.collect::<Vec<_>>();
{
// there are two writers in this test, a "tortoise" and a "hare"
// the hare's messages are always timestamped earlier but arrive later
let (send_tort, recv_tort) = zx::Socket::create_datagram();
let (send_hare, recv_hare) = zx::Socket::create_datagram();
// put a message in each socket
send_tort.write(packets[tort_times.0].as_bytes()).unwrap();
send_hare.write(packets[hare_times.0].as_bytes()).unwrap();
// connect to log_sink and make sure we have a clean slate
let mut early_listener = listen_to_archivist(&instance);
// connect the tortoise's socket
connect_socket_snd.unbounded_send(recv_tort).expect("send socket");
let tort_expected = messages[tort_times.0].clone();
let mut expected_dump = vec![tort_expected.clone()];
assert_eq!(early_listener.next().await.unwrap(), tort_expected);
assert_eq!(dump_from_archivist(&instance).await, expected_dump);
// connect hare's socket
connect_socket_snd.unbounded_send(recv_hare).expect("send socket");
let hare_expected = messages[hare_times.0].clone();
expected_dump.push(hare_expected.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(early_listener.next().await.unwrap(), hare_expected);
assert_eq!(dump_from_archivist(&instance).await, expected_dump);
// start a new listener and make sure it gets backlog reversed from early listener
let mut middle_listener = listen_to_archivist(&instance);
assert_eq!(middle_listener.next().await.unwrap(), hare_expected);
assert_eq!(middle_listener.next().await.unwrap(), tort_expected);
// send the second tortoise message and assert it's seen
send_tort.write(packets[tort_times.1].as_bytes()).unwrap();
let tort_expected2 = messages[tort_times.1].clone();
expected_dump.push(tort_expected2.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(early_listener.next().await.unwrap(), tort_expected2);
assert_eq!(middle_listener.next().await.unwrap(), tort_expected2);
assert_eq!(dump_from_archivist(&instance).await, expected_dump);
// send the second hare message and assert it's seen
send_tort.write(packets[hare_times.1].as_bytes()).unwrap();
let hare_expected2 = messages[hare_times.1].clone();
expected_dump.push(hare_expected2.clone());
expected_dump.sort_by_key(|m| m.time);
assert_eq!(early_listener.next().await.unwrap(), hare_expected2);
assert_eq!(middle_listener.next().await.unwrap(), hare_expected2);
assert_eq!(dump_from_archivist(&instance).await, expected_dump);
// listening after all messages were seen by archivist-for-embedding should be time-ordered
let mut final_listener = listen_to_archivist(&instance);
assert_eq!(final_listener.next().await.unwrap(), hare_expected);
assert_eq!(final_listener.next().await.unwrap(), tort_expected);
assert_eq!(final_listener.next().await.unwrap(), hare_expected2);
assert_eq!(final_listener.next().await.unwrap(), tort_expected2);
}
}
async fn dump_from_archivist(instance: &RealmInstance) -> Vec<LogMessage> {
let log_proxy = instance.root.connect_to_protocol_at_exposed_dir::<LogMarker>().unwrap();
let (send_logs, recv_logs) = mpsc::unbounded();
fasync::Task::spawn(async move {
run_log_listener_with_proxy(&log_proxy, send_logs, None, true, None).await.unwrap();
})
.detach();
recv_logs.collect::<Vec<_>>().await
}
fn listen_to_archivist(instance: &RealmInstance) -> mpsc::UnboundedReceiver<LogMessage> {
let log_proxy = instance.root.connect_to_protocol_at_exposed_dir::<LogMarker>().unwrap();
let (send_logs, recv_logs) = mpsc::unbounded();
fasync::Task::spawn(async move {
run_log_listener_with_proxy(&log_proxy, send_logs, None, false, None).await.unwrap();
})
.detach();
recv_logs
}