// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        capability::{CapabilitySource, InternalCapability},
        model::{
            component::{ComponentInstance, InstanceState},
            error::ModelError,
            events::{
                dispatcher::{EventDispatcher, EventDispatcherScope},
                error::EventsError,
                filter::EventFilter,
                mode_set::EventModeSet,
                stream::EventStream,
                synthesizer::{EventSynthesisProvider, EventSynthesizer},
            },
            hooks::{
                Event as ComponentEvent, EventPayload, EventType, HasEventType, Hook,
                HooksRegistration,
            },
            model::Model,
            routing::{route_capability, RouteRequest, RouteSource},
        },
    },
    async_trait::async_trait,
    cm_rust::{CapabilityName, EventMode, UseDecl, UseEventDecl},
    fuchsia_trace as trace,
    futures::lock::Mutex,
    moniker::{AbsoluteMoniker, ExtendedMoniker},
    std::{
        collections::HashMap,
        sync::{Arc, Weak},
    },
};

#[derive(Debug)]
pub struct RoutedEvent {
    pub source_name: CapabilityName,
    pub mode: EventMode,
    pub scopes: Vec<EventDispatcherScope>,
}

#[derive(Debug)]
pub struct RequestedEventState {
    pub mode: EventMode,
    pub scopes: Vec<EventDispatcherScope>,
}

impl RequestedEventState {
    pub fn new(mode: EventMode) -> Self {
        Self { mode, scopes: Vec::new() }
    }
}

#[derive(Debug)]
pub struct RouteEventsResult {
    /// Maps from source name to a mode and set of scope monikers.
    mapping: HashMap<CapabilityName, RequestedEventState>,
}

impl RouteEventsResult {
    fn new() -> Self {
        Self { mapping: HashMap::new() }
    }

    fn insert(
        &mut self,
        source_name: CapabilityName,
        mode: EventMode,
        scope: EventDispatcherScope,
    ) {
        let event_state = self.mapping.entry(source_name).or_insert(RequestedEventState::new(mode));
        if !event_state.scopes.contains(&scope) {
            event_state.scopes.push(scope);
        }
    }

    pub fn len(&self) -> usize {
        self.mapping.len()
    }

    pub fn contains_event(&self, event_name: &CapabilityName) -> bool {
        self.mapping.contains_key(event_name)
    }

    pub fn to_vec(self) -> Vec<RoutedEvent> {
        self.mapping
            .into_iter()
            .map(|(source_name, state)| RoutedEvent {
                source_name,
                mode: state.mode,
                scopes: state.scopes,
            })
            .collect()
    }
}

#[derive(Clone)]
pub struct SubscriptionOptions {
    /// Determines how event routing is done.
    pub subscription_type: SubscriptionType,
    /// Specifies the mode ComponentManager was started in.
    pub execution_mode: ExecutionMode,
}

#[derive(Debug)]
pub struct EventSubscription {
    pub event_name: CapabilityName,
    /// Determines whether component manager waits for a response from the
    /// event receiver.
    pub mode: EventMode,
}

impl EventSubscription {
    pub fn new(event_name: CapabilityName, mode: EventMode) -> Self {
        Self { event_name, mode }
    }
}

impl SubscriptionOptions {
    pub fn new(subscription_type: SubscriptionType, execution_mode: ExecutionMode) -> Self {
        Self { subscription_type, execution_mode }
    }
}

impl Default for SubscriptionOptions {
    fn default() -> SubscriptionOptions {
        SubscriptionOptions {
            subscription_type: SubscriptionType::Component(AbsoluteMoniker::root()),
            execution_mode: ExecutionMode::Production,
        }
    }
}

#[derive(Clone, PartialEq, Eq)]
pub enum SubscriptionType {
    /// Indicates that a client above the root is subscribing to events (e.g. a test).
    /// Event routing will be bypassed and all events can be subscribed.
    AboveRoot,
    /// Indicates that a component is subscribing to events and the target is
    /// the provided AbsoluteMoniker.
    Component(AbsoluteMoniker),
}

#[derive(Clone)]
pub enum ExecutionMode {
    /// Indicates that the component manager is running in Debug mode. This
    /// enables some additional events such as CapabilityRouted.
    Debug,
    /// Indicates that the component manager is running in Production mode.
    Production,
}

impl ExecutionMode {
    pub fn is_debug(&self) -> bool {
        match self {
            ExecutionMode::Debug => true,
            ExecutionMode::Production => false,
        }
    }
}

/// Subscribes to events from multiple tasks and sends events to all of them.
pub struct EventRegistry {
    model: Weak<Model>,
    dispatcher_map: Arc<Mutex<HashMap<CapabilityName, Vec<Weak<EventDispatcher>>>>>,
    event_synthesizer: EventSynthesizer,
}

impl EventRegistry {
    pub fn new(model: Weak<Model>) -> Self {
        let event_synthesizer = EventSynthesizer::new(model.clone());
        Self { model, dispatcher_map: Arc::new(Mutex::new(HashMap::new())), event_synthesizer }
    }

    pub fn hooks(self: &Arc<Self>) -> Vec<HooksRegistration> {
        vec![
            // This hook must be registered with all events.
            // However, a task will only receive events to which it subscribed.
            HooksRegistration::new(
                "EventRegistry",
                EventType::values(),
                Arc::downgrade(self) as Weak<dyn Hook>,
            ),
        ]
    }

    /// Register a provider for an synthesized event.
    pub fn register_synthesis_provider(
        &mut self,
        event: EventType,
        provider: Arc<dyn EventSynthesisProvider>,
    ) {
        self.event_synthesizer.register_provider(event, provider);
    }

    /// Subscribes to events of a provided set of EventTypes.
    pub async fn subscribe(
        &self,
        options: &SubscriptionOptions,
        subscriptions: Vec<EventSubscription>,
    ) -> Result<EventStream, ModelError> {
        // Register event capabilities if any. It identifies the sources of these events (might be
        // the parent or this component itself). It consturcts an "allow-list tree" of events and
        // component instances.
        let mut event_names = HashMap::new();
        for subscription in subscriptions {
            if event_names
                .insert(subscription.event_name.clone(), subscription.mode.clone())
                .is_some()
            {
                return Err(EventsError::duplicate_event(subscription.event_name).into());
            }
        }
        let events = match &options.subscription_type {
            SubscriptionType::AboveRoot => event_names
                .iter()
                .map(|(source_name, mode)| RoutedEvent {
                    source_name: source_name.clone(),
                    mode: mode.clone(),
                    scopes: vec![
                        EventDispatcherScope::new(AbsoluteMoniker::root().into()).for_debug()
                    ],
                })
                .collect(),
            SubscriptionType::Component(target_moniker) => {
                let route_result = self.route_events(&target_moniker, &event_names).await?;
                if route_result.len() != event_names.len() {
                    let names = event_names
                        .keys()
                        .into_iter()
                        .filter(|event_name| !route_result.contains_event(&event_name))
                        .cloned()
                        .collect();
                    return Err(EventsError::not_available(names).into());
                }
                route_result.to_vec()
            }
        };

        self.subscribe_with_routed_events(&options, events).await
    }

    pub async fn subscribe_with_routed_events(
        &self,
        options: &SubscriptionOptions,
        events: Vec<RoutedEvent>,
    ) -> Result<EventStream, ModelError> {
        // TODO(fxbug.dev/48510): get rid of this channel and use FIDL directly.
        let mut event_stream = EventStream::new();

        let mut dispatcher_map = self.dispatcher_map.lock().await;
        for event in &events {
            if EventType::synthesized_only()
                .iter()
                .all(|e| e.to_string() != event.source_name.str())
            {
                let dispatchers = dispatcher_map.entry(event.source_name.clone()).or_insert(vec![]);
                let dispatcher = event_stream.create_dispatcher(
                    options.clone(),
                    event.mode.clone(),
                    event.scopes.clone(),
                );
                dispatchers.push(dispatcher);
            }
        }

        let events = events.into_iter().map(|event| (event.source_name, event.scopes)).collect();
        self.event_synthesizer.spawn_synthesis(event_stream.sender(), events);

        Ok(event_stream)
    }

    // TODO(fxbug.dev/48510): get rid of this
    /// Sends the event to all dispatchers and waits to be unblocked by all
    async fn dispatch(&self, event: &ComponentEvent) -> Result<(), ModelError> {
        // Copy the senders so we don't hold onto the sender map lock
        // If we didn't do this, it is possible to deadlock while holding onto this lock.
        // For example,
        // Task A : call dispatch(event1) -> lock on sender map -> send -> wait for responders
        // Task B : call dispatch(event2) -> lock on sender map
        // If task B was required to respond to event1, then this is a deadlock.
        // Neither task can make progress.
        let dispatchers = {
            let mut dispatcher_map = self.dispatcher_map.lock().await;
            if let Some(dispatchers) = dispatcher_map.get_mut(&event.event_type().into()) {
                let mut strong_dispatchers = vec![];
                dispatchers.retain(|dispatcher| {
                    if let Some(dispatcher) = dispatcher.upgrade() {
                        strong_dispatchers.push(dispatcher);
                        true
                    } else {
                        false
                    }
                });
                strong_dispatchers
            } else {
                // There were no senders for this event. Do nothing.
                return Ok(());
            }
        };

        let mut responder_channels = vec![];
        for dispatcher in dispatchers {
            let result = dispatcher.dispatch(event).await;
            match result {
                Ok(Some(responder_channel)) => {
                    // A future can be canceled if the EventStream was dropped after
                    // a send. We don't crash the system when this happens. It is
                    // perfectly valid for a EventStream to be dropped. That simply
                    // means that the EventStream is no longer interested in future
                    // events. So we force each future to return a success. This
                    // ensures that all the futures can be driven to completion.
                    let responder_channel = async move {
                        trace::duration!("component_manager", "events:wait_for_resume");
                        let _ = responder_channel.await;
                        trace::flow_end!("component_manager", "event", event.id);
                    };
                    responder_channels.push(responder_channel);
                }
                // There's nothing to do if event is outside the scope of the given
                // `EventDispatcher`.
                Ok(None) => (),
                Err(_) => {
                    // A send can fail if the EventStream was dropped. We don't
                    // crash the system when this happens. It is perfectly
                    // valid for a EventStream to be dropped. That simply means
                    // that the EventStream is no longer interested in future
                    // events.
                }
            }
        }

        // Wait until all tasks have used the responder to unblock.
        {
            trace::duration!("component_manager", "events:wait_for_all_resume");
            futures::future::join_all(responder_channels).await;
        }

        Ok(())
    }

    pub async fn route_events(
        &self,
        target_moniker: &AbsoluteMoniker,
        events: &HashMap<CapabilityName, EventMode>,
    ) -> Result<RouteEventsResult, ModelError> {
        let model = self.model.upgrade().ok_or(ModelError::ModelNotAvailable)?;
        let component = model.look_up(&target_moniker).await?;
        let decl = {
            let state = component.lock_state().await;
            match *state {
                InstanceState::New | InstanceState::Discovered => {
                    panic!("route_events: not resolved");
                }
                InstanceState::Resolved(ref s) => s.decl().clone(),
                InstanceState::Destroyed => {
                    return Err(ModelError::instance_not_found(target_moniker.clone()));
                }
            }
        };

        let mut result = RouteEventsResult::new();
        for use_decl in decl.uses {
            match use_decl {
                UseDecl::Event(event_decl) => {
                    if let Some(mode) = events.get(&event_decl.target_name) {
                        let (source_name, scope_moniker) =
                            Self::route_event(event_decl.clone(), &component).await?;
                        let scope = EventDispatcherScope::new(scope_moniker)
                            .with_filter(EventFilter::new(event_decl.filter))
                            .with_mode_set(EventModeSet::new(event_decl.mode));
                        if scope.mode_set.supports_mode(mode) {
                            result.insert(source_name, mode.clone(), scope);
                        }
                    }
                }
                _ => {}
            }
        }

        Ok(result)
    }

    /// Routes an event and returns its source name and scope on success.
    async fn route_event(
        event_decl: UseEventDecl,
        component: &Arc<ComponentInstance>,
    ) -> Result<(CapabilityName, ExtendedMoniker), ModelError> {
        let route_source = route_capability(RouteRequest::UseEvent(event_decl), component).await?;
        match route_source {
            RouteSource::Event(CapabilitySource::Framework {
                capability: InternalCapability::Event(source_name),
                component,
            }) => Ok((source_name, component.moniker.into())),
            RouteSource::Event(CapabilitySource::Builtin {
                capability: InternalCapability::Event(source_name),
                ..
            }) if source_name == "capability_ready".into() => {
                Ok((source_name, ExtendedMoniker::ComponentManager))
            }
            _ => unreachable!(),
        }
    }

    #[cfg(test)]
    async fn dispatchers_per_event_type(&self, event_type: EventType) -> usize {
        let dispatcher_map = self.dispatcher_map.lock().await;
        dispatcher_map
            .get(&event_type.into())
            .map(|dispatchers| dispatchers.len())
            .unwrap_or_default()
    }
}

#[async_trait]
impl Hook for EventRegistry {
    async fn on(self: Arc<Self>, event: &ComponentEvent) -> Result<(), ModelError> {
        match &event.result {
            Ok(EventPayload::CapabilityRouted { source, .. }) => {
                // Only dispatch the CapabilityRouted event for capabilities
                // that can be in a component's namespace.
                // TODO(fxbug.dev/54251): In the future, if we wish to be able to mock or
                // interpose runners, we can introduce, a new, separate event
                // type.
                if source.can_be_in_namespace() {
                    return self.dispatch(event).await;
                }
                return Ok(());
            }
            _ => self.dispatch(event).await,
        }
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            capability::ComponentCapability,
            model::{
                component::ComponentInstance,
                environment::Environment,
                events::event::Event,
                hooks::{Event as ComponentEvent, EventError, EventErrorPayload, EventPayload},
                testing::test_helpers::{TestModelResult, *},
            },
        },
        ::routing::{
            component_instance::ComponentInstanceInterface, error::ComponentInstanceError,
        },
        cm_rust::ProtocolDecl,
        fuchsia_async as fasync, fuchsia_zircon as zx,
        matches::assert_matches,
        moniker::AbsoluteMoniker,
    };

    async fn dispatch_capability_requested_event(
        registry: &EventRegistry,
    ) -> Result<(), ModelError> {
        let (_, capability_server_end) = zx::Channel::create().unwrap();
        let capability_server_end = Arc::new(Mutex::new(Some(capability_server_end)));
        let event = ComponentEvent::new_for_test(
            AbsoluteMoniker::root(),
            "fuchsia-pkg://root",
            Ok(EventPayload::CapabilityRequested {
                source_moniker: AbsoluteMoniker::root(),
                name: "foo".to_string(),
                capability: capability_server_end,
            }),
        );
        registry.dispatch(&event).await
    }

    async fn dispatch_fake_event(registry: &EventRegistry) -> Result<(), ModelError> {
        let event = ComponentEvent::new_for_test(
            AbsoluteMoniker::root(),
            "fuchsia-pkg://root",
            Ok(EventPayload::Discovered),
        );
        registry.dispatch(&event).await
    }

    async fn dispatch_error_event(registry: &EventRegistry) -> Result<(), ModelError> {
        let root = AbsoluteMoniker::root();
        let event = ComponentEvent::new_for_test(
            root.clone(),
            "fuchsia-pkg://root",
            Err(EventError::new(
                &ModelError::instance_not_found(root.clone()),
                EventErrorPayload::Resolved,
            )),
        );
        registry.dispatch(&event).await
    }

    #[fuchsia::test]
    async fn capability_routed_dispatch() -> Result<(), ModelError> {
        let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
        let registry = EventRegistry::new(Arc::downgrade(&model));
        let mut event_stream = registry
            .subscribe(
                &SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Debug),
                vec![EventSubscription::new(EventType::CapabilityRouted.into(), EventMode::Sync)],
            )
            .await
            .expect("subscribe succeeds");
        assert_eq!(1, registry.dispatchers_per_event_type(EventType::CapabilityRouted).await);

        let component = ComponentInstance::new_root(
            Environment::empty(),
            Weak::new(),
            Weak::new(),
            "test:///root".to_string(),
        );
        let capability = ComponentCapability::Protocol(ProtocolDecl {
            name: "foo".into(),
            source_path: "/svc/foo".parse().unwrap(),
        });
        let source = CapabilitySource::Component {
            capability: capability.clone(),
            component: component.as_weak(),
        };
        let capability_provider = Arc::new(Mutex::new(None));
        let event = ComponentEvent::new_for_test(
            AbsoluteMoniker::root(),
            "fuchsia-pkg://root",
            Ok(EventPayload::CapabilityRouted { source: source.clone(), capability_provider }),
        );
        fasync::Task::spawn(async move {
            registry.dispatch(&event).await.expect("failed dispatch");
        })
        .detach();

        let event = event_stream.next().await.expect("null event");
        assert_matches!(event, Event {
            event: ComponentEvent {
                result: Ok(EventPayload::CapabilityRouted {
                    source: CapabilitySource::Component {
                        capability, ..
                    },
                    ..
                }),
                ..
            },
            scope_moniker,
            ..
        } if capability == capability && scope_moniker == AbsoluteMoniker::root().into());

        Ok(())
    }

    #[fuchsia::test]
    async fn drop_dispatcher_when_event_stream_dropped() {
        let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
        let event_registry = EventRegistry::new(Arc::downgrade(&model));

        assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        let mut event_stream_a = event_registry
            .subscribe(
                &SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
                vec![EventSubscription::new(EventType::Discovered.into(), EventMode::Async)],
            )
            .await
            .expect("subscribe succeeds");

        assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        let mut event_stream_b = event_registry
            .subscribe(
                &SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
                vec![EventSubscription::new(EventType::Discovered.into(), EventMode::Async)],
            )
            .await
            .expect("subscribe succeeds");

        assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        dispatch_fake_event(&event_registry).await.unwrap();

        // Verify that both EventStreams receive the event.
        assert!(event_stream_a.next().await.is_some());
        assert!(event_stream_b.next().await.is_some());
        assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        drop(event_stream_a);

        // EventRegistry won't drop EventDispatchers until an event is dispatched.
        assert_eq!(2, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        dispatch_fake_event(&event_registry).await.unwrap();

        assert!(event_stream_b.next().await.is_some());
        assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Discovered).await);

        drop(event_stream_b);

        dispatch_fake_event(&event_registry).await.unwrap();
        assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Discovered).await);
    }

    #[fuchsia::test]
    async fn event_error_dispatch() {
        let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
        let event_registry = EventRegistry::new(Arc::downgrade(&model));

        assert_eq!(0, event_registry.dispatchers_per_event_type(EventType::Resolved).await);

        let mut event_stream = event_registry
            .subscribe(
                &SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production),
                vec![EventSubscription::new(EventType::Resolved.into(), EventMode::Async)],
            )
            .await
            .expect("subscribed to event stream");

        assert_eq!(1, event_registry.dispatchers_per_event_type(EventType::Resolved).await);

        dispatch_error_event(&event_registry).await.unwrap();

        let event = event_stream.next().await.map(|e| e.event).unwrap();

        // Verify that we received the event error.
        assert_matches!(
            event.result,
            Err(EventError {
                source: ModelError::ComponentInstanceError {
                    err: ComponentInstanceError::InstanceNotFound { .. }
                },
                event_error_payload: EventErrorPayload::Resolved,
            })
        );
    }

    #[fuchsia::test]
    async fn capability_requested_over_two_event_streams() {
        let TestModelResult { model, .. } = TestEnvironmentBuilder::new().build().await;
        let event_registry = EventRegistry::new(Arc::downgrade(&model));

        assert_eq!(
            0,
            event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
        );

        let options =
            SubscriptionOptions::new(SubscriptionType::AboveRoot, ExecutionMode::Production);

        let mut event_stream_a = event_registry
            .subscribe(
                &options,
                vec![EventSubscription::new(
                    EventType::CapabilityRequested.into(),
                    EventMode::Async,
                )],
            )
            .await
            .expect("subscribe succeeds");

        assert_eq!(
            1,
            event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
        );

        let mut event_stream_b = event_registry
            .subscribe(
                &options,
                vec![EventSubscription::new(
                    EventType::CapabilityRequested.into(),
                    EventMode::Async,
                )],
            )
            .await
            .expect("subscribe succeeds");

        assert_eq!(
            2,
            event_registry.dispatchers_per_event_type(EventType::CapabilityRequested).await
        );

        dispatch_capability_requested_event(&event_registry).await.unwrap();

        let event_a = event_stream_a.next().await.map(|e| e.event).unwrap();

        // Verify that we received a valid CapabilityRequested event.
        assert_matches!(event_a.result, Ok(EventPayload::CapabilityRequested { .. }));

        let event_b = event_stream_b.next().await.map(|e| e.event).unwrap();

        // Verify that we received the event error.
        assert_matches!(
            event_b.result,
            Err(EventError {
                event_error_payload: EventErrorPayload::CapabilityRequested { .. },
                ..
            })
        );
    }
}
