| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| analytics::{add_crash_event, get_notice}, |
| anyhow::{anyhow, Context, Result}, |
| async_once::Once, |
| async_trait::async_trait, |
| ffx_core::metrics::{add_fx_launch_event, init_metrics_svc}, |
| ffx_core::{ffx_bail, ffx_error, FfxError, Injector}, |
| ffx_daemon::{get_daemon_proxy_single_link, is_daemon_running}, |
| ffx_lib_args::{from_env, Ffx}, |
| ffx_lib_sub_command::Subcommand, |
| fidl::endpoints::create_proxy, |
| fidl_fuchsia_developer_bridge::{DaemonError, DaemonProxy, FastbootMarker, FastbootProxy}, |
| fidl_fuchsia_developer_remotecontrol::{RemoteControlMarker, RemoteControlProxy}, |
| fuchsia_async::TimeoutExt, |
| futures::{Future, FutureExt}, |
| ring::digest::{Context as ShaContext, Digest, SHA256}, |
| std::default::Default, |
| std::error::Error, |
| std::fs::File, |
| std::io::{BufReader, Read}, |
| std::time::{Duration, Instant}, |
| }; |
| |
| // Config key for event timeout. |
| const PROXY_TIMEOUT_SECS: &str = "proxy.timeout_secs"; |
| |
| // TODO(72818): improve error text for these cases |
| const TARGET_AMBIGUOUS_MSG: &str = "\ |
| We found multiple target devices matching your request. |
| |
| If a target matcher was given with --target, or a default target match is |
| set, we may have found multiple targets that match. If no target matcher was |
| specified, we may simply have found more than one potential target. |
| |
| Use `ffx target list` to list the currently visible targets. |
| |
| Use `ffx --target <matcher>` to specify a matcher for the execution of a |
| single command, or `ffx target default set <matcher>` to set the default |
| matcher."; |
| |
| // TODO(72818): improve error text for these cases |
| const TARGET_NOT_FOUND_MSG: &str = "\ |
| We weren't able to find a target matching your request. |
| |
| Use `ffx target list` to verify the state of connected devices, or use `ffx |
| --target <matcher>` to specify a different target for your request. To set |
| the default target to be used in requests without an explicit matcher, use |
| `ffx target default set <matcher>`."; |
| |
| // TODO(72818): improve error text for these cases |
| const TARGET_FAILURE_MSG: &str = "\ |
| We weren't able to open a connection to a target device. |
| |
| Use `ffx target list` to verify the state of connected devices. This error |
| probably means that either: |
| |
| 1) There are no available targets. Make sure your device is connected. |
| 2) There are multiple available targets and you haven't specified a target or |
| provided a default. |
| Tip: You can use `ffx --target \"my-nodename\" <command>` to specify a target |
| for a particular command, or use `ffx target default set \"my-nodename\"` if |
| you always want to use a particular target."; |
| |
| const CURRENT_EXE_HASH: &str = "current.hash"; |
| |
| // TODO(72818): improve error text for these cases |
| const NON_FASTBOOT_MSG: &str = "\ |
| This command needs to be run against a target in the Fastboot state. |
| Try rebooting the device into Fastboot with the command `ffx target |
| reboot --bootloader` and try re-running this command."; |
| |
| // TODO(72818): improve error text for these cases |
| const TARGET_IN_FASTBOOT: &str = "\ |
| This command cannot be run against a target in the Fastboot state. Try |
| rebooting the device or flashing the device into a running state."; |
| |
| const DAEMON_CONNECTION_ISSUE: &str = "\ |
| Timed out waiting on the Daemon.\nRun `ffx doctor` for further diagnostics"; |
| |
| struct Injection { |
| daemon_once: Once<DaemonProxy>, |
| } |
| |
| impl Default for Injection { |
| fn default() -> Self { |
| Self { daemon_once: Once::new() } |
| } |
| } |
| |
| #[async_trait] |
| impl Injector for Injection { |
| // This could get called multiple times by the plugin system via multiple threads - so make sure |
| // the spawning only happens one thread at a time. |
| async fn daemon_factory(&self) -> Result<DaemonProxy> { |
| self.daemon_once.get_or_try_init(init_daemon_proxy()).await.map(|proxy| proxy.clone()) |
| } |
| |
| async fn fastboot_factory(&self) -> Result<FastbootProxy> { |
| let daemon_proxy = self.daemon_factory().await?; |
| let (fastboot_proxy, fastboot_server_end) = create_proxy::<FastbootMarker>()?; |
| let app: Ffx = argh::from_env(); |
| let result = timeout( |
| proxy_timeout().await?, |
| daemon_proxy.get_fastboot( |
| app.target().await?.as_ref().map(|s| s.as_str()), |
| fastboot_server_end, |
| ), |
| ) |
| .await |
| .context("timeout")? |
| .context("connecting to Fastboot")?; |
| |
| match result { |
| Ok(_) => Ok(fastboot_proxy), |
| Err(DaemonError::NonFastbootDevice) => Err(ffx_error!(NON_FASTBOOT_MSG).into()), |
| Err(DaemonError::TargetAmbiguous) => Err(ffx_error!(TARGET_AMBIGUOUS_MSG).into()), |
| Err(DaemonError::TargetNotFound) => Err(ffx_error!(TARGET_NOT_FOUND_MSG).into()), |
| Err(DaemonError::TargetCacheError) => Err(ffx_error!(TARGET_FAILURE_MSG).into()), |
| Err(e) => Err(anyhow!("unexpected failure connecting to Fastboot: {:?}", e)), |
| } |
| } |
| |
| async fn remote_factory(&self) -> Result<RemoteControlProxy> { |
| let daemon_proxy = self.daemon_factory().await?; |
| let (remote_proxy, remote_server_end) = create_proxy::<RemoteControlMarker>()?; |
| let app: Ffx = argh::from_env(); |
| |
| let result = timeout( |
| proxy_timeout().await?, |
| daemon_proxy.get_remote_control( |
| app.target().await?.as_ref().map(|s| s.as_str()), |
| remote_server_end, |
| ), |
| ) |
| .await |
| .context("timeout")? |
| .context("connecting to target via daemon")?; |
| |
| match result { |
| Ok(_) => Ok(remote_proxy), |
| Err(DaemonError::TargetAmbiguous) => Err(ffx_error!(TARGET_AMBIGUOUS_MSG).into()), |
| Err(DaemonError::TargetNotFound) => Err(ffx_error!(TARGET_NOT_FOUND_MSG).into()), |
| Err(DaemonError::TargetCacheError) => Err(ffx_error!(TARGET_FAILURE_MSG).into()), |
| Err(DaemonError::TargetInFastboot) => Err(ffx_error!(TARGET_IN_FASTBOOT).into()), |
| Err(e) => Err(anyhow!("unexpected failure connecting to RCS: {:?}", e)), |
| } |
| } |
| |
| async fn is_experiment(&self, key: &str) -> bool { |
| ffx_config::get(key).await.unwrap_or(false) |
| } |
| } |
| |
| async fn init_daemon_proxy() -> Result<DaemonProxy> { |
| if !is_daemon_running().await { |
| #[cfg(not(test))] |
| ffx_daemon::spawn_daemon().await?; |
| } |
| |
| let (nodeid, proxy, link) = get_daemon_proxy_single_link(None).await?; |
| |
| // Spawn off the link task, so that FIDL functions can be called (link IO makes progress). |
| let link_task = fuchsia_async::Task::spawn(link.map(|_| ())); |
| |
| // TODO(fxb/67400) Create an e2e test. |
| #[cfg(test)] |
| let hash: String = "testcurrenthash".to_owned(); |
| #[cfg(not(test))] |
| let hash: String = |
| match ffx_config::get((CURRENT_EXE_HASH, ffx_config::ConfigLevel::Runtime)).await { |
| Ok(str) => str, |
| Err(err) => { |
| log::error!("BUG: ffx version information is missing! {:?}", err); |
| link_task.detach(); |
| return Ok(proxy); |
| } |
| }; |
| |
| let daemon_hash = timeout(proxy_timeout().await?, proxy.get_hash()) |
| .await |
| .context("timeout") |
| .map_err(|_| ffx_error!("{}", DAEMON_CONNECTION_ISSUE))? |
| .context("Getting hash from daemon")?; |
| if hash == daemon_hash { |
| link_task.detach(); |
| return Ok(proxy); |
| } |
| |
| log::info!("Daemon is a different version. Attempting to restart"); |
| |
| // Tell the daemon to quit, and wait for the link task to finish. |
| // TODO(raggi): add a timeout on this, if the daemon quit fails for some |
| // reason, the link task would hang indefinitely. |
| let (quit_result, _) = futures::future::join(proxy.quit(), link_task).await; |
| |
| if !quit_result.is_ok() { |
| ffx_bail!( |
| "FFX daemon upgrade failed unexpectedly with {:?}. \n\ |
| Try running `ffx doctor --force-daemon-restart` and then retrying your \ |
| command", |
| quit_result |
| ) |
| } |
| |
| #[cfg(not(test))] |
| ffx_daemon::spawn_daemon().await?; |
| |
| let (_nodeid, proxy, link) = get_daemon_proxy_single_link(Some(vec![nodeid])).await?; |
| |
| fuchsia_async::Task::spawn(link.map(|_| ())).detach(); |
| |
| Ok(proxy) |
| } |
| |
| async fn proxy_timeout() -> Result<Duration> { |
| let proxy_timeout: f64 = ffx_config::get(PROXY_TIMEOUT_SECS).await?; |
| Ok(Duration::from_millis((proxy_timeout * 1000.0) as u64)) |
| } |
| |
| #[derive(Debug)] |
| struct TimeoutError {} |
| impl std::fmt::Display for TimeoutError { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "timed out") |
| } |
| } |
| impl Error for TimeoutError {} |
| |
| async fn timeout<F, T>(t: Duration, f: F) -> Result<T, TimeoutError> |
| where |
| F: Future<Output = T> + Unpin, |
| { |
| // TODO(raggi): this could be made more efficient (avoiding the box) with some additional work, |
| // but for the local use cases here it's not sufficiently important. |
| let mut timer = fuchsia_async::Timer::new(t).boxed().fuse(); |
| let mut f = f.fuse(); |
| |
| futures::select! { |
| _ = timer => Err(TimeoutError{}), |
| res = f => Ok(res), |
| } |
| } |
| |
| fn is_daemon(subcommand: &Option<Subcommand>) -> bool { |
| if let Some(Subcommand::FfxDaemonPlugin(ffx_daemon_plugin_args::DaemonCommand { |
| subcommand: ffx_daemon_plugin_sub_command::Subcommand::FfxDaemonStart(_), |
| })) = subcommand |
| { |
| return true; |
| } |
| false |
| } |
| |
| fn set_hash_config(overrides: Option<String>) -> Result<Option<String>> { |
| let input = std::env::current_exe()?; |
| let reader = BufReader::new(File::open(input)?); |
| let digest = sha256_digest(reader)?; |
| |
| let runtime = format!("{}={}", CURRENT_EXE_HASH, hex::encode(digest.as_ref())); |
| match overrides { |
| Some(s) => { |
| if s.is_empty() { |
| Ok(Some(runtime)) |
| } else { |
| let new_overrides = format!("{},{}", s, runtime); |
| Ok(Some(new_overrides)) |
| } |
| } |
| None => Ok(Some(runtime)), |
| } |
| } |
| |
| fn sha256_digest<R: Read>(mut reader: R) -> Result<Digest> { |
| let mut context = ShaContext::new(&SHA256); |
| let mut buffer = [0; 1024]; |
| |
| loop { |
| let count = reader.read(&mut buffer)?; |
| if count == 0 { |
| break; |
| } |
| context.update(&buffer[..count]); |
| } |
| |
| Ok(context.finish()) |
| } |
| |
| async fn run() -> Result<()> { |
| hoist::disable_autoconnect(); |
| let app: Ffx = from_env(); |
| |
| // Configuration initialization must happen before ANY calls to the config (or the cache won't |
| // properly have the runtime parameters. |
| let overrides = set_hash_config(app.runtime_config_overrides())?; |
| ffx_config::init_config(&*app.config, &overrides, &app.env)?; |
| let log_to_stdio = app.verbose || is_daemon(&app.subcommand); |
| ffx_config::logging::init(log_to_stdio).await?; |
| |
| log::info!("starting command: {:?}", std::env::args().collect::<Vec<String>>()); |
| |
| // HACK(64402): hoist uses a lazy static initializer obfuscating access to inject |
| // this value by other means, so: |
| let _ = ffx_config::get("overnet.socket").await.map(|sockpath: String| { |
| std::env::set_var("ASCENDD", sockpath); |
| }); |
| |
| init_metrics_svc().await; // one time call to initialize app analytics |
| if let Some(note) = get_notice().await { |
| eprintln!("{}", note); |
| } |
| |
| let analytics_start = Instant::now(); |
| |
| let analytics_task = fuchsia_async::Task::spawn(async { |
| if let Err(e) = add_fx_launch_event().await { |
| log::error!("metrics submission failed: {}", e); |
| } |
| Instant::now() |
| }); |
| |
| let command_start = Instant::now(); |
| let res = ffx_lib_suite::ffx_plugin_impl(Injection::default(), app).await; |
| let command_done = Instant::now(); |
| log::info!("Command completed. Success: {}", res.is_ok()); |
| |
| let analytics_done = analytics_task |
| // TODO(66918): make configurable, and evaluate chosen time value. |
| .on_timeout(Duration::from_secs(2), || { |
| log::error!("metrics submission timed out"); |
| // Metrics timeouts should not impact user flows. |
| Instant::now() |
| }) |
| .await; |
| |
| log::info!( |
| "Run finished. success: {}, command time: {}, analytics time: {}", |
| res.is_ok(), |
| (command_done - command_start).as_secs_f32(), |
| (analytics_done - analytics_start).as_secs_f32() |
| ); |
| res |
| } |
| |
| #[fuchsia_async::run_singlethreaded] |
| async fn main() { |
| match run().await { |
| Ok(_) => { |
| // TODO add event for timing here at end |
| std::process::exit(0) |
| } |
| Err(err) => { |
| let error_code = if let Some(ffx_err) = err.downcast_ref::<FfxError>() { |
| eprintln!("{}", ffx_err); |
| match ffx_err { |
| FfxError::Error(_, code) => *code, |
| } |
| } else { |
| eprintln!("BUG: An internal command error occurred.\n{:?}", err); |
| 1 |
| }; |
| let err_msg = format!("{}", err); |
| // TODO(66918): make configurable, and evaluate chosen time value. |
| if let Err(e) = add_crash_event(&err_msg) |
| .on_timeout(Duration::from_secs(2), || { |
| log::error!("analytics timed out reporting crash event"); |
| Ok(()) |
| }) |
| .await |
| { |
| log::error!("analytics failed to submit crash event: {}", e); |
| } |
| std::process::exit(error_code); |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::*; |
| use ascendd; |
| use async_lock::Mutex; |
| use async_net::unix::UnixListener; |
| use fidl::endpoints::{ClientEnd, RequestStream, ServiceMarker}; |
| use fidl_fuchsia_developer_bridge::{DaemonMarker, DaemonRequest, DaemonRequestStream}; |
| use fidl_fuchsia_overnet::{ServiceProviderRequest, ServiceProviderRequestStream}; |
| use fuchsia_async::Task; |
| use futures::AsyncReadExt; |
| use futures::TryStreamExt; |
| use hoist::OvernetInstance; |
| use std::path::PathBuf; |
| use std::sync::Arc; |
| use tempfile; |
| |
| fn setup_ascendd_temp() -> tempfile::TempPath { |
| let path = tempfile::NamedTempFile::new().unwrap().into_temp_path(); |
| std::fs::remove_file(&path).unwrap(); |
| std::env::set_var("ASCENDD", &path); |
| path |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_init_daemon_proxy_link_lost() { |
| let sockpath = setup_ascendd_temp(); |
| |
| // Start a listener that accepts and immediately closes the socket.. |
| let listener = UnixListener::bind(sockpath.to_owned()).unwrap(); |
| let _listen_task = Task::spawn(async move { |
| loop { |
| drop(listener.accept().await.unwrap()); |
| } |
| }); |
| |
| let res = init_daemon_proxy().await; |
| let str = format!("{}", res.err().unwrap()); |
| assert!(str.contains("link lost")); |
| assert!(str.contains("ffx doctor")); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_init_daemon_proxy_timeout_no_connection() { |
| let sockpath = setup_ascendd_temp(); |
| |
| // Start a listener that never accepts the socket. |
| let _listener = UnixListener::bind(sockpath.to_owned()).unwrap(); |
| |
| let res = init_daemon_proxy().await; |
| let str = format!("{}", res.err().unwrap()); |
| assert!(str.contains("Timed out")); |
| assert!(str.contains("ffx doctor")); |
| } |
| |
| async fn test_daemon(sockpath: PathBuf, hash: &str) { |
| let daemon_hoist = Arc::new(hoist::Hoist::new().unwrap()); |
| |
| let (s, p) = fidl::Channel::create().unwrap(); |
| daemon_hoist.publish_service(DaemonMarker::NAME, ClientEnd::new(p)).unwrap(); |
| |
| let link_tasks = Arc::new(Mutex::new(Vec::<Task<()>>::new())); |
| let link_tasks1 = link_tasks.clone(); |
| |
| let listener = UnixListener::bind(sockpath.to_owned()).unwrap(); |
| let listen_task = Task::spawn(async move { |
| // let (sock, _addr) = listener.accept().await.unwrap(); |
| let mut stream = listener.incoming(); |
| while let Some(sock) = stream.try_next().await.unwrap_or(None) { |
| let hoist_clone = daemon_hoist.clone(); |
| link_tasks1.lock().await.push(Task::spawn(async move { |
| let (mut rx, mut tx) = sock.split(); |
| ascendd::run_stream( |
| hoist_clone.node(), |
| &mut rx, |
| &mut tx, |
| Some("fake daemon".to_string()), |
| None, |
| ) |
| .map(|r| eprintln!("link error: {:?}", r)) |
| .await; |
| })); |
| } |
| }); |
| |
| let mut stream = ServiceProviderRequestStream::from_channel( |
| fidl::AsyncChannel::from_channel(s).unwrap(), |
| ); |
| |
| while let Some(ServiceProviderRequest::ConnectToService { chan, .. }) = |
| stream.try_next().await.unwrap_or(None) |
| { |
| let link_tasks = link_tasks.clone(); |
| let mut stream = |
| DaemonRequestStream::from_channel(fidl::AsyncChannel::from_channel(chan).unwrap()); |
| while let Some(request) = stream.try_next().await.unwrap_or(None) { |
| match request { |
| DaemonRequest::GetHash { responder, .. } => responder.send(hash).unwrap(), |
| DaemonRequest::Quit { responder, .. } => { |
| std::fs::remove_file(sockpath).unwrap(); |
| listen_task.cancel().await; |
| responder.send(true).unwrap(); |
| // This is how long the daemon sleeps for, which |
| // is a workaround for the fact that we have no |
| // way to "flush" the response over overnet due |
| // to the constraints of mesh routing. |
| fuchsia_async::Timer::new(Duration::from_millis(20)).await; |
| link_tasks.lock().await.clear(); |
| return; |
| } |
| _ => { |
| panic!("unimplemented stub for request: {:?}", request); |
| } |
| } |
| } |
| } |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_init_daemon_proxy_hash_matches() { |
| let sockpath = setup_ascendd_temp(); |
| |
| let sockpath1 = sockpath.to_owned(); |
| let daemons_task = Task::spawn(async move { |
| test_daemon(sockpath1.to_owned(), "testcurrenthash").await; |
| }); |
| |
| // wait until daemon binds the socket path |
| while std::fs::metadata(&sockpath).is_err() { |
| fuchsia_async::Timer::new(Duration::from_millis(20)).await |
| } |
| |
| let proxy = init_daemon_proxy().await.unwrap(); |
| proxy.quit().await.unwrap(); |
| daemons_task.await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_init_daemon_proxy_upgrade() { |
| let sockpath = setup_ascendd_temp(); |
| |
| // Spawn two daemons, the first out of date, the second is up to date. |
| let sockpath1 = sockpath.to_owned(); |
| let daemons_task = Task::spawn(async move { |
| test_daemon(sockpath1.to_owned(), "oldhash").await; |
| // Note: testcurrenthash is explicitly expected by #cfg in get_daemon_proxy |
| test_daemon(sockpath1.to_owned(), "testcurrenthash").await; |
| }); |
| |
| // wait until daemon binds the socket path |
| while std::fs::metadata(&sockpath).is_err() { |
| fuchsia_async::Timer::new(Duration::from_millis(20)).await |
| } |
| |
| let proxy = init_daemon_proxy().await.unwrap(); |
| proxy.quit().await.unwrap(); |
| daemons_task.await; |
| } |
| } |