// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::model::{
        component::{ComponentInstance, InstanceState},
        error::ModelError,
        events::{dispatcher::EventDispatcherScope, event::Event, filter::EventFilter},
        hooks::{Event as HookEvent, EventType},
        model::Model,
    },
    async_trait::async_trait,
    cm_rust::CapabilityName,
    fuchsia_async as fasync,
    futures::{channel::mpsc, future::join_all, stream, SinkExt, StreamExt},
    log::error,
    moniker::{AbsoluteMoniker, ExtendedMoniker},
    std::{
        collections::{HashMap, HashSet},
        sync::{Arc, Weak},
    },
};

/// A component instance or component manager itself
pub enum ExtendedComponent {
    ComponentManager,
    ComponentInstance(Arc<ComponentInstance>),
}

impl From<Arc<ComponentInstance>> for ExtendedComponent {
    fn from(instance: Arc<ComponentInstance>) -> Self {
        Self::ComponentInstance(instance)
    }
}

impl From<&Arc<ComponentInstance>> for ExtendedComponent {
    fn from(instance: &Arc<ComponentInstance>) -> Self {
        Self::ComponentInstance(instance.clone())
    }
}

/// Implementors of this trait know how to synthesize an event.
#[async_trait]
pub trait EventSynthesisProvider: Send + Sync {
    /// Provides a synthesized event applying the given `filter` under the given `component`.
    async fn provide(&self, component: ExtendedComponent, filter: &EventFilter) -> Vec<HookEvent>;
}

/// Synthesis manager.
pub struct EventSynthesizer {
    /// A reference to the model.
    model: Weak<Model>,

    /// Maps an event name to the provider for synthesis
    providers: HashMap<CapabilityName, Arc<dyn EventSynthesisProvider>>,
}

impl EventSynthesizer {
    /// Creates a new event synthesizer.
    pub fn new(model: Weak<Model>) -> Self {
        Self { model, providers: HashMap::new() }
    }

    /// Registers a new provider that will be used when synthesizing events of the type `event`.
    pub fn register_provider(
        &mut self,
        event: EventType,
        provider: Arc<dyn EventSynthesisProvider>,
    ) {
        self.providers.insert(CapabilityName(event.to_string()), provider);
    }

    /// Spawns a synthesis task for the requested `events`. Resulting events will be sent on the
    /// `sender` channel.
    pub fn spawn_synthesis(
        &self,
        sender: mpsc::UnboundedSender<Event>,
        events: HashMap<CapabilityName, Vec<EventDispatcherScope>>,
    ) {
        SynthesisTask::new(&self, sender, events).spawn()
    }
}

/// Information about an event that will be synthesized.
struct EventSynthesisInfo {
    /// The provider of the synthesized event.
    provider: Arc<dyn EventSynthesisProvider>,

    /// The scopes under which the event will be synthesized.
    scopes: Vec<EventDispatcherScope>,
}

struct SynthesisTask {
    /// A reference to the model.
    model: Weak<Model>,

    /// The sender end of the channel where synthesized events will be sent.
    sender: mpsc::UnboundedSender<Event>,

    /// Information about the events to synthesize
    event_infos: Vec<EventSynthesisInfo>,
}

impl SynthesisTask {
    /// Creates a new synthesis task from the given events. It will ignore events for which the
    /// `synthesizer` doesn't have a provider.
    pub fn new(
        synthesizer: &EventSynthesizer,
        sender: mpsc::UnboundedSender<Event>,
        mut events: HashMap<CapabilityName, Vec<EventDispatcherScope>>,
    ) -> Self {
        let event_infos = synthesizer
            .providers
            .iter()
            .filter_map(|(event_name, provider)| {
                events
                    .remove(event_name)
                    .map(|scopes| EventSynthesisInfo { provider: provider.clone(), scopes })
            })
            .collect();
        Self { model: synthesizer.model.clone(), sender, event_infos }
    }

    /// Spawns a task that will synthesize all events that were requested when creating the
    /// `SynthesisTask`
    pub fn spawn(self) {
        if self.event_infos.is_empty() {
            return;
        }
        fasync::Task::spawn(async move {
            // If we can't find the component then we can't synthesize events.
            // This isn't necessarily an error as the model or component might've been
            // destroyed in the intervening time, so we just exit early.
            if let Some(model) = self.model.upgrade() {
                let sender = self.sender;
                let futs = self
                    .event_infos
                    .into_iter()
                    .map(|event_info| Self::run(&model, sender.clone(), event_info));
                for result in join_all(futs).await {
                    if let Err(e) = result {
                        error!("Event synthesis failed: {:?}", e);
                    }
                }
            }
        })
        .detach();
    }

    /// Performs a depth-first traversal of the component instance tree. It adds to the stream a
    /// `Running` event for all components that are running. In the case of overlapping scopes,
    /// events are deduped.  It also synthesizes events that were requested which are synthesizable
    /// (there's a provider for them). Those events will only be synthesized if their scope is
    /// within the scope of a Running scope.
    async fn run(
        model: &Arc<Model>,
        mut sender: mpsc::UnboundedSender<Event>,
        info: EventSynthesisInfo,
    ) -> Result<(), ModelError> {
        let mut visited_components = HashSet::new();
        for scope in info.scopes {
            // If the scope is component manager, synthesize the builtin events first and then
            // proceed to synthesize from the root and down.
            let scope_moniker = match scope.moniker {
                ExtendedMoniker::ComponentManager => {
                    // Ignore this error. This can occur when the event stream is closed in the
                    // middle of synthesis. We can finish synthesizing if an error happens.
                    if let Err(_) = Self::send_events(
                        &info.provider,
                        &scope,
                        ExtendedComponent::ComponentManager,
                        &mut sender,
                    )
                    .await
                    {
                        return Ok(());
                    }
                    AbsoluteMoniker::root()
                }
                ExtendedMoniker::ComponentInstance(ref scope_moniker) => scope_moniker.clone(),
            };
            let root = model.look_up(&scope_moniker).await?;
            let mut component_stream = get_subcomponents(root, visited_components.clone());
            while let Some(component) = component_stream.next().await {
                visited_components.insert(component.abs_moniker.clone());
                if let Err(_) = Self::send_events(
                    &info.provider,
                    &scope,
                    ExtendedComponent::ComponentInstance(component),
                    &mut sender,
                )
                .await
                {
                    return Ok(());
                }
            }
        }
        Ok(())
    }

    async fn send_events(
        provider: &Arc<dyn EventSynthesisProvider>,
        scope: &EventDispatcherScope,
        target_component: ExtendedComponent,
        sender: &mut mpsc::UnboundedSender<Event>,
    ) -> Result<(), anyhow::Error> {
        let events = provider.provide(target_component, &scope.filter).await;
        for event in events {
            let event = Event { event, scope_moniker: scope.moniker.clone(), responder: None };
            sender.send(event).await?;
        }
        Ok(())
    }
}

/// Returns all components that are under the given `root` component. Skips the ones whose moniker
/// is contained in the `visited` set.  The visited set is included for early pruning of a tree
/// branch.
fn get_subcomponents(
    root: Arc<ComponentInstance>,
    visited: HashSet<AbsoluteMoniker>,
) -> stream::BoxStream<'static, Arc<ComponentInstance>> {
    let pending = vec![root];
    stream::unfold((pending, visited), move |(mut pending, mut visited)| async move {
        loop {
            match pending.pop() {
                None => return None,
                Some(curr_component) => {
                    if visited.contains(&curr_component.abs_moniker) {
                        continue;
                    }
                    let state_guard = curr_component.lock_state().await;
                    match *state_guard {
                        InstanceState::New
                        | InstanceState::Discovered
                        | InstanceState::Destroyed => {}
                        InstanceState::Resolved(ref s) => {
                            for (_, child) in s.live_children() {
                                pending.push(child.clone());
                            }
                        }
                    }
                    drop(state_guard);
                    visited.insert(curr_component.abs_moniker.clone());
                    return Some((curr_component, (pending, visited)));
                }
            }
        }
    })
    .boxed()
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::model::{
            events::{
                mode_set::EventModeSet,
                registry::{EventRegistry, RoutedEvent, SubscriptionOptions},
                stream::EventStream,
            },
            hooks::{EventError, EventErrorPayload, EventPayload},
            testing::{routing_test_helpers::*, test_helpers::*},
        },
        cm_rust::{
            DirectoryDecl, EventMode, ExposeDecl, ExposeDirectoryDecl, ExposeSource, ExposeTarget,
        },
        fidl_fuchsia_io2 as fio,
        fuchsia_component::server::ServiceFs,
        std::iter::FromIterator,
    };

    struct CreateStreamArgs<'a> {
        registry: &'a EventRegistry,
        scope_monikers: Vec<AbsoluteMoniker>,
        events: Vec<EventType>,
        include_builtin: bool,
    }

    // Shows that we see Running only for components that are bound at the moment of subscription.
    #[fuchsia::test]
    async fn synthesize_only_running() {
        let test = setup_synthesis_test().await;

        // Bind: b, c, e. We should see Running events only for these.
        test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
        test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
        test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");

        let registry = test.builtin_environment.event_registry.clone();
        let mut event_stream = create_stream(CreateStreamArgs {
            registry: &registry,
            scope_monikers: vec![AbsoluteMoniker::root()],
            events: vec![EventType::Started, EventType::Running],
            include_builtin: false,
        })
        .await;

        // Bind f, this will be a Started event.
        test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance success");

        let mut result_monikers = HashSet::new();
        while result_monikers.len() < 5 {
            let event = event_stream.next().await.expect("got running event");
            match event.event.result {
                Ok(EventPayload::Running { .. }) => {
                    if event.event.target_moniker.to_string() == "/c:0/f:0" {
                        // There's a chance of receiving Started and Running for the instance we
                        // just bound if it happens to start while we are synthesizing. We assert
                        // that instance separately and count it once.
                        continue;
                    }
                    result_monikers.insert(event.event.target_moniker.to_string());
                }
                Ok(EventPayload::Started { .. }) => {
                    assert_eq!(event.event.target_moniker.to_string(), "/c:0/f:0");
                    result_monikers.insert(event.event.target_moniker.to_string());
                }
                payload => panic!("Expected running. Got: {:?}", payload),
            }
        }

        // Events might be out of order, sort them
        let expected_monikers = vec!["/", "/b:0", "/c:0", "/c:0/e:0", "/c:0/f:0"];
        let mut result_monikers = Vec::from_iter(result_monikers.into_iter());
        result_monikers.sort();
        assert_eq!(expected_monikers, result_monikers);
    }

    // Shows that we see Running a single time even if the subscription scopes intersect.
    #[fuchsia::test]
    async fn synthesize_overlapping_scopes() {
        let test = setup_synthesis_test().await;

        test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
        test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
        test.bind_instance(&vec!["b:0", "d:0"].into()).await.expect("bind instance d success");
        test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");
        test.bind_instance(&vec!["c:0", "e:0", "g:0"].into())
            .await
            .expect("bind instance g success");
        test.bind_instance(&vec!["c:0", "e:0", "h:0"].into())
            .await
            .expect("bind instance h success");
        test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance f success");

        // Subscribing with scopes /c, /c/e and /c/e/h
        let registry = test.builtin_environment.event_registry.clone();
        let mut event_stream = create_stream(CreateStreamArgs {
            registry: &registry,
            scope_monikers: vec![
                vec!["c:0"].into(),
                vec!["c:0", "e:0"].into(),
                vec!["c:0", "e:0", "h:0"].into(),
            ],
            events: vec![EventType::Started, EventType::Running],
            include_builtin: false,
        })
        .await;

        let result_monikers = get_and_sort_running_events(&mut event_stream, 5).await;
        let expected_monikers =
            vec!["/c:0", "/c:0/e:0", "/c:0/e:0/g:0", "/c:0/e:0/h:0", "/c:0/f:0"];
        assert_eq!(expected_monikers, result_monikers);

        // Verify we don't get more Running events.
        test.bind_instance(&vec!["c:0", "f:0", "i:0"].into())
            .await
            .expect("bind instance g success");
        let event = event_stream.next().await.expect("got started event");
        match event.event.result {
            Ok(EventPayload::Started { .. }) => {
                assert_eq!("/c:0/f:0/i:0", event.event.target_moniker.to_string());
            }
            payload => panic!("Expected started. Got: {:?}", payload),
        }
    }

    // Shows that we see Running only for components under the given scopes.
    #[fuchsia::test]
    async fn synthesize_non_overlapping_scopes() {
        let test = setup_synthesis_test().await;

        test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
        test.bind_instance(&vec!["b:0", "d:0"].into()).await.expect("bind instance d success");
        test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
        test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");
        test.bind_instance(&vec!["c:0", "e:0", "g:0"].into())
            .await
            .expect("bind instance g success");
        test.bind_instance(&vec!["c:0", "e:0", "h:0"].into())
            .await
            .expect("bind instance g success");
        test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance g success");

        // Subscribing with scopes /c, /c/e and c/f/i
        let registry = test.builtin_environment.event_registry.clone();
        let mut event_stream = create_stream(CreateStreamArgs {
            registry: &registry,
            scope_monikers: vec![
                vec!["c:0"].into(),
                vec!["c:0", "e:0"].into(),
                vec!["c:0", "f:0", "i:0"].into(),
            ],
            events: vec![EventType::Started, EventType::Running],
            include_builtin: false,
        })
        .await;

        let result_monikers = get_and_sort_running_events(&mut event_stream, 5).await;
        let expected_monikers =
            vec!["/c:0", "/c:0/e:0", "/c:0/e:0/g:0", "/c:0/e:0/h:0", "/c:0/f:0"];
        assert_eq!(expected_monikers, result_monikers);

        // Verify we don't get more Running events.
        test.bind_instance(&vec!["c:0", "f:0", "i:0"].into())
            .await
            .expect("bind instance g success");
        let event = event_stream.next().await.expect("got started event");
        match event.event.result {
            Ok(EventPayload::Started { .. }) => {
                assert_eq!("/c:0/f:0/i:0", event.event.target_moniker.to_string());
            }
            payload => panic!("Expected started. Got: {:?}", payload),
        }
    }

    #[fuchsia::test]
    async fn synthesize_capability_ready() {
        let test = setup_synthesis_test().await;

        test.bind_instance(&vec!["b:0"].into()).await.expect("bind instance b success");
        test.bind_instance(&vec!["b:0", "d:0"].into()).await.expect("bind instance d success");
        test.bind_instance(&vec!["c:0"].into()).await.expect("bind instance c success");
        test.bind_instance(&vec!["c:0", "e:0"].into()).await.expect("bind instance e success");
        test.bind_instance(&vec!["c:0", "e:0", "g:0"].into())
            .await
            .expect("bind instance g success");
        test.bind_instance(&vec!["c:0", "e:0", "h:0"].into())
            .await
            .expect("bind instance h success");
        test.bind_instance(&vec!["c:0", "f:0"].into()).await.expect("bind instance f success");
        test.bind_instance(&vec!["c:0", "f:0", "i:0"].into())
            .await
            .expect("bind instance i success");

        let registry = test.builtin_environment.event_registry.clone();
        let mut event_stream = create_stream(CreateStreamArgs {
            registry: &registry,
            scope_monikers: vec![vec!["b:0"].into(), vec!["c:0", "e:0"].into()],
            events: vec![EventType::Running, EventType::CapabilityReady],
            include_builtin: false,
        })
        .await;

        // We expect 4 CapabilityReady events and 5 running events.
        // CR: b, d, e, g
        // RN: b, d, e, g, h
        let expected_capability_ready_monikers =
            vec!["/b:0", "/b:0/d:0", "/c:0/e:0", "/c:0/e:0/g:0"];
        let mut expected_running_monikers = expected_capability_ready_monikers.clone();
        expected_running_monikers.extend(vec!["/c:0/e:0/h:0"].into_iter());
        // We use sets given that the CapabilityReady could be dispatched twice: regular +
        // synthesized.
        let mut result_running_monikers = HashSet::new();
        let mut result_capability_ready_monikers = HashSet::new();
        while result_running_monikers.len() < 5 || result_capability_ready_monikers.len() < 4 {
            let event = event_stream.next().await.expect("got running event");
            match event.event.result {
                Ok(EventPayload::Running { .. }) => {
                    result_running_monikers.insert(event.event.target_moniker.to_string());
                }
                // We get an error cuz the component is not really serving the directory, but is
                // exposing it. For the purposes of the test, this is enough information.
                Err(EventError {
                    event_error_payload: EventErrorPayload::CapabilityReady { name, .. },
                    ..
                }) if name == "diagnostics" => {
                    result_capability_ready_monikers.insert(event.event.target_moniker.to_string());
                }
                payload => panic!("Expected running or capability ready. Got: {:?}", payload),
            }
        }
        let mut result_running = result_running_monikers.into_iter().collect::<Vec<_>>();
        let mut result_capability_ready =
            result_capability_ready_monikers.into_iter().collect::<Vec<_>>();
        result_running.sort();
        result_capability_ready.sort();
        assert_eq!(result_running, expected_running_monikers);
        assert_eq!(result_capability_ready, expected_capability_ready_monikers);
    }

    #[fuchsia::test]
    async fn synthesize_capability_ready_builtin() {
        let test = setup_synthesis_test().await;

        let mut fs = ServiceFs::new();
        test.builtin_environment.emit_diagnostics_for_test(&mut fs).expect("emitting diagnostics");

        let registry = test.builtin_environment.event_registry.clone();
        let mut event_stream = create_stream(CreateStreamArgs {
            registry: &registry,
            scope_monikers: vec![],
            events: vec![EventType::CapabilityReady],
            include_builtin: true,
        })
        .await;

        let event = event_stream.next().await.expect("got running event");
        match event.event.result {
            Ok(EventPayload::CapabilityReady { name, .. }) if name == "diagnostics" => {
                assert_eq!(event.event.target_moniker, ExtendedMoniker::ComponentManager);
            }
            payload => panic!("Expected running or capability ready. Got: {:?}", payload),
        }
    }

    async fn create_stream<'a>(args: CreateStreamArgs<'a>) -> EventStream {
        let mut scopes = args
            .scope_monikers
            .into_iter()
            .map(|moniker| EventDispatcherScope {
                moniker: moniker.into(),
                filter: EventFilter::debug(),
                mode_set: EventModeSet::new(cm_rust::EventMode::Sync),
            })
            .collect::<Vec<_>>();
        if args.include_builtin {
            scopes.push(EventDispatcherScope {
                moniker: ExtendedMoniker::ComponentManager,
                filter: EventFilter::debug(),
                mode_set: EventModeSet::new(cm_rust::EventMode::Sync),
            });
        }
        let events = args
            .events
            .into_iter()
            .map(|event| RoutedEvent {
                source_name: event.into(),
                mode: EventMode::Async,
                scopes: scopes.clone(),
            })
            .collect();
        args.registry
            .subscribe_with_routed_events(&SubscriptionOptions::default(), events)
            .await
            .expect("subscribe to event stream")
    }

    // Sets up the following topology (all children are lazy)
    //
    //     a
    //    / \
    //   b   c
    //  /   / \
    // d   e   f
    //    / \   \
    //   g   h   i
    async fn setup_synthesis_test() -> RoutingTest {
        let components = vec![
            (
                "a",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .add_lazy_child("b")
                    .add_lazy_child("c")
                    .build(),
            ),
            (
                "b",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .add_lazy_child("d")
                    .build(),
            ),
            ("c", ComponentDeclBuilder::new().add_lazy_child("e").add_lazy_child("f").build()),
            (
                "d",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .build(),
            ),
            (
                "e",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .add_lazy_child("g")
                    .add_lazy_child("h")
                    .build(),
            ),
            ("f", ComponentDeclBuilder::new().add_lazy_child("i").build()),
            (
                "g",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .build(),
            ),
            ("h", ComponentDeclBuilder::new().build()),
            (
                "i",
                ComponentDeclBuilder::new()
                    .directory(diagnostics_decl())
                    .expose(expose_diagnostics_decl())
                    .build(),
            ),
        ];
        RoutingTest::new("a", components).await
    }

    async fn get_and_sort_running_events(
        event_stream: &mut EventStream,
        total: usize,
    ) -> Vec<String> {
        let mut result_monikers = Vec::new();
        for _ in 0..total {
            let event = event_stream.next().await.expect("got running event");
            match event.event.result {
                Ok(EventPayload::Running { .. }) => {
                    result_monikers.push(event.event.target_moniker.to_string());
                }
                payload => panic!("Expected running. Got: {:?}", payload),
            }
        }
        result_monikers.sort();
        result_monikers
    }

    fn diagnostics_decl() -> DirectoryDecl {
        DirectoryDeclBuilder::new("diagnostics").path("/diagnostics").build()
    }

    fn expose_diagnostics_decl() -> ExposeDecl {
        ExposeDecl::Directory(ExposeDirectoryDecl {
            source: ExposeSource::Self_,
            source_name: "diagnostics".into(),
            target_name: "diagnostics".into(),
            target: ExposeTarget::Framework,
            rights: Some(fio::Operations::Connect),
            subdir: None,
        })
    }
}
