// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    crate::{
        events::{
            CapabilityRouted, Event, EventMode, EventSource, EventStream, EventStreamError,
            EventSubscription, RoutingProtocol,
        },
        matcher::EventMatcher,
    },
    anyhow::Error,
    async_trait::async_trait,
    fidl::endpoints::{create_request_stream, ClientEnd, ServerEnd, ServiceMarker},
    fidl_fuchsia_io as fio, fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync,
    futures::{
        future::{AbortHandle, Abortable, TryFutureExt},
        StreamExt,
    },
    log::warn,
    std::sync::Arc,
};

/// A ProtocolInterposer allows a test to sit between a service and a client
/// and mutate or silently observe messages being passed between them.
///
/// Client <---> ProtocolInterposer <---> Server
///
/// Implementors provide `serve`; the default `interpose` and `route` methods
/// take care of listening for `CapabilityRouted` events and of wiring up the
/// two channels that `serve` operates on.
#[async_trait]
pub trait ProtocolInterposer: 'static + Send + Sync {
    /// Marker of the protocol being interposed. Its `NAME` is used as the
    /// capability name that routing events are matched against.
    type Marker: ServiceMarker;

    /// Starts interposing on routes of `Self::Marker`.
    ///
    /// Subscribes to synchronous `CapabilityRouted` events on `event_source`
    /// and spawns a detached background task. For every successful event that
    /// matches `matcher` (further restricted to the capability name
    /// `Self::Marker::NAME`), the task calls `route` and installs the resulting
    /// provider on the event via `replace_and_open`.
    ///
    /// Returns an `AbortHandle` that stops the background listener when
    /// aborted. The listener also stops on its own once the event stream
    /// reports `EventStreamError::StreamClosed`.
    ///
    /// # Panics
    ///
    /// Panics if the event subscription cannot be created. The background task
    /// panics if the event stream cannot be built, if waiting for an event
    /// fails with an error other than `EventStreamError`, if a matching event
    /// carries no routing protocol, or if the provider cannot be installed.
    async fn interpose(
        self: Arc<Self>,
        event_source: &EventSource,
        matcher: EventMatcher,
    ) -> AbortHandle {
        // Only routes of the interposed protocol are of interest.
        let matcher = matcher.capability_name(Self::Marker::NAME);
        let server_end = event_source
            .subscribe_endpoint(vec![EventSubscription::new(
                vec![CapabilityRouted::NAME],
                EventMode::Sync,
            )])
            .await
            .expect("Could not subscribe to CapabilityRouted event for interposition");
        let (abort_handle, abort_registration) = AbortHandle::new_pair();

        // Spawn a new thread to listen to CapabilityRoutedEvents.
        // We use a new thread here because running this on the main thread may
        // not work if a test writer needs to do blocking operations.
        fasync::Task::blocking(
            Abortable::new(
                async move {
                    let mut event_stream = EventStream::new(
                        server_end
                            .into_stream()
                            .expect("Could not create EventStream from ServerEnd"),
                    );
                    loop {
                        // Wait for a capability routed event that matches
                        let event =
                            match matcher.clone().wait::<CapabilityRouted>(&mut event_stream).await
                            {
                                Ok(e) => e,
                                Err(e) => match e.downcast::<EventStreamError>() {
                                    // The stream is gone; nothing left to interpose.
                                    Ok(EventStreamError::StreamClosed) => return,
                                    Err(e) => panic!("Unknown error! {:?}", e),
                                },
                            };

                        // An event was found! Inject the route.
                        // Events that report a routing failure are skipped.
                        if event.is_ok() {
                            let (provider_client_end, server_end) = self.clone().route();
                            event
                                .protocol_proxy()
                                .expect("Event does not have routing protocol")
                                .replace_and_open(provider_client_end, server_end)
                                .await
                                .expect("Could not set provider for CapabilityRouted event");
                        }
                    }
                },
                abort_registration,
            )
            // Being aborted through the handle is an expected way to stop.
            .unwrap_or_else(|_| ()),
        )
        .detach();

        abort_handle
    }

    /// Builds the plumbing for a single interposed connection.
    ///
    /// Creates a fresh channel for the Interposer <---> Server leg and a
    /// `CapabilityProvider` request stream, then spawns a detached task that
    /// waits for one `Open` request on that stream. When it arrives, the task
    /// acknowledges it, logs a warning for any unexpected path, flags or mode
    /// type, and runs `serve` with the client's request stream and a proxy to
    /// the server leg.
    ///
    /// Returns the client end of the `CapabilityProvider` together with the
    /// server end of the Interposer <---> Server channel; `interpose` hands
    /// both to `replace_and_open`.
    ///
    /// # Panics
    ///
    /// Panics if the channel or the request stream cannot be created. The
    /// spawned task panics if it cannot respond to `Open`, if either channel
    /// cannot be converted into a proxy/stream, or if `serve` returns an error.
    fn route(self: Arc<Self>) -> (ClientEnd<fsys::CapabilityProviderMarker>, fidl::Channel) {
        // Create the Interposer <---> Server channel
        let (client_end, server_end) = fidl::Channel::create().expect("could not create channel");

        // Create the CapabilityProvider channel
        let (provider_client_end, mut provider_capability_stream) =
            create_request_stream::<fsys::CapabilityProviderMarker>()
                .expect("Could not create request stream for CapabilityProvider");

        // Spawn a task to handle this new route
        fasync::Task::spawn(async move {
            let request = provider_capability_stream.next().await;
            match request {
                Some(Ok(fsys::CapabilityProviderRequest::Open {
                    // Channel carrying the Client <---> Interposer leg. Bound to
                    // a distinct name so it is not confused with the server leg
                    // returned from `route`.
                    server_end: client_channel,
                    flags,
                    mode,
                    path,
                    responder,
                })) => {
                    // Unblock component manager
                    responder.send().expect("Failed to respond to CapabilityProvider Open");

                    if !path.is_empty() {
                        warn!(
                            "Interposed service {} was provided a non-empty path: {}",
                            Self::Marker::NAME,
                            path
                        );
                    }

                    if flags != (fio::OPEN_RIGHT_READABLE | fio::OPEN_RIGHT_WRITABLE) {
                        warn!(
                            "Interposed service {} was provided unexpected flags: 0x{:x}",
                            Self::Marker::NAME,
                            flags
                        );
                    }

                    let mode_type = mode & fio::MODE_TYPE_MASK;
                    if mode_type != fio::MODE_TYPE_SERVICE {
                        warn!(
                            "Interposed service {} was provided unexpected mode type: 0x{:x}",
                            Self::Marker::NAME,
                            mode_type
                        );
                    }

                    // Create the proxy for the Interposer <---> Server connection
                    let proxy = ClientEnd::<Self::Marker>::new(client_end)
                        .into_proxy()
                        .expect("could not convert into proxy");

                    // Create the stream for the Client <---> Interposer connection
                    let stream = ServerEnd::<Self::Marker>::new(client_channel)
                        .into_stream()
                        .expect("could not convert channel into stream");

                    // Start interposing!
                    self.serve(stream, proxy).await.expect("Interposition failed");
                }
                // Surface transport errors instead of dropping them silently;
                // without this the interposed connection just disappears.
                Some(Err(e)) => {
                    warn!(
                        "Interposed service {} failed to read CapabilityProvider request: {:?}",
                        Self::Marker::NAME,
                        e
                    );
                }
                // The provider channel was closed before any request arrived:
                // there is nothing to serve.
                _ => {}
            }
        })
        .detach();

        (provider_client_end, server_end)
    }

    /// This function will be run asynchronously when a client attempts
    /// to connect to the service being interposed. `from_client` is a stream of
    /// requests coming in from the client and `to_server` is a proxy to the
    /// real server.
    ///
    /// Implementations may bind `from_client` as `mut`; the binding mode is not
    /// part of the signature, so it is omitted from this bodiless declaration.
    ///
    /// # Errors
    ///
    /// Any error returned here causes the task spawned by `route` to panic
    /// with "Interposition failed".
    async fn serve(
        self: Arc<Self>,
        from_client: <<Self as ProtocolInterposer>::Marker as ServiceMarker>::RequestStream,
        to_server: <<Self as ProtocolInterposer>::Marker as ServiceMarker>::Proxy,
    ) -> Result<(), Error>;
}
