// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{ensure, Context as _};
use async_stream::stream;
use diagnostics_data::LogsData;
use ffx_config::TestEnv;
use ffx_isolate::Isolate;
use fuchsia_async::TimeoutExt;
use futures::channel::mpsc::TrySendError;
use futures::{Stream, StreamExt};
use serde::Deserialize;
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};
use std::sync::Mutex;
use std::time::Duration;
use tempfile::TempDir;
use tracing::info;
/// Holds the process handle of a spawned emulator.
struct EmuState {
/// Child process obtained from spawning the emulator command.
emu: std::process::Child,
/// An isolated environment for testing ffx against a running emulator.
pub struct IsolatedEmulator {
/// Name of the emulator instance. It is built as `{name}-emu` and is passed to ffx as the
/// `--target` value.
emu_name: String,
/// The ffx isolate through which every ffx invocation of this environment is made.
ffx_isolate: Isolate,
/// Handle of the emulator process. Starts out as `None`, is filled in once the emulator
/// command has been spawned, and is taken out again when the environment is dropped.
emu_state: Mutex<Option<EmuState>>,
/// Child processes that are iterated over when the environment is dropped.
// NOTE(review): presumably this holds the background log streaming process(es) — confirm.
children: Mutex<Vec<std::process::Child>>,
// We need to hold the below variables but not interact with them.
_temp_dir: TempDir,
_test_env: TestEnv,
impl IsolatedEmulator {
/// Create an isolated ffx environment and start an emulator in it using the default product
/// bundle and package repository from the Fuchsia build directory. Streams logs in the
/// background and allows resolving packages from universe.
///
/// `name` is forwarded unchanged to `start_internal`, together with the package repository
/// path read from the `PACKAGE_REPOSITORY_PATH` environment variable.
///
/// # Panics
///
/// Panics if the `PACKAGE_REPOSITORY_PATH` environment variable is not set.
///
/// # Errors
///
/// Returns whatever error `start_internal` returns.
pub async fn start(name: &str) -> anyhow::Result<Self> {
// The repository location comes from the environment that `fx test` is expected to provide.
let amber_files_path = std::env::var("PACKAGE_REPOSITORY_PATH")
.expect("PACKAGE_REPOSITORY_PATH env var must be set -- run this test with 'fx test'");
Self::start_internal(name, Some(&amber_files_path)).await
// This is private to be used for testing with a path to a different package repo. Path
// to amber-files is optional for testing to ensure that other successful tests are actually
// matching a developer workflow.
//
// Steps, in order:
//   1. create a temp dir, an ffx test config and an ffx isolate named `name`;
//   2. configure ffx: ssh key paths, log level, repository server, daemon ssh timeout;
//   3. spawn the emulator (`{name}-emu`) and wait for the target, giving up after 120 seconds;
//   4. spawn an `ffx log` process that streams system logs;
//   5. when `amber_files_path` is `Some`, add the repository, start the repository server and
//      register it with the target.
//
// Panics if the `PRODUCT_BUNDLE_PATH` environment variable is not set. Every other failure is
// returned as an error whose context names the step that failed.
async fn start_internal(name: &str, amber_files_path: Option<&str>) -> anyhow::Result<Self> {
let emu_name = format!("{name}-emu");
info!(%name, "making ffx isolate");
let temp_dir = tempfile::TempDir::new().context("making temp dir")?;
let test_env = ffx_config::test_init().await.context("setting up ffx test config")?;
// Create paths to the files to hold the ssh key pair.
// The key is not generated here, since ffx will generate the
// key if it is missing when starting an emulator or flashing a device.
// If a private key is supplied, it is used, but the public key path
// is still in the temp dir.
let ssh_priv_key = temp_dir.path().join("ssh_private_key");
let ssh_pub_key = temp_dir.path().join("ssh_public_key");
let ffx_isolate = Isolate::new_in_test(name, ssh_priv_key.clone(), &test_env.context)
.context("creating ffx isolate")?;
let this = Self {
_temp_dir: temp_dir,
_test_env: test_env,
emu_state: Mutex::new(None),
children: Mutex::new(vec![]),
// now we have our isolate and can call ffx commands to configure our env and start an emu
this.ffx(&["config", "set", "ssh.priv", &ssh_priv_key.to_string_lossy()])
.context("setting ssh private key config")?;
// The public key path goes under its own config key, mirroring `ssh.priv` above; an empty
// key would not configure the public key at all.
this.ffx(&["config", "set", "ssh.pub", &ssh_pub_key.to_string_lossy()])
.context("setting ssh public key config")?;
this.ffx(&["config", "set", "log.level", "debug"])
.context("setting ffx log level")?;
// Use the daemon based repo server. This will probably need to become a parameter.
// The context names this step (not the log level one) so failures are attributed correctly.
this.ffx(&["config", "set", "repository.server.enabled", "true"])
.context("enabling ffx repository server")?;
// TODO(slgrady) remove once we have debugged the flake in which the ssh
// connection is never made
this.ffx(&["config", "set", "daemon.host_pipe_ssh_timeout", "110"])
.context("setting ffx daemon ssh timeout")?;
info!("starting emulator {}", this.emu_name);
let emulator_log = this.ffx_isolate.log_dir().join("emulator.log").display().to_string();
let product_bundle_path = std::env::var("PRODUCT_BUNDLE_PATH")
.expect("PRODUCT_BUNDLE_PATH env var must be set -- run this test with 'fx test'");
let mut emulator_cmd = this
// TODO(slgrady) remove once we have debugged the flake in which the
// ssh connection is never made
.context("creating emulator command")?;
let emu = emulator_cmd.spawn().context("spawning emulator command")?;
// Block until ffx can see the target, but never longer than two minutes.
this.ffx(&["target", "wait"])
.on_timeout(Duration::from_secs(120), || anyhow::bail!("emulator never started"))
// Remember the emulator process so it can be dealt with on stop/drop.
*this.emu_state.lock().unwrap() = Some(EmuState { emu });
info!("streaming system logs to output directory");
let mut system_logs_command = this
.ffx_cmd(&this.make_args(&["log", "--severity", "TRACE", "--no-color"]))
.context("creating log streaming command")?;
let emulator_system_log =
.context("creating system log file")?;
// ffx log prints lots of warnings about symbolization
.push(system_logs_command.spawn().context("spawning log streaming command")?);
// serve packages by creating a repository and a server, then registering the server
if let Some(amber_files_path) = amber_files_path {
this.ffx(&["repository", "add-from-pm", &amber_files_path])
.context("adding repository from build dir")?;
// ask the kernel to give us a random unused port
.context("starting repository server")?;
// NOTE(review): the value given to `--alias` is an empty string — confirm this is intended.
this.ffx(&["target", "repository", "register", "--alias", ""])
.context("registering repository")?;
/// Build the argument list for an ffx invocation aimed at this emulator. The list begins with
/// `--target` followed by this emulator's name.
// NOTE(review): the caller-supplied `args` are presumably appended after this prefix — confirm.
fn make_args<'a>(&'a self, args: &[&'a str]) -> Vec<&str> {
let mut prefixed = vec!["--target", &self.emu_name];
/// Run an ffx command, logging stdout & stderr as INFO messages.
///
/// `args` are passed through `make_args` before being handed to the ffx isolate.
///
/// # Errors
///
/// Fails with context "running ffx" if the isolate cannot run the command, and with
/// "ffx must complete successfully" if the command exits with an unsuccessful status.
pub async fn ffx(&self, args: &[&str]) -> anyhow::Result<()> {
info!("running `ffx {args:?}`");
let output = self.ffx_isolate.ffx(&self.make_args(args)).await.context("running ffx")?;
// Output streams are only logged when they contain something.
if !output.stdout.is_empty() {
info!("stdout:\n{}", output.stdout);
if !output.stderr.is_empty() {
info!("stderr:\n{}", output.stderr);
// Logging happens before this check, so the output of a failed command is still visible.
ensure!(output.status.success(), "ffx must complete successfully");
/// Run an ffx command, returning stdout and logging stderr as an INFO message.
///
/// `args` are passed through `make_args` before being handed to the ffx isolate.
///
/// # Errors
///
/// Fails with context "running ffx" if the isolate cannot run the command.
// NOTE(review): the message fragment at the end of this block suggests an unsuccessful exit is
// reported together with the captured stdout — confirm against the full body.
pub async fn ffx_output(&self, args: &[&str]) -> anyhow::Result<String> {
info!("running `ffx {args:?}`");
let output = self.ffx_isolate.ffx(&self.make_args(args)).await.context("running ffx")?;
// stderr is only logged when it contains something.
if !output.stderr.is_empty() {
info!("stderr:\n{}", output.stderr);
"ffx must complete successfully. stdout: {}",
/// Create an ffx command, which allows for streaming stdout/stderr.
///
/// The command is built by the ffx isolate from `args` after `make_args` has been applied and
/// is returned as a `Command` for the caller to spawn.
///
/// # Errors
///
/// Fails with context "running ffx" if the isolate cannot create the command.
pub async fn ffx_cmd_capture(&self, args: &[&str]) -> anyhow::Result<Command> {
let mut cmd =
self.ffx_isolate.ffx_cmd(&self.make_args(args)).await.context("running ffx")?;
/// Build the ffx arguments for running `command` on the target over ssh. The list begins with
/// `target ssh --`.
// NOTE(review): the elements of `command` are presumably appended after the `--` — confirm.
fn make_ssh_args<'a>(command: &[&'a str]) -> Vec<&'a str> {
let mut args = vec!["target", "ssh", "--"];
/// Run an ssh command, logging stdout & stderr as INFO messages.
///
/// `command` holds the command line to run, one argument per element
/// (for example `&["pkgctl", "resolve", url]`).
pub async fn ssh(&self, command: &[&str]) -> anyhow::Result<()> {
/// Run an ssh command, returning stdout and logging stderr as an INFO message.
///
/// `command` holds the command line to run, one argument per element.
pub async fn ssh_output(&self, command: &[&str]) -> anyhow::Result<String> {
/// Turn raw lines of `ffx --machine json log` output into a stream of `LogsData` entries.
///
/// Lines are taken from `receiver`; each one is deserialized from the JSON wrapper that ffx
/// puts around a target log, with context "parsing log line from ffx" attached to parse
/// failures.
async fn log_stream(
// Source of the raw output lines.
mut receiver: futures::channel::mpsc::UnboundedReceiver<String>,
// Task whose result type is that of sending lines into an unbounded channel.
// NOTE(review): presumably the task that feeds `receiver` — confirm against the caller.
reader_task: fuchsia_async::Task<Result<(), TrySendError<String>>>,
) -> impl Stream<Item = anyhow::Result<LogsData>> {
/// ffx log wraps each line from archivist in its own JSON object, unwrap those here
struct FfxMachineLogLine {
data: FfxTargetLog,
struct FfxTargetLog {
// The wrapper stores the log entry under the `TargetLog` key.
#[serde(rename = "TargetLog")]
target_log: LogsData,
stream! {
while let Some(line) = {
// Empty lines get special treatment before any parsing is attempted.
if line.is_empty() {
let ffx_message = serde_json::from_str::<FfxMachineLogLine>(&line)
.context("parsing log line from ffx")?;
yield Ok(;
/// Stream the logs of a particular component as they are produced.
///
/// Spawns `ffx --machine json log --moniker <moniker>`, reads its stdout line by line on a
/// blocking task and hands the lines to `log_stream`, which parses them into `LogsData`.
///
/// # Errors
///
/// Fails if the ffx command cannot be created (context "running ffx log"), if it cannot be
/// spawned, or if the spawned process has no stdout handle (context "no stdout").
pub async fn log_stream_for_moniker(
moniker: &str,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<LogsData>>> {
let mut output = self
.ffx_cmd_capture(&["--machine", "json", "log", "--moniker", moniker])
.context("running ffx log")?;
let mut child = output.spawn()?;
let stdout = child.stdout.take().context("no stdout")?;
let mut reader = BufReader::new(stdout);
let (sender, receiver) = futures::channel::mpsc::unbounded();
// Reading from the child's stdout blocks, so it is done off the async executor.
let reader_task = fuchsia_async::Task::local(fuchsia_async::unblock(move || {
let mut output = String::new();
// `read_line` reports end-of-file as `Ok(0)`, not as an error. Stop on it (and on any
// read error) so the task does not spin forever on empty lines once ffx has exited.
while matches!(reader.read_line(&mut output), Ok(read) if read > 0) {
output = String::new();
Result::<(), TrySendError<String>>::Ok(())
Ok(self.log_stream(receiver, reader_task).await)
/// Collect the logs for a particular component.
///
/// Runs `ffx --machine json log --moniker <moniker> dump` and parses its stdout line by line
/// into `LogsData` entries.
///
/// # Errors
///
/// Fails with context "running ffx log" if the ffx command fails, and with context
/// "parsing log line from ffx" if a line cannot be deserialized.
pub async fn logs_for_moniker(&self, moniker: &str) -> anyhow::Result<Vec<LogsData>> {
/// ffx log wraps each line from archivist in its own JSON object, unwrap those here
struct FfxMachineLogLine {
data: FfxTargetLog,
struct FfxTargetLog {
// The wrapper stores the log entry under the `TargetLog` key.
#[serde(rename = "TargetLog")]
target_log: LogsData,
let output = self
.ffx_output(&["--machine", "json", "log", "--moniker", moniker, "dump"])
.context("running ffx log")?;
// Accumulates the parsed entries.
let mut parsed = vec![];
for line in output.lines() {
// Empty lines get special treatment before any parsing is attempted.
if line.is_empty() {
let ffx_message = serde_json::from_str::<FfxMachineLogLine>(line)
.context("parsing log line from ffx")?;
// TODO(slgrady): remove when some variation of fxr/907483 gets added
/// Stop this emulator by running `ffx emu stop` with the emulator's name, then act on the
/// stored emulator process handle if there is one.
///
/// # Panics
///
/// Panics with "emu stop failed" if the ffx command fails, and if the `emu_state` mutex is
/// poisoned.
pub async fn stop(&self) {
self.ffx(&["emu", "stop", &self.emu_name]).await.expect("emu stop failed");
let mut emu = self.emu_state.lock().unwrap();
// The handle is borrowed mutably here, not taken, so `emu_state` keeps holding it.
if let Some(ref mut c) = emu.as_mut() {
// Teardown of the isolated emulator environment: handles the background child processes and
// the emulator process handle, and logs where the logs of the instance can be found.
impl Drop for IsolatedEmulator {
fn drop(&mut self) {
// Child handling is only needed when at least one background child was recorded.
if !self.children.lock().unwrap().is_empty() {
// allow children to clean up, including streaming a few logs out
let mut children = self.children.lock().unwrap();
for child in children.iter_mut() {
let mut emu = self.emu_state.lock().unwrap();
// `take` moves the handle out, leaving `None` behind in `emu_state`.
if let Some(ref mut c) = emu.take() {
"Tearing down isolated emulator instance. Logs are in {}.",
mod tests {
use super::*;
use std::pin::pin;
/// Starts an emulator and exercises the public API against it: reading the target time via
/// `ffx_output`, checking that a non-empty system log gets written, reading streaming logs,
/// and dumping the logs of `/core/remote-control`.
async fn public_apis_succeed() {
// TODO(slgrady) change back to start() when we have debugged the flake in which the ssh
// connection is never made
let amber_files_path = std::env::var("PACKAGE_REPOSITORY_PATH")
.expect("PACKAGE_REPOSITORY_PATH env var must be set -- run this test with 'fx test'");
let emu = IsolatedEmulator::start_internal("e2e_emu_public_apis", Some(&amber_files_path))
.expect("Couldn't start emulator");
info!("Checking target monotonic time to ensure we can connect and get stdout");
let time = emu.ffx_output(&["target", "get-time"]).await.unwrap();
time.trim().parse::<u64>().expect("should have gotten a timestamp back");
info!("Checking that the emulator instance writes a system log.");
let system_log_path = emu.ffx_isolate.log_dir().join("system.log");
// Re-read the log file until it has some content.
loop {
let contents = std::fs::read_to_string(&system_log_path).unwrap();
if !contents.is_empty() {
info!("Checking that we can read streaming logs.");
let mut remote_control_logs =
info!("Checking that we can read RCS' logs.");
let remote_control_logs = emu.logs_for_moniker("/core/remote-control").await.unwrap();
// Assert on the condition directly instead of comparing a boolean against `false`.
assert!(!remote_control_logs.is_empty());
/// Starts an emulator with the package repository named by `TEST_PACKAGE_REPOSITORY_PATH` and
/// checks that `pkgctl resolve` of the package named by `TEST_PACKAGE_NAME` succeeds over ssh.
async fn resolve_package_from_server() {
let test_amber_files_path = std::env::var("TEST_PACKAGE_REPOSITORY_PATH").expect(
"TEST_PACKAGE_REPOSITORY_PATH env var must be set -- run this test with 'fx test'",
let test_package_name = std::env::var("TEST_PACKAGE_NAME")
.expect("TEST_PACKAGE_NAME env var must be set -- run this test with 'fx test'");
let test_package_url = format!("fuchsia-pkg://{test_package_name}");
let emu = IsolatedEmulator::start_internal("pkg_resolve", Some(&test_amber_files_path))
// Resolution must succeed because the repository is being served.
emu.ssh(&["pkgctl", "resolve", &test_package_url]).await.unwrap();
/// This ensures the above test is actually resolving the package from the package server by
/// demonstrating that the same package is unavailable when there's no server running.
async fn fail_to_resolve_package_when_no_package_server_running() {
// Passing `None` as the repository path skips the package serving setup.
let emu = IsolatedEmulator::start_internal("pkg_resolve_fail", None).await.unwrap();
let test_package_name = std::env::var("TEST_PACKAGE_NAME")
.expect("TEST_PACKAGE_NAME env var must be set -- run this test with 'fx test'");
let test_package_url = format!("fuchsia-pkg://{test_package_name}");
// The resolve command is expected to fail here.
emu.ssh(&["pkgctl", "resolve", &test_package_url]).await.unwrap_err();