// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        capability::{CapabilityProvider, CapabilitySource, InternalCapability},
        channel,
        model::{
            error::ModelError,
            hooks::{Event, EventPayload, EventType, Hook, HooksRegistration},
            routing::RoutingError,
        },
        work_scheduler::work_scheduler::{
            WorkScheduler, WORK_SCHEDULER_CAPABILITY_NAME, WORK_SCHEDULER_CONTROL_CAPABILITY_NAME,
        },
    },
    anyhow::Error,
    async_trait::async_trait,
    fidl::endpoints::ServerEnd,
    fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, fuchsia_zircon as zx,
    futures::TryStreamExt,
    log::warn,
    moniker::AbsoluteMoniker,
    std::{
        path::PathBuf,
        sync::{Arc, Weak},
    },
};

// TODO(markdittmer): Establish
// WorkSchedulerSystem -> (WorkScheduler, WorkSchedulerHook -> WorkScheduler).
impl WorkScheduler {
    /// Builds the hook registrations for this `WorkScheduler`. Takes `&Arc<WorkScheduler>` so
    /// that a `Weak` reference can be given to `HooksRegistration` while the caller keeps its
    /// `Arc`.
    pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
        let events = vec![EventType::Resolved, EventType::CapabilityRouted];
        let hook = Arc::downgrade(self) as Weak<dyn Hook>;
        vec![HooksRegistration::new("WorkScheduler", events, hook)]
    }

    /// Offers a provider for the `fuchsia.sys2.WorkSchedulerControl` protocol as a framework
    /// capability.
    ///
    /// A new provider is returned only when no provider has been chosen yet and `capability`
    /// matches the control protocol; otherwise `capability_provider` is handed back unchanged.
    async fn on_framework_capability_routed_async<'a>(
        self: Arc<Self>,
        capability: &'a InternalCapability,
        capability_provider: Option<Box<dyn CapabilityProvider>>,
    ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
        // Not ours to serve: someone already supplied a provider, or the protocol differs.
        if capability_provider.is_some()
            || !capability.matches_protocol(&WORK_SCHEDULER_CONTROL_CAPABILITY_NAME)
        {
            return Ok(capability_provider);
        }

        let provider: Box<dyn CapabilityProvider> =
            Box::new(WorkSchedulerControlCapabilityProvider::new(Arc::clone(&self)));
        Ok(Some(provider))
    }

    /// Offers a provider for the `fuchsia.sys2.WorkScheduler` protocol as a scoped framework
    /// capability.
    ///
    /// A new provider bound to `scope_moniker` is returned only when no provider has been chosen
    /// yet and `capability` matches the `WorkScheduler` protocol; otherwise `capability_provider`
    /// is handed back unchanged. Fails with a `RoutingError` (converted into `ModelError`) when
    /// `verify_worker_exposed_to_framework` reports `false` for `scope_moniker`.
    async fn on_scoped_framework_capability_routed_async<'a>(
        self: Arc<Self>,
        scope_moniker: AbsoluteMoniker,
        capability: &'a InternalCapability,
        capability_provider: Option<Box<dyn CapabilityProvider>>,
    ) -> Result<Option<Box<dyn CapabilityProvider>>, ModelError> {
        // Not ours to serve: someone already supplied a provider, or the protocol differs.
        if capability_provider.is_some()
            || !capability.matches_protocol(&WORK_SCHEDULER_CAPABILITY_NAME)
        {
            return Ok(capability_provider);
        }

        // Only clients that expose the Worker protocol to the framework can use WorkScheduler.
        let worker_exposed = self.verify_worker_exposed_to_framework(&scope_moniker).await;
        if !worker_exposed {
            let not_exposed = RoutingError::used_expose_not_found(
                &scope_moniker,
                WORK_SCHEDULER_CAPABILITY_NAME.to_string(),
            );
            return Err(not_exposed.into());
        }

        let provider: Box<dyn CapabilityProvider> =
            Box::new(WorkSchedulerCapabilityProvider::new(scope_moniker, Arc::clone(&self)));
        Ok(Some(provider))
    }
}

#[async_trait]
impl Hook for WorkScheduler {
    /// Dispatches an event to the matching `WorkScheduler` handler.
    ///
    /// * `CapabilityRouted` with a `Builtin` source: the stored provider is replaced by whatever
    ///   `on_framework_capability_routed_async` returns.
    /// * `CapabilityRouted` with a `Framework` source: the stored provider is replaced by whatever
    ///   `on_scoped_framework_capability_routed_async` returns for the source's scope moniker.
    /// * `Resolved`: the target moniker and declaration are passed to `try_add_realm_as_worker`.
    ///
    /// Every other event, including any whose result is an error, is ignored. Errors returned
    /// by the routing handlers are propagated to the caller.
    async fn on(self: Arc<Self>, event: &Event) -> Result<(), ModelError> {
        match &event.result {
            Ok(EventPayload::CapabilityRouted {
                source: CapabilitySource::Builtin { capability },
                capability_provider: provider_slot,
            }) => {
                let mut slot = provider_slot.lock().await;
                // The current provider is taken out before the handler runs, so the slot is
                // left empty if the handler fails.
                let current = slot.take();
                let routed =
                    self.on_framework_capability_routed_async(capability, current).await?;
                *slot = routed;
            }
            Ok(EventPayload::CapabilityRouted {
                source: CapabilitySource::Framework { capability, scope_moniker },
                capability_provider: provider_slot,
            }) => {
                let mut slot = provider_slot.lock().await;
                let scope = scope_moniker.clone();
                // Same take-then-store sequence as the builtin case above.
                let current = slot.take();
                let routed = self
                    .on_scoped_framework_capability_routed_async(scope, capability, current)
                    .await?;
                *slot = routed;
            }
            Ok(EventPayload::Resolved { decl }) => {
                self.try_add_realm_as_worker(&event.target_moniker, &decl).await;
            }
            _ => {}
        };
        Ok(())
    }
}

/// `CapabilityProvider` to invoke `WorkSchedulerControl` FIDL API bound to a particular
/// `WorkScheduler` object.
struct WorkSchedulerControlCapabilityProvider {
    /// Scheduler that receives the forwarded batch-period requests.
    work_scheduler: Arc<WorkScheduler>,
}

impl WorkSchedulerControlCapabilityProvider {
    fn new(work_scheduler: Arc<WorkScheduler>) -> Self {
        WorkSchedulerControlCapabilityProvider { work_scheduler }
    }

    /// Service `open` invocation via an event loop that dispatches FIDL operations to
    /// `work_scheduler`.
    async fn open_async(
        self,
        mut stream: fsys::WorkSchedulerControlRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await? {
            let work_scheduler = self.work_scheduler.clone();
            match request {
                fsys::WorkSchedulerControlRequest::GetBatchPeriod { responder, .. } => {
                    let mut result = work_scheduler.get_batch_period().await;
                    responder.send(&mut result)?;
                }
                fsys::WorkSchedulerControlRequest::SetBatchPeriod {
                    responder,
                    batch_period,
                    ..
                } => {
                    let mut result = work_scheduler.set_batch_period(batch_period).await;
                    responder.send(&mut result)?;
                }
            }
        }

        Ok(())
    }
}

#[async_trait]
impl CapabilityProvider for WorkSchedulerControlCapabilityProvider {
    /// Spawn an event loop to service `WorkSchedulerControl` FIDL operations.
    ///
    /// The channel obtained from `server_end` via `channel::take_channel` is converted into a
    /// `WorkSchedulerControl` request stream and served on a detached task. `_flags`,
    /// `_open_mode` and `_relative_path` are not used.
    ///
    /// Returns an error only when the request stream cannot be created; failures that occur
    /// while serving requests are logged as warnings and are not reported to the caller.
    async fn open(
        self: Box<Self>,
        _flags: u32,
        _open_mode: u32,
        _relative_path: PathBuf,
        server_end: &mut zx::Channel,
    ) -> Result<(), ModelError> {
        let server_end = channel::take_channel(server_end);
        let server_end = ServerEnd::<fsys::WorkSchedulerControlMarker>::new(server_end);
        let stream: fsys::WorkSchedulerControlRequestStream =
            server_end.into_stream().map_err(ModelError::stream_creation_error)?;
        fasync::Task::spawn(async move {
            let result = self.open_async(stream).await;
            if let Err(e) = result {
                // TODO(markdittmer): Set an epitaph to indicate this was an unexpected error.
                // Name this provider (not `WorkSchedulerCapabilityProvider`) so the log line
                // points at the protocol that actually failed.
                warn!("WorkSchedulerControlCapabilityProvider.open failed: {}", e);
            }
        })
        .detach();
        Ok(())
    }
}

/// `CapabilityProvider` to invoke `WorkScheduler` FIDL API bound to a particular `WorkScheduler`
/// object and component instance's `AbsoluteMoniker`. All FIDL operations bound to the same object
/// and moniker observe the same collection of `WorkItem` objects.
struct WorkSchedulerCapabilityProvider {
    /// Moniker passed to every `schedule_work`/`cancel_work` call made through this provider.
    scope_moniker: AbsoluteMoniker,
    /// Scheduler that receives the forwarded FIDL operations.
    work_scheduler: Arc<WorkScheduler>,
}

impl WorkSchedulerCapabilityProvider {
    fn new(scope_moniker: AbsoluteMoniker, work_scheduler: Arc<WorkScheduler>) -> Self {
        WorkSchedulerCapabilityProvider { scope_moniker, work_scheduler }
    }

    /// Service `open` invocation via an event loop that dispatches FIDL operations to
    /// `work_scheduler`.
    async fn open_async(
        work_scheduler: Arc<WorkScheduler>,
        scope_moniker: AbsoluteMoniker,
        mut stream: fsys::WorkSchedulerRequestStream,
    ) -> Result<(), Error> {
        while let Some(request) = stream.try_next().await? {
            let work_scheduler = work_scheduler.clone();
            match request {
                fsys::WorkSchedulerRequest::ScheduleWork {
                    responder,
                    work_id,
                    work_request,
                    ..
                } => {
                    let mut result =
                        work_scheduler.schedule_work(&scope_moniker, &work_id, &work_request).await;
                    responder.send(&mut result)?;
                }
                fsys::WorkSchedulerRequest::CancelWork { responder, work_id, .. } => {
                    let mut result = work_scheduler.cancel_work(&scope_moniker, &work_id).await;
                    responder.send(&mut result)?;
                }
            }
        }
        Ok(())
    }
}

#[async_trait]
impl CapabilityProvider for WorkSchedulerCapabilityProvider {
    /// Starts a detached task that serves `WorkScheduler` FIDL requests arriving on the channel
    /// obtained from `server_end`.
    ///
    /// `_flags`, `_open_mode` and `_relative_path` are not used. Returns an error only when the
    /// request stream cannot be created; failures that occur while serving requests are logged
    /// as warnings and are not reported to the caller.
    async fn open(
        self: Box<Self>,
        _flags: u32,
        _open_mode: u32,
        _relative_path: PathBuf,
        server_end: &mut zx::Channel,
    ) -> Result<(), ModelError> {
        let raw_channel = channel::take_channel(server_end);
        let stream: fsys::WorkSchedulerRequestStream =
            ServerEnd::<fsys::WorkSchedulerMarker>::new(raw_channel)
                .into_stream()
                .map_err(ModelError::stream_creation_error)?;

        // The serving task receives its own handles to the scheduler and the moniker.
        let scheduler = self.work_scheduler.clone();
        let moniker = self.scope_moniker.clone();
        let serve = async move {
            if let Err(e) = Self::open_async(scheduler, moniker, stream).await {
                // TODO(markdittmer): Set an epitaph to indicate this was an unexpected error.
                warn!("WorkSchedulerCapabilityProvider.open failed: {}", e);
            }
        };
        fasync::Task::spawn(serve).detach();
        Ok(())
    }
}
