blob: d6ac1ea5db69e9bff2c9a9d872a5aa32e0b7b7e6 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::update::{Config, EnvironmentConnector, RebootController, Updater},
anyhow::anyhow,
event_queue::{EventQueue, Notify},
fidl_fuchsia_update_installer::UpdateNotStartedReason,
fidl_fuchsia_update_installer_ext::State,
fuchsia_async as fasync, fuchsia_inspect as inspect,
fuchsia_syslog::{fx_log_err, fx_log_warn},
fuchsia_zircon as zx,
futures::{
channel::{mpsc, oneshot},
prelude::*,
select,
stream::FusedStream,
},
};
const INSPECT_STATUS_NODE_NAME: &str = "status";
/// Start an install manager task that:
/// * Runs an update attempt in a separate task.
/// * Provides a control handle to forward FIDL requests to the update attempt task.
///
/// Returns the control handle along with the future that drives the install manager
/// task; the caller is responsible for polling (e.g. spawning) that future.
pub async fn start_install_manager<N, U, E>(
    updater: U,
    node: inspect::Node,
) -> (InstallManagerControlHandle<N>, impl Future<Output = ()>)
where
    N: Notify<Event = State> + Send + 'static,
    U: Updater,
    E: EnvironmentConnector,
{
    // Zero-capacity channel: every control request rendezvouses with the task.
    let (send, recv) = mpsc::channel(0);
    (InstallManagerControlHandle(send), run::<N, U, E>(recv, updater, node))
}
/// The install manager task.
async fn run<N, U, E>(
mut recv: mpsc::Receiver<ControlRequest<N>>,
mut updater: U,
node: inspect::Node,
) where
N: Notify<Event = State> + Send + 'static,
U: Updater,
E: EnvironmentConnector,
{
// Set up event queue to keep track of all the Monitors.
let (monitor_queue_fut, mut monitor_queue) = EventQueue::new();
let eq_task = fasync::Task::spawn(monitor_queue_fut);
// Each iteration of this loop is one update attempt.
loop {
// There is no active update attempt, so let's wait for a start request.
let StartRequestData { config, monitor, reboot_controller, responder } =
match handle_idle_control_request(&mut recv).await {
Some(start_data) => start_data,
None => {
// Ensure all monitors get the remaining state updates.
drop(monitor_queue);
eq_task.await;
return;
}
};
let reboot_controller = reboot_controller.unwrap_or_else(RebootController::unblocked);
// We connect to FIDL services on each update attempt (rather than once at the
// beginning) to prevent stale connections.
let env = match E::connect() {
Ok(env) => env,
Err(e) => {
fx_log_err!("Error connecting to services: {:#}", anyhow!(e));
// This fails the update attempt because it drops the responder, which closes
// the zx channel that we got the start request from.
continue;
}
};
// Now we can actually start the task that manages the update attempt.
let update_url = &config.update_url.clone();
let should_write_recovery = config.should_write_recovery;
let (attempt_id, attempt_stream) = updater.update(config, env, reboot_controller).await;
futures::pin_mut!(attempt_stream);
// Set up inspect nodes.
let mut status_node = node.create_child(INSPECT_STATUS_NODE_NAME);
let _time_property =
node.create_int("start_timestamp_nanos", zx::Time::get_monotonic().into_nanos());
// Don't forget to add the first monitor to the queue and respond to StartUpdate :)
if let Err(e) = monitor_queue.add_client(monitor).await {
fx_log_warn!("error adding client to monitor queue: {:#}", anyhow!(e));
}
let _ = responder.send(Ok(attempt_id.clone()));
// For this update attempt, handle events both from the FIDL server and the update task.
loop {
// We use this enum to make the body of the select as short as possible. Otherwise,
// we'd need to set the crate's recursion_limit to be higher.
enum Op<N: Notify> {
Request(ControlRequest<N>),
Status(Option<State>),
};
let op = select! {
req = recv.select_next_some() => Op::Request(req),
state = attempt_stream.next() => Op::Status(state)
};
match op {
// We got a FIDL requests (via the mpsc::Receiver).
Op::Request(req) => {
handle_active_control_request(
req,
&mut monitor_queue,
&attempt_id,
&update_url,
should_write_recovery,
)
.await
}
// The update task has given us a progress update, so let's forward
// that to all the monitors.
Op::Status(Some(state)) => {
drop(status_node);
status_node = node.create_child(INSPECT_STATUS_NODE_NAME);
state.write_to_inspect(&status_node);
if let Err(e) = monitor_queue.queue_event(state).await {
fx_log_warn!("error sending state to monitor_queue: {:#}", anyhow!(e));
}
}
// The update task tells us the update is over, so let's notify all monitors.
Op::Status(None) => {
drop(status_node);
if let Err(e) = monitor_queue.clear().await {
fx_log_warn!("error clearing clients of monitor_queue: {:#}", anyhow!(e));
}
break;
}
}
}
}
}
/// Returns when we get a Start control request (i.e. a StartUpdate FIDL request forwarded
/// from the FIDL server).
///
/// While idle, Monitor requests are answered with `false` since there is no attempt to
/// attach to. Returns `None` once the control request stream is exhausted.
async fn handle_idle_control_request<N>(
    recv: &mut mpsc::Receiver<ControlRequest<N>>,
) -> Option<StartRequestData<N>>
where
    N: Notify,
{
    // If the stream of control requests terminated while an update attempt was running,
    // this stream has already yielded None, and so further calls to next() may panic.
    // Instead, check if the stream is terminated via its FusedStream implementation
    // before proceeding.
    if recv.is_terminated() {
        return None;
    }
    // No update is running; drain requests until we see a Start (or the stream ends).
    loop {
        match recv.next().await {
            None => return None,
            Some(ControlRequest::Start(start_data)) => return Some(start_data),
            Some(ControlRequest::Monitor(MonitorRequestData { responder, .. })) => {
                // Nothing to monitor right now; tell the caller so.
                let _ = responder.send(false);
            }
        }
    }
}
/// Handles a control request received while an update attempt is underway.
///
/// A Start request succeeds (attaching its monitor to the running attempt) only when it
/// carries no reboot controller, opted in via `allow_attach_to_existing_attempt`, and
/// matches the running attempt's update URL and `should_write_recovery` setting;
/// otherwise the responder gets `AlreadyInProgress`. A Monitor request succeeds when its
/// optional attempt id is absent or matches the current attempt.
async fn handle_active_control_request<N>(
    req: ControlRequest<N>,
    monitor_queue: &mut event_queue::ControlHandle<N>,
    attempt_id: &str,
    update_url: &fuchsia_url::pkg_url::PkgUrl,
    should_write_recovery: bool,
) where
    N: Notify,
{
    match req {
        ControlRequest::Start(StartRequestData {
            responder,
            config,
            monitor,
            reboot_controller,
        }) => {
            // Only attach the monitor if the request is compatible with the current attempt.
            // Note: We can only attach a reboot controller during the FIRST start request.
            // Any subsequent request with a reboot controller should fail.
            if reboot_controller.is_none()
                && config.allow_attach_to_existing_attempt
                && &config.update_url == update_url
                && config.should_write_recovery == should_write_recovery
            {
                if let Err(e) = monitor_queue.add_client(monitor).await {
                    fx_log_warn!("error adding client to monitor queue: {:#}", anyhow!(e));
                }
                let _ = responder.send(Ok(attempt_id.to_string()));
            } else {
                let _ = responder.send(Err(UpdateNotStartedReason::AlreadyInProgress));
            }
        }
        ControlRequest::Monitor(MonitorRequestData { attempt_id: id, monitor, responder }) => {
            // If an attempt ID is provided, ensure it matches the current attempt.
            if let Some(id) = id {
                if &id != attempt_id {
                    let _ = responder.send(false);
                    return;
                }
            }
            if let Err(e) = monitor_queue.add_client(monitor).await {
                fx_log_warn!("error adding client to monitor queue: {:#}", anyhow!(e));
            }
            let _ = responder.send(true);
        }
    }
}
/// A handle to forward installer FIDL requests to the install manager task.
///
/// Cloning the handle clones the underlying channel sender, so multiple FIDL
/// connections can share a single install manager task.
#[derive(Clone)]
pub struct InstallManagerControlHandle<N>(mpsc::Sender<ControlRequest<N>>)
where
    N: Notify;
/// Error indicating that the install manager task no longer exists.
///
/// Returned by [`InstallManagerControlHandle`] methods when the channel to the
/// task is closed.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[error("install manager dropped before all its control handles")]
pub struct InstallManagerGone;
impl From<mpsc::SendError> for InstallManagerGone {
fn from(_: mpsc::SendError) -> Self {
InstallManagerGone
}
}
impl From<oneshot::Canceled> for InstallManagerGone {
fn from(_: oneshot::Canceled) -> Self {
InstallManagerGone
}
}
/// This can be used to forward requests to the install manager task.
impl<N> InstallManagerControlHandle<N>
where
    N: Notify,
{
    /// Forward StartUpdate requests to the install manager task.
    ///
    /// The outer `Result` reports whether the task is still alive; the inner one is
    /// the task's answer (attempt id, or why the update was not started).
    pub async fn start_update(
        &mut self,
        config: Config,
        monitor: N,
        reboot_controller: Option<RebootController>,
    ) -> Result<Result<String, UpdateNotStartedReason>, InstallManagerGone> {
        let (responder, response) = oneshot::channel();
        let request = ControlRequest::Start(StartRequestData {
            config,
            monitor,
            reboot_controller,
            responder,
        });
        self.0.send(request).await?;
        Ok(response.await?)
    }

    /// Forward MonitorUpdate requests to the install manager task.
    ///
    /// Resolves to whether the monitor was attached to an update attempt.
    pub async fn monitor_update(
        &mut self,
        attempt_id: Option<String>,
        monitor: N,
    ) -> Result<bool, InstallManagerGone> {
        let (responder, response) = oneshot::channel();
        let request =
            ControlRequest::Monitor(MonitorRequestData { attempt_id, monitor, responder });
        self.0.send(request).await?;
        Ok(response.await?)
    }
}
/// Requests that can be forwarded to the install manager task.
enum ControlRequest<N>
where
    N: Notify,
{
    Start(StartRequestData<N>),
    Monitor(MonitorRequestData<N>),
}
/// Payload of a forwarded StartUpdate request.
struct StartRequestData<N>
where
    N: Notify,
{
    config: Config,
    monitor: N,
    reboot_controller: Option<RebootController>,
    // Resolves with the attempt id on success, or the reason the update wasn't started.
    responder: oneshot::Sender<Result<String, UpdateNotStartedReason>>,
}
/// Payload of a forwarded MonitorUpdate request.
struct MonitorRequestData<N>
where
    N: Notify,
{
    // When present, the monitor attaches only if this matches the current attempt id.
    attempt_id: Option<String>,
    monitor: N,
    // Resolves with whether the monitor was attached.
    responder: oneshot::Sender<bool>,
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::update::{ConfigBuilder, Environment, NamespaceBuildInfo, NamespaceCobaltConnector},
async_trait::async_trait,
event_queue::ClosedClient,
fidl_fuchsia_hardware_power_statecontrol::AdminMarker as PowerStateControlMarker,
fidl_fuchsia_paver::{BootManagerMarker, DataSinkMarker},
fidl_fuchsia_pkg::{PackageCacheMarker, PackageResolverMarker},
fidl_fuchsia_space::ManagerMarker as SpaceManagerMarker,
fidl_fuchsia_update_installer_ext::{Progress, UpdateInfo, UpdateInfoAndProgress},
fuchsia_inspect::{assert_inspect_tree, testing::AnyProperty, Inspector},
mpsc::{Receiver, Sender},
parking_lot::Mutex,
std::sync::Arc,
};
const CALLBACK_CHANNEL_SIZE: usize = 20;
    /// Test notifier that forwards every state event into a channel held by the test.
    struct FakeStateNotifier {
        sender: Arc<Mutex<Sender<State>>>,
    }
    impl FakeStateNotifier {
        /// Returns a notifier and the receiver on which its state events arrive.
        fn new_callback_and_receiver() -> (Self, Receiver<State>) {
            let (sender, receiver) = mpsc::channel(CALLBACK_CHANNEL_SIZE);
            (Self { sender: Arc::new(Mutex::new(sender)) }, receiver)
        }
    }
    impl Notify for FakeStateNotifier {
        type Event = State;
        type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
        fn notify(&self, state: State) -> Self::NotifyFuture {
            // try_send is fine here: CALLBACK_CHANNEL_SIZE is large enough that the
            // channel never fills up in these tests.
            self.sender.lock().try_send(state).expect("FakeStateNotifier failed to send state");
            future::ready(Ok(()))
        }
    }
struct StubEnvironmentConnector;
    impl EnvironmentConnector for StubEnvironmentConnector {
        fn connect() -> Result<Environment, anyhow::Error> {
            // Create proxies whose server ends are dropped immediately; the fakes in
            // these tests never talk to the real services.
            let (data_sink, _) = fidl::endpoints::create_proxy::<DataSinkMarker>().unwrap();
            let (boot_manager, _) = fidl::endpoints::create_proxy::<BootManagerMarker>().unwrap();
            let (pkg_resolver, _) =
                fidl::endpoints::create_proxy::<PackageResolverMarker>().unwrap();
            let (pkg_cache, _) = fidl::endpoints::create_proxy::<PackageCacheMarker>().unwrap();
            let (space_manager, _) = fidl::endpoints::create_proxy::<SpaceManagerMarker>().unwrap();
            let (power_state_control, _) =
                fidl::endpoints::create_proxy::<PowerStateControlMarker>().unwrap();
            Ok(Environment {
                data_sink,
                boot_manager,
                pkg_resolver,
                pkg_cache,
                space_manager,
                power_state_control,
                build_info: NamespaceBuildInfo,
                cobalt_connector: NamespaceCobaltConnector,
                pkgfs_system: None,
            })
        }
    }
struct FakeUpdater(mpsc::Receiver<(String, mpsc::Receiver<State>)>);
    impl FakeUpdater {
        /// Creates a `FakeUpdater` that yields the `(attempt id, state stream)` pairs
        /// sent on `receiver`, one per update attempt.
        fn new(receiver: mpsc::Receiver<(String, mpsc::Receiver<State>)>) -> Self {
            Self(receiver)
        }
    }
    #[async_trait(?Send)]
    impl Updater for FakeUpdater {
        type UpdateStream = mpsc::Receiver<State>;
        async fn update(
            &mut self,
            _config: Config,
            _env: Environment,
            _reboot_controller: RebootController,
        ) -> (String, Self::UpdateStream) {
            // Blocks until the test supplies the next attempt id and state stream.
            self.0.next().await.unwrap()
        }
    }
    /// Like `start_install_manager_with_update_id_and_node`, but with a throwaway
    /// inspect node for tests that don't assert on inspect state.
    async fn start_install_manager_with_update_id(
        id: &str,
    ) -> (
        InstallManagerControlHandle<FakeStateNotifier>,
        fasync::Task<()>,
        mpsc::Sender<(String, mpsc::Receiver<State>)>,
        mpsc::Sender<State>,
    ) {
        let inspector = Inspector::new();
        let node = inspector.root().create_child("test_does_not_use_inspect");
        start_install_manager_with_update_id_and_node(id, node).await
    }
    /// Starts the install manager backed by a `FakeUpdater` primed to report attempt `id`.
    ///
    /// Returns the control handle, the running install manager task, the sender used to
    /// prime later attempts, and the state sender for the first attempt's stream.
    async fn start_install_manager_with_update_id_and_node(
        id: &str,
        node: inspect::Node,
    ) -> (
        InstallManagerControlHandle<FakeStateNotifier>,
        fasync::Task<()>,
        mpsc::Sender<(String, mpsc::Receiver<State>)>,
        mpsc::Sender<State>,
    ) {
        // We use this channel to send the attempt id and state receiver to the update task, for
        // each update attempt. This allows tests to control when an update attempt ends -- all they
        // need to do is drop the state sender.
        let (mut updater_sender, updater_receiver) = mpsc::channel(0);
        let updater = FakeUpdater::new(updater_receiver);
        let (state_sender, state_receiver) = mpsc::channel(0);
        let (install_manager_ch, fut) =
            start_install_manager::<FakeStateNotifier, FakeUpdater, StubEnvironmentConnector>(
                updater, node,
            )
            .await;
        let install_manager_task = fasync::Task::local(fut);
        // We just use try_send because send calls are blocked on the receiver receiving the
        // event... and the receiver won't receive the event until we do a start_update request.
        updater_sender.try_send((id.to_string(), state_receiver)).unwrap();
        (install_manager_ch, install_manager_task, updater_sender, state_sender)
    }
    /// Monitor requests respond `false` while no update attempt is running, with or
    /// without an attempt id.
    #[fasync::run_singlethreaded(test)]
    async fn monitor_update_fails_when_no_update_running() {
        let (mut install_manager_ch, _install_manager_task, _updater_sender, _state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier0, _state_receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier1, _state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(install_manager_ch.monitor_update(None, notifier0).await, Ok(false));
        assert_eq!(
            install_manager_ch.monitor_update(Some("my-attempt".to_string()), notifier1).await,
            Ok(false)
        );
    }
    /// A monitor request whose attempt id doesn't match the running attempt responds `false`.
    #[fasync::run_singlethreaded(test)]
    async fn monitor_update_fails_with_wrong_id() {
        let (mut install_manager_ch, _install_manager_task, _updater_sender, _state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier0, _state_receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier1, _state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier0, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        assert_eq!(
            install_manager_ch.monitor_update(Some("unknown id".to_string()), notifier1).await,
            Ok(false)
        );
    }
    /// The monitor passed to start_update receives every state event of the attempt.
    #[fasync::run_singlethreaded(test)]
    async fn monitor_connects_via_start_update() {
        let (mut install_manager_ch, _install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier, state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        let () = state_sender.send(State::Prepare).await.unwrap();
        let () = state_sender.send(State::FailPrepare).await.unwrap();
        // Dropping the sender ends the update attempt.
        drop(state_sender);
        assert_eq!(
            state_receiver.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
    }
    /// Monitors attached via start_update or monitor_update (with or without the
    /// attempt id) all receive the attempt's state events.
    #[fasync::run_singlethreaded(test)]
    async fn monitor_update_succeeds() {
        let (mut install_manager_ch, _install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier0, state_receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier1, state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier2, state_receiver2) = FakeStateNotifier::new_callback_and_receiver();
        // Show we can successfully add each notifier via either start_update or monitor_update.
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier0, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        assert_eq!(install_manager_ch.monitor_update(None, notifier1).await, Ok(true));
        assert_eq!(
            install_manager_ch.monitor_update(Some("my-attempt".to_string()), notifier2).await,
            Ok(true)
        );
        // Send state updates to the update task.
        let () = state_sender.send(State::Prepare).await.unwrap();
        let () = state_sender.send(State::FailPrepare).await.unwrap();
        drop(state_sender);
        // Each monitor should get these events.
        assert_eq!(
            state_receiver0.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
        assert_eq!(
            state_receiver1.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
        assert_eq!(
            state_receiver2.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
    }
    /// A second start_update request with a compatible config attaches its monitor to
    /// the already-running attempt instead of failing.
    #[fasync::run_singlethreaded(test)]
    async fn succeed_additional_start_requests_when_compatible() {
        let (mut install_manager_ch, _install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier0, state_receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier1, state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier0, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        // The second start_update request is acceptable because the config is compatible.
        assert_eq!(
            install_manager_ch
                .start_update(
                    ConfigBuilder::new().allow_attach_to_existing_attempt(true).build().unwrap(),
                    notifier1,
                    None
                )
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        // Send state updates to the update task, then end the update by dropping the sender.
        let () = state_sender.send(State::Prepare).await.unwrap();
        let () = state_sender.send(State::FailPrepare).await.unwrap();
        drop(state_sender);
        // Each monitor should get these events.
        assert_eq!(
            state_receiver0.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
        assert_eq!(
            state_receiver1.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
    }
    /// Additional start_update requests fail with AlreadyInProgress whenever they are
    /// incompatible with the running attempt (no opt-in, different URL, different
    /// recovery setting, or a reboot controller); once the attempt ends, a new
    /// start_update succeeds regardless of config.
    #[fasync::run_singlethreaded(test)]
    async fn fail_additional_start_requests_when_config_incompatible() {
        let (mut install_manager_ch, _install_manager_task, mut updater_sender, state_sender0) =
            start_install_manager_with_update_id("first-attempt-id").await;
        let (notifier0, mut state_receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier1, _state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier2, _state_receiver2) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier3, _state_receiver3) = FakeStateNotifier::new_callback_and_receiver();
        let (notifier4, _state_receiver4) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier0, None)
                .await,
            Ok(Ok("first-attempt-id".to_string()))
        );
        // Fails because allow_attach_to_existing_attempt is false.
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier1, None)
                .await,
            Ok(Err(UpdateNotStartedReason::AlreadyInProgress))
        );
        // Fails because different update URL.
        assert_eq!(
            install_manager_ch
                .start_update(
                    ConfigBuilder::new()
                        .update_url("fuchsia-pkg://fuchsia.com/different-url")
                        .allow_attach_to_existing_attempt(true)
                        .build()
                        .unwrap(),
                    notifier2,
                    None
                )
                .await,
            Ok(Err(UpdateNotStartedReason::AlreadyInProgress))
        );
        // Fails because incompatible configs (i.e. should_write_recovery is different).
        assert_eq!(
            install_manager_ch
                .start_update(
                    ConfigBuilder::new()
                        .allow_attach_to_existing_attempt(true)
                        .should_write_recovery(false)
                        .build()
                        .unwrap(),
                    notifier3,
                    None
                )
                .await,
            Ok(Err(UpdateNotStartedReason::AlreadyInProgress))
        );
        // Fails because we can't attach reboot controller in second start request.
        assert_eq!(
            install_manager_ch
                .start_update(
                    ConfigBuilder::new().allow_attach_to_existing_attempt(true).build().unwrap(),
                    notifier4,
                    Some(RebootController::unblocked()),
                )
                .await,
            Ok(Err(UpdateNotStartedReason::AlreadyInProgress))
        );
        // End the current update attempt.
        drop(state_sender0);
        assert_eq!(state_receiver0.next().await, None);
        // Set what update() should return in the second attempt.
        let (_state_sender1, recv) = mpsc::channel(0);
        updater_sender.try_send(("second-attempt-id".to_string(), recv)).unwrap();
        // Now that there's no update in progress, start_update should work regardless of config.
        let (notifier5, _state_receiver5) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier5, None)
                .await,
            Ok(Ok("second-attempt-id".to_string()))
        );
    }
    /// Dropping the control handle mid-attempt doesn't kill the attempt; monitors still
    /// get all buffered events and the install manager task then exits cleanly.
    #[fasync::run_singlethreaded(test)]
    async fn update_attempt_finishes_after_dropping_control_handle() {
        let (mut install_manager_ch, install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id("my-attempt").await;
        let (notifier, state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        // Close the channel that sends ControlRequests to the update manager task.
        drop(install_manager_ch);
        // Even though the ControlRequest channel was dropped, the update attempt should still
        // be running and should be able to receive state events from the monitor stream.
        let () = state_sender.send(State::Prepare).await.unwrap();
        let () = state_sender.send(State::FailPrepare).await.unwrap();
        // Even if we drop the sender (which ends the update attempt), the state receivers
        // should still receive all the events we've sent up until this point.
        drop(state_sender);
        assert_eq!(
            state_receiver.collect::<Vec<State>>().await,
            vec![State::Prepare, State::FailPrepare]
        );
        // Ensures the update manager task stops after it sends the buffered state events to monitors.
        install_manager_task.await;
    }
    /// A state event is mirrored into the inspect status node of the current attempt.
    #[fasync::run_singlethreaded(test)]
    async fn writes_status_update_to_inspect() {
        let inspector = Inspector::new();
        let (mut install_manager_ch, _install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id_and_node(
                "my-attempt",
                inspector.root().create_child("current_attempt"),
            )
            .await;
        let (notifier, mut state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        let () = state_sender.send(State::Prepare).await.unwrap();
        // Note for inspect tests: it is very important that we read the state from monitors
        // to prevent race conditions. We can only guarantee inspect state is written once the
        // status update is forwarded to monitors.
        let _ = state_receiver.next().await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {
                    start_timestamp_nanos: AnyProperty,
                    status: {
                        state: "prepare"
                    }
                }
            }
        );
    }
    /// Only the most recent state event is reflected in inspect; earlier status nodes
    /// are replaced, not accumulated.
    #[fasync::run_singlethreaded(test)]
    async fn writes_newest_status_update_to_inspect() {
        let inspector = Inspector::new();
        let (mut install_manager_ch, _install_manager_task, _updater_sender, mut state_sender) =
            start_install_manager_with_update_id_and_node(
                "my-attempt",
                inspector.root().create_child("current_attempt"),
            )
            .await;
        let (notifier, mut state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        let () = state_sender.send(State::Prepare).await.unwrap();
        let () = state_sender
            .send(State::Fetch(
                UpdateInfoAndProgress::builder()
                    .info(UpdateInfo::builder().download_size(100).build())
                    .progress(
                        Progress::builder().fraction_completed(0.5).bytes_downloaded(50).build(),
                    )
                    .build(),
            ))
            .await
            .unwrap();
        // Drain both forwarded events so the inspect write is guaranteed to have happened.
        let _ = state_receiver.next().await;
        let _ = state_receiver.next().await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {
                    start_timestamp_nanos: AnyProperty,
                    status: {
                        state: "fetch",
                        info: {
                            download_size: 100u64,
                        },
                        progress: {
                            fraction_completed: 0.5,
                            bytes_downloaded: 50u64,
                        },
                    }
                }
            }
        );
    }
    /// The status node is removed when an attempt ends and repopulated for the next one.
    #[fasync::run_singlethreaded(test)]
    async fn writes_status_update_to_inspect_on_second_attempt() {
        let inspector = Inspector::new();
        let (mut install_manager_ch, _install_manager_task, mut updater_sender, mut state_sender) =
            start_install_manager_with_update_id_and_node(
                "first-attempt-id",
                inspector.root().create_child("current_attempt"),
            )
            .await;
        let (notifier, mut state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        // Start first update attempt and show status node is populated.
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("first-attempt-id".to_string()))
        );
        let () = state_sender.send(State::Prepare).await.unwrap();
        let _ = state_receiver.next().await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {
                    start_timestamp_nanos: AnyProperty,
                    status: {
                        state: "prepare"
                    }
                }
            }
        );
        // End the first update attempt, show status node is removed.
        drop(state_sender);
        let _ = state_receiver.next().await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {}
            }
        );
        // Start second update attempt and show status node is once again populated.
        let (mut state_sender1, recv) = mpsc::channel(0);
        updater_sender.try_send(("second-attempt-id".to_string(), recv)).unwrap();
        let (notifier, mut state_receiver1) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("second-attempt-id".to_string()))
        );
        let () = state_sender1.send(State::FailPrepare).await.unwrap();
        let _ = state_receiver1.next().await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {
                    start_timestamp_nanos: AnyProperty,
                    status: {
                        state: "fail_prepare"
                    }
                }
            }
        );
    }
    /// Before any start_update, the attempt node exists but is empty.
    #[fasync::run_singlethreaded(test)]
    async fn writes_empty_attempt_node_if_no_attempt_running() {
        let inspector = Inspector::new();
        let (mut _install_manager_ch, _install_manager_task, _updater_sender, _state_sender) =
            start_install_manager_with_update_id_and_node(
                "my-attempt",
                inspector.root().create_child("current_attempt"),
            )
            .await;
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {}
            }
        );
    }
    /// The update attempt has started (so we should have a status node), but
    /// we haven't gotten a status update (so the said node should be empty).
    #[fasync::run_singlethreaded(test)]
    async fn writes_empty_status_node() {
        let inspector = Inspector::new();
        let (mut install_manager_ch, _install_manager_task, _updater_sender, _state_sender) =
            start_install_manager_with_update_id_and_node(
                "my-attempt",
                inspector.root().create_child("current_attempt"),
            )
            .await;
        let (notifier, _state_receiver) = FakeStateNotifier::new_callback_and_receiver();
        assert_eq!(
            install_manager_ch
                .start_update(ConfigBuilder::new().build().unwrap(), notifier, None)
                .await,
            Ok(Ok("my-attempt".to_string()))
        );
        assert_inspect_tree!(
            inspector,
            root: {
                current_attempt: {
                    start_timestamp_nanos: AnyProperty,
                    status: {},
                }
            }
        );
    }
}