// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::apply::{apply_system_update, ApplyProgress, ApplyState};
use crate::channel::{CurrentChannelManager, TargetChannelManager};
use crate::check::{check_for_system_update, SystemUpdateStatus};
use crate::connect::ServiceConnect;
use crate::update_monitor::{StateNotifier, UpdateMonitor};
use crate::update_service::RealStateNotifier;
use anyhow::{anyhow, Context as _, Error};
use async_generator::GeneratorState;
use fidl_fuchsia_update::{
    CheckNotStartedReason, CommitStatusProviderMarker, InstallationDeferralReason,
};
use fidl_fuchsia_update_ext::{
    query_commit_status, CheckOptions, CommitStatus, Initiator, InstallationDeferredData,
    InstallationErrorData, InstallationProgress, InstallingData, State, UpdateInfo,
};
use fuchsia_async as fasync;
use fuchsia_component::client::connect_to_service;
use fuchsia_hash::Hash;
use fuchsia_inspect as finspect;
use fuchsia_syslog::{fx_log_err, fx_log_info};
use futures::{
    channel::{mpsc, oneshot},
    future::BoxFuture,
    pin_mut,
    prelude::*,
    select,
    stream::BoxStream,
};
use std::sync::Arc;

#[derive(Debug)]
pub struct UpdateManagerControlHandle<N>(mpsc::Sender<UpdateManagerRequest<N>>);

impl<N> UpdateManagerControlHandle<N>
where
    N: StateNotifier,
{
    /// Try to start an update with the given options and optional monitor, returning whether or
    /// not the attempt was started (or attached to, if the options allow it).
    pub async fn try_start_update(
        &mut self,
        options: CheckOptions,
        callback: Option<N>,
    ) -> Result<(), CheckNotStartedReason> {
        let (send, recv) = oneshot::channel();
        let () = self
            .0
            .send(UpdateManagerRequest::TryStartUpdate { options, callback, responder: send })
            .await
            .map_err(|_| CheckNotStartedReason::Internal)?;
        recv.await.map_err(|_| CheckNotStartedReason::Internal)?
    }

    #[cfg(test)]
    pub async fn get_state(&mut self) -> Option<State> {
        let (send, recv) = oneshot::channel();
        let () = self.0.send(UpdateManagerRequest::GetState { responder: send }).await.ok()?;
        recv.await.ok()?
    }

    #[cfg(test)]
    pub async fn get_last_known_update_package_hash(&mut self) -> Option<Hash> {
        let (send, recv) = oneshot::channel();
        let () = self
            .0
            .send(UpdateManagerRequest::GetLastKnownUpdatePackageHash { responder: send })
            .await
            .ok()?;
        recv.await.ok()?
    }
}

// Manually implement Clone as not all N impl Clone, so derive(Clone) won't always impl Clone.
// See https://github.com/rust-lang/rust/issues/26925 for more context.
impl<N> Clone for UpdateManagerControlHandle<N> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

#[derive(Debug)]
pub(crate) enum UpdateManagerRequest<N> {
    TryStartUpdate {
        options: CheckOptions,
        callback: Option<N>,
        responder: oneshot::Sender<Result<(), CheckNotStartedReason>>,
    },
    #[cfg_attr(not(test), allow(dead_code))]
    GetState { responder: oneshot::Sender<Option<State>> },
    #[cfg_attr(not(test), allow(dead_code))]
    GetLastKnownUpdatePackageHash { responder: oneshot::Sender<Option<Hash>> },
}

#[derive(Debug)]
enum StatusEvent {
    State(State),
    VersionAvailableKnown(String),
}

pub struct UpdateManager<T, Ch, C, A, N, Cq>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
    N: StateNotifier,
    Cq: CommitQuerier,
{
    monitor: UpdateMonitor<N>,
    updater: SystemInterface<T, Ch, C, A, Cq>,
}

struct SystemInterface<T, Ch, C, A, Cq>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
    Cq: CommitQuerier,
{
    target_channel_updater: Arc<T>,
    current_channel_updater: Arc<Ch>,
    update_checker: C,
    update_applier: A,
    last_known_update_package: Option<Hash>,
    commit_status: Option<CommitStatus>,
    commit_querier: Cq,
}

impl<T, Ch>
    UpdateManager<T, Ch, RealUpdateChecker, RealUpdateApplier, RealStateNotifier, RealCommitQuerier>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
{
    pub async fn new(
        target_channel_updater: Arc<T>,
        current_channel_updater: Arc<Ch>,
        node: finspect::Node,
    ) -> Self {
        let (fut, update_monitor) = UpdateMonitor::from_inspect_node(node);
        fasync::Task::spawn(fut).detach();
        Self {
            monitor: update_monitor,
            updater: SystemInterface::new(
                target_channel_updater,
                current_channel_updater,
                RealUpdateChecker,
                RealUpdateApplier,
                None,
                RealCommitQuerier,
                None,
            ),
        }
    }
}

impl<T, Ch, C, A, N, Cq> UpdateManager<T, Ch, C, A, N, Cq>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
    N: StateNotifier,
    Cq: CommitQuerier,
{
    #[cfg(test)]
    pub async fn from_checker_and_applier(
        target_channel_updater: Arc<T>,
        current_channel_updater: Arc<Ch>,
        update_checker: C,
        update_applier: A,
        commit_querier: Cq,
    ) -> Self {
        let (fut, update_monitor) = UpdateMonitor::new();
        fasync::Task::spawn(fut).detach();
        Self {
            monitor: update_monitor,
            updater: SystemInterface::new(
                target_channel_updater,
                current_channel_updater,
                update_checker,
                update_applier,
                None,
                commit_querier,
                None,
            ),
        }
    }

    #[cfg(test)]
    async fn from_checker_and_applier_with_commit_status(
        target_channel_updater: Arc<T>,
        current_channel_updater: Arc<Ch>,
        update_checker: C,
        update_applier: A,
        commit_querier: Cq,
        commit_status: Option<CommitStatus>,
    ) -> Self {
        let last_known_update_package = None;
        let (fut, update_monitor) = UpdateMonitor::new();
        fasync::Task::spawn(fut).detach();
        Self {
            monitor: update_monitor,
            updater: SystemInterface::new(
                target_channel_updater,
                current_channel_updater,
                update_checker,
                update_applier,
                last_known_update_package,
                commit_querier,
                commit_status,
            ),
        }
    }

    /// Builds and returns the update manager async task, along with a control handle to interact
    /// with the task. The returned future must be polled for the update manager task to make
    /// forward progress.
    pub fn start(self) -> (UpdateManagerControlHandle<N>, impl Future<Output = ()>) {
        let (send, recv) = mpsc::channel(0);
        (UpdateManagerControlHandle(send), self.run(recv))
    }

    #[cfg(test)]
    pub fn spawn(self) -> UpdateManagerControlHandle<N> {
        let (ctl, fut) = self.start();
        fasync::Task::spawn(fut).detach();
        ctl
    }

    async fn run(self, requests: mpsc::Receiver<UpdateManagerRequest<N>>) {
        let Self { mut monitor, mut updater } = self;
        pin_mut!(requests);

        loop {
            // Get the next request to start an update attempt, responding to other requests with
            // the appropriate defaults when no update attempt is in progress.
            let (options, callback) = loop {
                let request = match requests.next().await {
                    Some(request) => request,
                    None => return,
                };

                match request {
                    UpdateManagerRequest::TryStartUpdate { options, callback, responder } => {
                        let _ = responder.send(Ok(()));
                        break (options, callback);
                    }
                    UpdateManagerRequest::GetState { responder } => {
                        let _ = responder.send(None);
                    }
                    UpdateManagerRequest::GetLastKnownUpdatePackageHash { responder } => {
                        let _ = responder.send(updater.last_known_update_package);
                    }
                }
            };

            // Start the update check with the requested options, configuring a monitor if
            // requested.
            if let Some(callback) = callback {
                monitor.add_temporary_callback(callback).await;
            }

            // Used for testing: it's ok to be slightly stale.
            let last_known_update_package = updater.last_known_update_package;

            let update_check = async_generator::generate(|mut co| {
                let updater = &mut updater;
                async move { updater.do_system_update_check(&mut co, options.initiator).await }
            });
            pin_mut!(update_check);
            let mut current_state = None;

            // Run the update check, forwarding status updates to monitors, responding to requests
            // to monitor the attempt and blocking requests to start a new update attempt.
            let update_check_res = loop {
                enum Op<N> {
                    Request(UpdateManagerRequest<N>),
                    Status(StatusEvent),
                }
                let op = select! {
                    request = requests.select_next_some() => Op::Request(request),
                    status = update_check.select_next_some() => match status {
                        GeneratorState::Yielded(status) => Op::Status(status),
                        GeneratorState::Complete(res) => break res,
                    },
                };
                match op {
                    Op::Request(UpdateManagerRequest::TryStartUpdate {
                        options,
                        callback,
                        responder,
                    }) => {
                        let _ =
                            responder.send(if !options.allow_attaching_to_existing_update_check {
                                Err(CheckNotStartedReason::AlreadyInProgress)
                            } else {
                                if let Some(callback) = callback {
                                    monitor.add_temporary_callback(callback).await;
                                }
                                Ok(())
                            });
                    }
                    Op::Request(UpdateManagerRequest::GetState { responder }) => {
                        let _ = responder.send(current_state.clone());
                    }
                    Op::Request(UpdateManagerRequest::GetLastKnownUpdatePackageHash {
                        responder,
                    }) => {
                        let _ = responder.send(last_known_update_package.clone());
                    }
                    Op::Status(StatusEvent::State(state)) => {
                        current_state = Some(state.clone());
                        let should_flush = matches!(state, State::WaitingForReboot(_));
                        monitor.advance_update_state(state).await;
                        if should_flush {
                            monitor.try_flush().await;
                        }
                    }
                    Op::Status(StatusEvent::VersionAvailableKnown(version)) => {
                        monitor.set_version_available(version);
                    }
                }
            };

            // Log the result of the update check and reset the monitor queue/inspect state for the
            // attempt.
            match update_check_res {
                Ok(()) => {}
                Err(e) => {
                    fx_log_err!("update attempt failed: {:#}", anyhow!(e));
                }
            }
            monitor.clear().await;
        }
    }
}

impl<T, Ch, C, A, Cq> SystemInterface<T, Ch, C, A, Cq>
where
    T: TargetChannelUpdater,
    Ch: CurrentChannelUpdater,
    C: UpdateChecker,
    A: UpdateApplier,
    Cq: CommitQuerier,
{
    fn new(
        target_channel_updater: Arc<T>,
        current_channel_updater: Arc<Ch>,
        update_checker: C,
        update_applier: A,
        last_known_update_package: Option<Hash>,
        commit_querier: Cq,
        commit_status: Option<CommitStatus>,
    ) -> Self {
        Self {
            target_channel_updater,
            current_channel_updater,
            update_checker,
            update_applier,
            last_known_update_package,
            commit_querier,
            commit_status,
        }
    }

    async fn do_system_update_check(
        &mut self,
        co: &mut async_generator::Yield<StatusEvent>,
        initiator: Initiator,
    ) -> Result<(), Error> {
        co.yield_(StatusEvent::State(State::CheckingForUpdates)).await;
        fx_log_info!(
            "starting update check (requested by {})",
            match initiator {
                Initiator::Service => "service",
                Initiator::User => "user",
            }
        );

        self.target_channel_updater.update().await;

        match self
            .update_checker
            .check(self.last_known_update_package.as_ref())
            .await
            .context("check_for_system_update failed")
        {
            Err(e) => {
                co.yield_(StatusEvent::State(State::ErrorCheckingForUpdate)).await;
                return Err(e);
            }
            Ok(SystemUpdateStatus::UpToDate { system_image, update_package }) => {
                fx_log_info!("current system_image hash: {}", system_image);
                fx_log_info!("system_image is already up-to-date");

                self.last_known_update_package = Some(update_package);

                self.current_channel_updater.update().await;
                co.yield_(StatusEvent::State(State::NoUpdateAvailable)).await;

                return Ok(());
            }
            Ok(SystemUpdateStatus::UpdateAvailable {
                current_system_image,
                latest_system_image,
            }) => {
                fx_log_info!("current system_image hash: {}", current_system_image);
                fx_log_info!("new system_image available: {}", latest_system_image);
                let version_available = latest_system_image.to_string();

                let status = match self.commit_status {
                    Some(CommitStatus::Committed) => Ok(CommitStatus::Committed),
                    Some(CommitStatus::Pending) | None => self
                        .commit_querier
                        .query_commit_status()
                        .await
                        .context("while querying commit status"),
                };

                match status {
                    Ok(CommitStatus::Committed) => {
                        self.commit_status = Some(CommitStatus::Committed);
                    }
                    Ok(CommitStatus::Pending) => {
                        self.commit_status = Some(CommitStatus::Pending);
                        co.yield_(StatusEvent::State(State::InstallationDeferredByPolicy(
                            InstallationDeferredData {
                                update: Some(UpdateInfo {
                                    version_available: Some(version_available.clone()),
                                    download_size: None,
                                }),
                                deferral_reason: Some(
                                    InstallationDeferralReason::CurrentSystemNotCommitted,
                                ),
                            },
                        )))
                        .await;
                        return Ok(());
                    }
                    Err(e) => return Err(e),
                }

                {
                    co.yield_(StatusEvent::VersionAvailableKnown(version_available.clone())).await;
                    co.yield_(StatusEvent::State(State::InstallingUpdate(InstallingData {
                        update: Some(UpdateInfo {
                            version_available: Some(version_available.clone()),
                            download_size: None,
                        }),
                        installation_progress: None,
                    })))
                    .await;
                }

                match self
                    .update_applier
                    .apply(initiator)
                    .await
                    .context("apply_system_update failed")
                {
                    Ok(mut stream) => {
                        let mut waiting_for_reboot = false;
                        while let Some(result) = stream.next().await {
                            match result {
                                Ok(apply_state) => {
                                    let state = match apply_state {
                                        ApplyState::InstallingUpdate(ApplyProgress {
                                            download_size,
                                            fraction_completed,
                                        }) => State::InstallingUpdate(InstallingData {
                                            update: Some(UpdateInfo {
                                                version_available: Some(version_available.clone()),
                                                download_size,
                                            }),
                                            installation_progress: Some(InstallationProgress {
                                                fraction_completed,
                                            }),
                                        }),
                                        ApplyState::WaitingForReboot(ApplyProgress {
                                            download_size,
                                            fraction_completed,
                                        }) => {
                                            waiting_for_reboot = true;
                                            State::WaitingForReboot(InstallingData {
                                                update: Some(UpdateInfo {
                                                    version_available: Some(
                                                        version_available.clone(),
                                                    ),
                                                    download_size,
                                                }),
                                                installation_progress: Some(InstallationProgress {
                                                    fraction_completed,
                                                }),
                                            })
                                        }
                                    };
                                    co.yield_(StatusEvent::State(state)).await;
                                }
                                Err((ApplyProgress { download_size, fraction_completed }, e)) => {
                                    // If we failed to unblock reboot, it will ends up here and we
                                    // should not go back to InstallationError.
                                    if !waiting_for_reboot {
                                        co.yield_(StatusEvent::State(State::InstallationError(
                                            InstallationErrorData {
                                                update: Some(UpdateInfo {
                                                    version_available: Some(version_available),
                                                    download_size,
                                                }),
                                                installation_progress: Some(InstallationProgress {
                                                    fraction_completed,
                                                }),
                                            },
                                        )))
                                        .await;
                                    }
                                    return Err(e);
                                }
                            }
                        }
                    }
                    Err(e) => {
                        co.yield_(StatusEvent::State(State::InstallationError(
                            InstallationErrorData {
                                update: Some(UpdateInfo {
                                    version_available: Some(version_available),
                                    download_size: None,
                                }),
                                installation_progress: None,
                            },
                        )))
                        .await;
                        return Err(e);
                    }
                }
            }
        }
        Ok(())
    }
}

// For mocking
pub trait UpdateChecker: Send + Sync + 'static {
    fn check<'a>(
        &self,
        last_known_update_hash: Option<&'a Hash>,
    ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>>;
}

pub struct RealUpdateChecker;

impl UpdateChecker for RealUpdateChecker {
    fn check<'a>(
        &self,
        last_known_update_hash: Option<&'a Hash>,
    ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
        check_for_system_update(last_known_update_hash).boxed()
    }
}

// For mocking
pub trait TargetChannelUpdater: Send + Sync + 'static {
    fn update(&self) -> BoxFuture<'_, ()>;
}

impl<S: ServiceConnect + 'static> TargetChannelUpdater for TargetChannelManager<S> {
    fn update(&self) -> BoxFuture<'_, ()> {
        TargetChannelManager::update(self)
            .unwrap_or_else(|e| fx_log_err!("while updating target channel: {:#}", anyhow!(e)))
            .boxed()
    }
}

// For mocking
pub trait CurrentChannelUpdater: Send + Sync + 'static {
    fn update(&self) -> BoxFuture<'_, ()>;
}

impl CurrentChannelUpdater for CurrentChannelManager {
    fn update(&self) -> BoxFuture<'_, ()> {
        CurrentChannelManager::update(self)
            .unwrap_or_else(|e| fx_log_err!("while updating current channel: {:#}", anyhow!(e)))
            .boxed()
    }
}

// For mocking
pub trait UpdateApplier: Send + Sync + 'static {
    fn apply<'a>(
        &self,
        initiator: Initiator,
    ) -> BoxFuture<
        'a,
        Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
    >;
}

pub struct RealUpdateApplier;

impl UpdateApplier for RealUpdateApplier {
    fn apply<'a>(
        &self,
        initiator: Initiator,
    ) -> BoxFuture<
        'a,
        Result<BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>, anyhow::Error>,
    > {
        apply_system_update(initiator).boxed()
    }
}

// For mocking.
pub trait CommitQuerier: Send + Sync + 'static {
    fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>>;
}

pub struct RealCommitQuerier;

impl CommitQuerier for RealCommitQuerier {
    fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>> {
        async {
            let provider = connect_to_service::<CommitStatusProviderMarker>()
                .context("while connecting to commit status provider")?;
            query_commit_status(&provider).await
        }
        .boxed()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::errors;
    use event_queue::{ClosedClient, Notify};
    use fuchsia_async::{DurationExt, TimeoutExt};
    use fuchsia_zircon::prelude::*;
    use fuchsia_zircon::{self as zx};
    use futures::channel::mpsc::{channel, Receiver, Sender};
    use futures::channel::oneshot;
    use futures::future::BoxFuture;
    use futures::lock::Mutex as AsyncMutex;
    use matches::assert_matches;
    use parking_lot::Mutex;
    use std::sync::atomic::{AtomicU64, Ordering};

    pub const CALLBACK_CHANNEL_SIZE: usize = 20;
    pub const CURRENT_SYSTEM_IMAGE: &str =
        "0000000000000000000000000000000000000000000000000000000000000000";
    pub const LATEST_SYSTEM_IMAGE: &str =
        "1111111111111111111111111111111111111111111111111111111111111111";
    pub const CURRENT_UPDATE_PACKAGE: &str =
        "2222222222222222222222222222222222222222222222222222222222222222";

    pub(crate) struct FakeUpdateManagerControlHandle<N> {
        requests: mpsc::Receiver<UpdateManagerRequest<N>>,
    }

    impl<N> FakeUpdateManagerControlHandle<N> {
        pub(crate) fn new() -> (UpdateManagerControlHandle<N>, Self) {
            let (send, recv) = mpsc::channel(0);

            (UpdateManagerControlHandle(send), Self { requests: recv })
        }

        pub(crate) fn next(&mut self) -> Option<UpdateManagerRequest<N>> {
            self.requests.next().now_or_never().flatten()
        }
    }

    type CheckResultFactory = fn() -> Result<SystemUpdateStatus, crate::errors::Error>;

    #[derive(Clone)]
    pub struct FakeUpdateChecker {
        result: CheckResultFactory,
        call_count: Arc<AtomicU64>,
        // Taking this mutex blocks update checker.
        check_blocked: Arc<AsyncMutex<()>>,
    }
    impl FakeUpdateChecker {
        fn new(result: CheckResultFactory) -> Self {
            Self {
                result,
                call_count: Arc::new(AtomicU64::new(0)),
                check_blocked: Arc::new(AsyncMutex::new(())),
            }
        }
        pub fn new_up_to_date() -> Self {
            Self::new(|| {
                Ok(SystemUpdateStatus::UpToDate {
                    system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"),
                    update_package: CURRENT_UPDATE_PACKAGE.parse().expect("valid hash"),
                })
            })
        }
        pub fn new_update_available() -> Self {
            Self::new(|| {
                Ok(SystemUpdateStatus::UpdateAvailable {
                    current_system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"),
                    latest_system_image: LATEST_SYSTEM_IMAGE.parse().expect("valid hash"),
                })
            })
        }
        pub fn new_error() -> Self {
            Self::new(|| {
                Err(errors::Error::UpdatePackage(errors::UpdatePackage::Resolve(
                    zx::Status::INTERNAL,
                )))
            })
        }
        pub fn block(&self) -> Option<futures::lock::MutexGuard<'_, ()>> {
            self.check_blocked.try_lock()
        }
        pub fn call_count(&self) -> u64 {
            self.call_count.load(Ordering::SeqCst)
        }
    }
    impl UpdateChecker for FakeUpdateChecker {
        fn check<'a>(
            &self,
            _last_known_update_hash: Option<&'a Hash>,
        ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
            let check_blocked = Arc::clone(&self.check_blocked);
            let result = (self.result)();
            self.call_count.fetch_add(1, Ordering::SeqCst);

            async move {
                check_blocked.lock().await;
                result
            }
            .boxed()
        }
    }

    #[derive(Clone)]
    pub struct FakeTargetChannelUpdater {
        call_count: Arc<AtomicU64>,
    }
    impl FakeTargetChannelUpdater {
        pub fn new() -> Self {
            Self { call_count: Arc::new(AtomicU64::new(0)) }
        }
        pub fn call_count(&self) -> u64 {
            self.call_count.load(Ordering::SeqCst)
        }
    }
    impl TargetChannelUpdater for FakeTargetChannelUpdater {
        fn update(&self) -> BoxFuture<'_, ()> {
            let call_count = self.call_count.clone();
            async move {
                call_count.fetch_add(1, Ordering::SeqCst);
            }
            .boxed()
        }
    }

    #[derive(Clone)]
    pub struct FakeCurrentChannelUpdater {
        call_count: Arc<AtomicU64>,
    }
    impl FakeCurrentChannelUpdater {
        pub fn new() -> Self {
            Self { call_count: Arc::new(AtomicU64::new(0)) }
        }
        pub fn call_count(&self) -> u64 {
            self.call_count.load(Ordering::SeqCst)
        }
    }
    impl CurrentChannelUpdater for FakeCurrentChannelUpdater {
        fn update(&self) -> BoxFuture<'_, ()> {
            let call_count = self.call_count.clone();
            async move {
                call_count.fetch_add(1, Ordering::SeqCst);
            }
            .boxed()
        }
    }

    #[derive(Clone)]
    pub struct UnreachableUpdateApplier;
    impl UpdateApplier for UnreachableUpdateApplier {
        fn apply<'a>(
            &self,
            _initiator: Initiator,
        ) -> BoxFuture<
            'a,
            Result<
                BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
                anyhow::Error,
            >,
        > {
            unreachable!();
        }
    }

    type ApplyResultFactory = fn() -> Result<
        BoxStream<'static, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
        crate::errors::Error,
    >;

    #[derive(Clone)]
    pub struct FakeUpdateApplier {
        result: ApplyResultFactory,
        call_count: Arc<AtomicU64>,
    }
    impl FakeUpdateApplier {
        pub fn new_success() -> Self {
            Self {
                result: || {
                    Ok(futures::stream::iter(vec![
                        Ok(ApplyState::InstallingUpdate(ApplyProgress::new(1000, 0.42))),
                        Ok(ApplyState::WaitingForReboot(ApplyProgress::new(1000, 1.0))),
                    ])
                    .chain(futures::stream::pending())
                    .boxed())
                },
                call_count: Arc::new(AtomicU64::new(0)),
            }
        }
        pub fn new_error() -> Self {
            Self {
                result: || Err(crate::errors::Error::SystemUpdaterFailed),
                call_count: Arc::new(AtomicU64::new(0)),
            }
        }
        pub fn call_count(&self) -> u64 {
            self.call_count.load(std::sync::atomic::Ordering::Relaxed)
        }
    }
    impl UpdateApplier for FakeUpdateApplier {
        fn apply<'a>(
            &self,
            _initiator: Initiator,
        ) -> BoxFuture<
            'a,
            Result<
                BoxStream<'a, Result<ApplyState, (ApplyProgress, anyhow::Error)>>,
                anyhow::Error,
            >,
        > {
            self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            future::ready((self.result)().map_err(|e| e.into())).boxed()
        }
    }

    #[derive(Clone)]
    pub struct FakeCommitQuerier {
        call_count: Arc<AtomicU64>,
        committed: bool,
    }

    impl FakeCommitQuerier {
        pub fn new() -> Self {
            Self { call_count: Arc::new(AtomicU64::new(0)), committed: true }
        }

        pub fn new_pending() -> Self {
            Self { call_count: Arc::new(AtomicU64::new(0)), committed: false }
        }
    }

    impl CommitQuerier for FakeCommitQuerier {
        fn query_commit_status<'a>(&self) -> BoxFuture<'a, Result<CommitStatus, anyhow::Error>> {
            self.call_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            if self.committed {
                future::ready(Ok(CommitStatus::Committed)).boxed()
            } else {
                future::ready(Ok(CommitStatus::Pending)).boxed()
            }
        }
    }

    #[derive(Clone)]
    pub struct UnreachableNotifier;
    impl Notify for UnreachableNotifier {
        type Event = State;
        type NotifyFuture = BoxFuture<'static, Result<(), ClosedClient>>;
        fn notify(&self, _state: State) -> Self::NotifyFuture {
            unreachable!();
        }
    }

    #[derive(Clone, Debug)]
    pub struct StateChangeCollector {
        states: Arc<Mutex<Vec<State>>>,
    }
    impl StateChangeCollector {
        pub fn new() -> Self {
            Self { states: Arc::new(Mutex::new(vec![])) }
        }
        pub fn take_states(&self) -> Vec<State> {
            std::mem::replace(&mut self.states.lock(), vec![])
        }
    }
    impl Notify for StateChangeCollector {
        type Event = State;
        type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
        fn notify(&self, state: State) -> Self::NotifyFuture {
            self.states.lock().push(state);
            future::ready(Ok(()))
        }
    }

    #[derive(Clone)]
    struct FakeStateNotifier {
        sender: Arc<Mutex<Sender<State>>>,
    }
    impl FakeStateNotifier {
        fn new_callback_and_receiver() -> (Self, Receiver<State>) {
            let (sender, receiver) = channel(CALLBACK_CHANNEL_SIZE);
            (Self { sender: Arc::new(Mutex::new(sender)) }, receiver)
        }
    }
    impl Notify for FakeStateNotifier {
        type Event = State;
        type NotifyFuture = future::Ready<Result<(), ClosedClient>>;
        fn notify(&self, state: State) -> Self::NotifyFuture {
            self.sender.lock().try_send(state).expect("FakeStateNotifier failed to send state");
            future::ready(Ok(()))
        }
    }

    type FakeUpdateManager = UpdateManager<
        FakeTargetChannelUpdater,
        FakeCurrentChannelUpdater,
        FakeUpdateChecker,
        FakeUpdateApplier,
        FakeStateNotifier,
        FakeCommitQuerier,
    >;

    type BlockingManagerManager = UpdateManager<
        FakeTargetChannelUpdater,
        FakeCurrentChannelUpdater,
        BlockingUpdateChecker,
        FakeUpdateApplier,
        FakeStateNotifier,
        FakeCommitQuerier,
    >;

    async fn next_n_states(receiver: &mut Receiver<State>, n: usize) -> Vec<State> {
        let mut v = Vec::with_capacity(n);
        for _ in 0..n {
            v.push(receiver.next().await.expect("next_n_states stream empty"));
        }
        v
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_correct_initial_state() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_up_to_date(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();

        assert_eq!(manager.get_state().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_last_update_package_changed_when_no_update_available() {
        let fake_update_checker = FakeUpdateChecker::new_up_to_date();

        let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            fake_update_checker,
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
            None,
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, Some(callback)).await, Ok(()));

        assert_eq!(
            receiver.collect::<Vec<State>>().await,
            vec![State::CheckingForUpdates, State::NoUpdateAvailable]
        );

        assert_eq!(
            manager.get_last_known_update_package_hash().await,
            Some(CURRENT_UPDATE_PACKAGE.parse().unwrap())
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_last_update_package_unchanged_when_update_available() {
        let fake_update_checker = FakeUpdateChecker::new_update_available();

        let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            fake_update_checker,
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
            None,
        )
        .await
        .spawn();
        let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, Some(callback)).await, Ok(()));

        assert_eq!(
            next_n_states(&mut receiver, 4).await,
            vec![
                State::CheckingForUpdates,
                State::InstallingUpdate(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: None,
                    }),
                    installation_progress: None,
                }),
                State::InstallingUpdate(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: Some(1000),
                    }),
                    installation_progress: Some(InstallationProgress {
                        fraction_completed: Some(0.42)
                    })
                }),
                State::WaitingForReboot(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: Some(1000),
                    }),
                    installation_progress: Some(InstallationProgress {
                        fraction_completed: Some(1.0)
                    })
                }),
            ]
        );

        assert_eq!(manager.get_last_known_update_package_hash().await, None);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_is_current_status_committed_called_when_none() {
        let fake_commit_querier = FakeCommitQuerier::new();
        let fidl_call_count = Arc::clone(&fake_commit_querier.call_count);

        let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_success(),
            fake_commit_querier,
            None,
        )
        .await
        .spawn();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, None).await, Ok(()));

        assert_eq!(fidl_call_count.load(Ordering::SeqCst), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_is_current_status_committed_called_when_pending() {
        let fake_commit_querier = FakeCommitQuerier::new();
        let fidl_call_count = Arc::clone(&fake_commit_querier.call_count);

        let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_success(),
            fake_commit_querier,
            Some(CommitStatus::Pending),
        )
        .await
        .spawn();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, None).await, Ok(()));

        assert_eq!(fidl_call_count.load(Ordering::SeqCst), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_is_current_status_committed_not_called_when_committed() {
        let fake_commit_querier = FakeCommitQuerier::new();
        let fidl_call_count = Arc::clone(&fake_commit_querier.call_count);

        let mut manager = FakeUpdateManager::from_checker_and_applier_with_commit_status(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_success(),
            fake_commit_querier,
            Some(CommitStatus::Committed),
        )
        .await
        .spawn();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, None).await, Ok(()));

        assert_eq!(fidl_call_count.load(Ordering::SeqCst), 0);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_try_start_update_returns_started() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_up_to_date(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        assert_eq!(manager.try_start_update(options, None).await, Ok(()));
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_temporary_callbacks_dropped_after_update_attempt() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_up_to_date(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options.clone(), Some(callback)).await.unwrap();

        // Drain the stream of status updates, which is only closed when the update attempt
        // completes and the callbacks are dropped, so this would hang if the callback is not
        // dropped after the update attempt.
        assert_eq!(
            receiver.collect::<Vec<State>>().await,
            vec![State::CheckingForUpdates, State::NoUpdateAvailable]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_try_start_update_callback_when_up_to_date() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_up_to_date(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();

        assert_eq!(
            receiver.collect::<Vec<State>>().await,
            vec![State::CheckingForUpdates, State::NoUpdateAvailable]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_try_start_update_callback_when_update_available_and_apply_errors() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_error(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();
        let expected_update_info = Some(UpdateInfo {
            version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
            download_size: None,
        });

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();

        assert_eq!(
            receiver.collect::<Vec<State>>().await,
            vec![
                State::CheckingForUpdates,
                State::InstallingUpdate(InstallingData {
                    update: expected_update_info.clone(),
                    installation_progress: None,
                }),
                State::InstallationError(InstallationErrorData {
                    update: expected_update_info,
                    installation_progress: None,
                }),
            ]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_try_start_update_callback_when_update_available_and_apply_succeeds() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();

        assert_eq!(
            next_n_states(&mut receiver, 4).await,
            vec![
                State::CheckingForUpdates,
                State::InstallingUpdate(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: None,
                    }),
                    installation_progress: None
                }),
                State::InstallingUpdate(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: Some(1000),
                    }),
                    installation_progress: Some(InstallationProgress {
                        fraction_completed: Some(0.42)
                    })
                }),
                State::WaitingForReboot(InstallingData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: Some(1000),
                    }),
                    installation_progress: Some(InstallationProgress {
                        fraction_completed: Some(1.0)
                    })
                }),
            ]
        );

        // The update attempt will never leave the WaitingForReboot state.
        assert_eq!(
            receiver.next().map(Some).on_timeout(100.millis().after_now(), || None).await,
            None
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_check_start_update_callback_when_update_available_and_pending() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_success(),
            FakeCommitQuerier::new_pending(),
        )
        .await
        .spawn();
        let (callback, mut receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();

        assert_eq!(
            next_n_states(&mut receiver, 2).await,
            vec![
                State::CheckingForUpdates,
                State::InstallationDeferredByPolicy(InstallationDeferredData {
                    update: Some(UpdateInfo {
                        version_available: Some(LATEST_SYSTEM_IMAGE.to_string()),
                        download_size: None
                    }),
                    deferral_reason: Some(InstallationDeferralReason::CurrentSystemNotCommitted)
                }),
            ]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_channel_updater_called() {
        let channel_updater = Arc::new(FakeTargetChannelUpdater::new());
        let mut manager = UpdateManager::from_checker_and_applier(
            Arc::clone(&channel_updater),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_up_to_date(),
            UnreachableUpdateApplier,
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(channel_updater.call_count(), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_update_applier_called_if_update_available() {
        let update_applier = FakeUpdateApplier::new_error();
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            update_applier.clone(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(update_applier.call_count(), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_update_applier_not_called_if_up_to_date() {
        let update_applier = FakeUpdateApplier::new_error();
        let current_channel_updater = Arc::new(FakeCurrentChannelUpdater::new());
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::clone(&current_channel_updater),
            FakeUpdateChecker::new_up_to_date(),
            update_applier.clone(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(update_applier.call_count(), 0);
        assert_eq!(current_channel_updater.call_count(), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_return_to_initial_state_on_update_check_error() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_error(),
            FakeUpdateApplier::new_error(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(manager.get_state().await, Default::default());
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_return_to_initial_state_on_update_apply_error() {
        let mut manager = FakeUpdateManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            FakeUpdateChecker::new_update_available(),
            FakeUpdateApplier::new_error(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, Some(callback)).await.unwrap();
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(manager.get_state().await, Default::default());
    }

    #[derive(Clone)]
    pub struct BlockingUpdateChecker {
        blocker: future::Shared<oneshot::Receiver<()>>,
    }
    impl BlockingUpdateChecker {
        pub fn new_checker_and_sender() -> (Self, oneshot::Sender<()>) {
            let (sender, receiver) = oneshot::channel();
            let blocking_update_checker = BlockingUpdateChecker { blocker: receiver.shared() };
            (blocking_update_checker, sender)
        }
    }
    impl UpdateChecker for BlockingUpdateChecker {
        fn check<'a>(
            &self,
            _last_known_update_hash: Option<&'a Hash>,
        ) -> BoxFuture<'a, Result<SystemUpdateStatus, crate::errors::Error>> {
            let blocker = self.blocker.clone();
            async move {
                assert!(blocker.await.is_ok(), "blocking future cancelled");
                Ok(SystemUpdateStatus::UpdateAvailable {
                    current_system_image: CURRENT_SYSTEM_IMAGE.parse().expect("valid hash"),
                    latest_system_image: LATEST_SYSTEM_IMAGE.parse().expect("valid hash"),
                })
            }
            .boxed()
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_get_state_in_checking_for_updates() {
        let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender();
        let mut manager = BlockingManagerManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            blocking_update_checker,
            FakeUpdateApplier::new_error(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        manager.try_start_update(options, None).await.unwrap();

        // Wait for the update attempt to enter the CheckingForUpdates state, panicing if it
        // completes prematurely.
        loop {
            let state = manager.get_state().await.unwrap();
            if state == State::CheckingForUpdates {
                break;
            }
        }

        // Unblock the update attempt and verify that it eventually enters the idle state.
        sender.send(()).unwrap();
        while let Some(_) = manager.get_state().await {}
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_no_concurrent_update_attempts_if_attach_not_requested() {
        let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender();
        let update_applier = FakeUpdateApplier::new_error();
        let mut manager = BlockingManagerManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            blocking_update_checker,
            update_applier.clone(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback, receiver) = FakeStateNotifier::new_callback_and_receiver();

        let options = CheckOptions::builder().initiator(Initiator::User).build();
        let res0 = manager.try_start_update(options.clone(), Some(callback)).await;
        let res1 = manager.try_start_update(options, None).await;
        assert_matches!(sender.send(()), Ok(()));
        let _ = receiver.collect::<Vec<State>>().await;

        assert_eq!(res0, Ok(()));
        assert_eq!(res1, Err(CheckNotStartedReason::AlreadyInProgress));
        assert_eq!(update_applier.call_count(), 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_merge_update_attempt_monitors_if_attach_requested() {
        let (blocking_update_checker, sender) = BlockingUpdateChecker::new_checker_and_sender();
        let update_applier = FakeUpdateApplier::new_error();
        let mut manager = BlockingManagerManager::from_checker_and_applier(
            Arc::new(FakeTargetChannelUpdater::new()),
            Arc::new(FakeCurrentChannelUpdater::new()),
            blocking_update_checker,
            update_applier.clone(),
            FakeCommitQuerier::new(),
        )
        .await
        .spawn();
        let (callback0, receiver0) = FakeStateNotifier::new_callback_and_receiver();
        let (callback1, receiver1) = FakeStateNotifier::new_callback_and_receiver();
        let options = CheckOptions::builder().initiator(Initiator::User);

        let res0 = manager.try_start_update(options.clone().build(), Some(callback0)).await;
        let res1 = manager
            .try_start_update(
                options.allow_attaching_to_existing_update_check(true).build(),
                Some(callback1),
            )
            .await;
        assert_matches!(sender.send(()), Ok(()));
        let states0 = receiver0.collect::<Vec<State>>().await;
        let states1 = receiver1.collect::<Vec<State>>().await;

        assert_eq!(res0, Ok(()));
        assert_eq!(res1, Ok(()));
        assert_eq!(update_applier.call_count(), 1);
        assert_eq!(states0, states1);
    }
}
