// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    anyhow::{format_err, Error},
    bt_rfcomm::{profile::build_rfcomm_protocol, ServerChannel},
    fidl_fuchsia_bluetooth::ErrorCode,
    fidl_fuchsia_bluetooth_bredr as bredr, fuchsia_async as fasync,
    fuchsia_bluetooth::{
        detachable_map::DetachableMap,
        types::{Channel, PeerId},
    },
    fuchsia_inspect as inspect,
    fuchsia_inspect_derive::{AttachError, Inspect},
    futures::{lock::Mutex, FutureExt},
    log::{info, trace},
    std::{
        collections::{HashMap, HashSet},
        convert::TryFrom,
        sync::Arc,
    },
};

use crate::rfcomm::{session::Session, types::SignaledTask};

/// Manages the current clients of the RFCOMM server. Provides an API for
/// registering, unregistering, and relaying RFCOMM channels to clients.
///
/// All state lives behind an async `Mutex`, so the registry can be used through a
/// shared reference (for example from behind an `Arc`).
pub struct Clients {
    /// The currently registered clients. Each registered client is identified
    /// by a unique ServerChannel; the value is the `ConnectionReceiver` proxy that
    /// new RFCOMM channels for that ServerChannel are relayed to.
    channel_receivers: Mutex<HashMap<ServerChannel, bredr::ConnectionReceiverProxy>>,
}

impl Clients {
    /// Creates an empty registry in which no ServerChannel is reserved.
    pub fn new() -> Self {
        let channel_receivers = Mutex::new(HashMap::new());
        Self { channel_receivers }
    }

    /// Counts the ServerChannels that no client has registered yet.
    ///
    /// The registry lock is held while the count is taken.
    async fn available_space(&self) -> usize {
        let registered = self.channel_receivers.lock().await;
        let mut free = 0;
        for candidate in ServerChannel::all() {
            if !registered.contains_key(&candidate) {
                free += 1;
            }
        }
        free
    }

    /// Unregisters whichever client currently holds `server_channel`.
    ///
    /// Does nothing if `server_channel` is not registered.
    async fn remove(&self, server_channel: &ServerChannel) {
        let mut registered = self.channel_receivers.lock().await;
        let _ = registered.remove(server_channel);
    }

    /// Unregisters every client, which frees all ServerChannels at once.
    async fn clear(&self) {
        let mut registered = self.channel_receivers.lock().await;
        registered.clear();
    }

    /// Registers the client behind `proxy` under the first ServerChannel that is not yet taken.
    ///
    /// Returns the ServerChannel that was reserved for the client, or None when every
    /// ServerChannel is already registered.
    pub async fn new_client(&self, proxy: bredr::ConnectionReceiverProxy) -> Option<ServerChannel> {
        let mut registered = self.channel_receivers.lock().await;
        // Candidates are tried in the order produced by `ServerChannel::all()`.
        let channel = ServerChannel::all().find(|sc| !registered.contains_key(sc))?;
        trace!("Allocating {:?}", channel);
        registered.insert(channel, proxy);
        Some(channel)
    }

    /// Delivers the `channel` to the client that has registered the `server_channel`.
    ///
    /// `peer_id` is forwarded to the client together with the RFCOMM protocol descriptor
    /// built for `server_channel`.
    ///
    /// Returns an error if there is no such client, if `channel` cannot be converted into
    /// its FIDL representation, or if relaying the channel to the client fails.
    pub async fn deliver_channel(
        &self,
        peer_id: PeerId,
        server_channel: ServerChannel,
        channel: Channel,
    ) -> Result<(), Error> {
        let clients = self.channel_receivers.lock().await;
        // Build the error lazily so the message is only formatted on the failure path.
        let client = clients
            .get(&server_channel)
            .ok_or_else(|| format_err!("ServerChannel {:?} not registered", server_channel))?;
        // A failed conversion is reported to the caller instead of panicking the server.
        let channel = bredr::Channel::try_from(channel).map_err(|e| format_err!("{:?}", e))?;
        // Build the RFCOMM protocol descriptor and relay the channel.
        let mut protocol: Vec<bredr::ProtocolDescriptor> =
            build_rfcomm_protocol(server_channel).iter().map(Into::into).collect();
        client
            .connected(&mut peer_id.into(), channel, &mut protocol.iter_mut())
            .map_err(|e| format_err!("{:?}", e))
    }
}

/// The RfcommServer handles connection requests from profile clients and remote peers.
pub struct RfcommServer {
    /// The currently registered profile clients of the RFCOMM server.
    /// Held in an `Arc` so that each Session's channel-opened callback can share it.
    clients: Arc<Clients>,

    /// Active sessions between us and a remote peer. Each Session will multiplex
    /// RFCOMM connections over a single L2CAP channel.
    /// There can only be one session per remote peer. See RFCOMM Section 5.2.
    sessions: DetachableMap<PeerId, Session>,

    /// Inspect node for Sessions to attach to.
    inspect: inspect::Node,
}

/// Lets the server be attached to an Inspect hierarchy.
impl Inspect for &mut RfcommServer {
    /// Replaces the server's Inspect node with a new child of `parent` called `name`.
    ///
    /// This implementation always returns `Ok`.
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        let node = parent.create_child(name);
        self.inspect = node;
        Ok(())
    }
}

impl RfcommServer {
    /// Creates a server with no registered clients, no sessions, and a default Inspect node.
    pub fn new() -> Self {
        let clients = Arc::new(Clients::new());
        let sessions = DetachableMap::new();
        let inspect = inspect::Node::default();
        Self { clients, sessions, inspect }
    }

    /// Reports whether there is an active session with the peer identified by `id`.
    ///
    /// Returns true only when an entry for `id` exists and can still be upgraded.
    /// An RFCOMM Session is active if there is a currently running processing task.
    pub fn is_active_session(&mut self, id: &PeerId) -> bool {
        match self.sessions.get(id) {
            Some(session) => session.upgrade().is_some(),
            None => false,
        }
    }

    /// Returns the number of available server channels in this server.
    pub async fn available_server_channels(&self) -> usize {
        self.clients.available_space().await
    }

    /// De-allocates the server `channels` provided.
    pub async fn free_server_channels(&mut self, channels: &HashSet<ServerChannel>) {
        for sc in channels {
            self.clients.remove(sc).await;
        }
    }

    /// De-allocates all the server channels in this server.
    pub async fn free_all_server_channels(&mut self) {
        self.clients.clear().await;
    }

    /// Reserves the next available ServerChannel for a client's `proxy`.
    ///
    /// Returns the allocated ServerChannel.
    pub async fn allocate_server_channel(
        &mut self,
        proxy: bredr::ConnectionReceiverProxy,
    ) -> Option<ServerChannel> {
        self.clients.new_client(proxy).await
    }

    /// Opens an RFCOMM channel specified by `server_channel` with the remote peer.
    ///
    /// Returns an error if there is no session established with the peer.
    pub async fn open_rfcomm_channel(
        &mut self,
        id: PeerId,
        server_channel: ServerChannel,
        responder: bredr::ProfileConnectResponder,
    ) -> Result<(), Error> {
        trace!("Received request to open RFCOMM channel {:?} with peer {:?}", server_channel, id);

        match self.sessions.get(&id).and_then(|s| s.upgrade()) {
            None => {
                // Peer either disconnected or doesn't exist.
                let _ = responder.send(&mut Err(ErrorCode::Failed));
                Err(format_err!("Invalid peer ID {:?}", id))
            }
            Some(session) => {
                let channel_opened_callback =
                    Box::new(move |channel: Result<Channel, ErrorCode>| {
                        let mut channel = channel.map(|c| bredr::Channel::try_from(c).unwrap());
                        responder.send(&mut channel).map_err(|e| format_err!("{:?}", e))
                    });
                session.open_rfcomm_channel(server_channel, channel_opened_callback).await;
                Ok(())
            }
        }
    }

    /// Handles an incoming L2CAP connection from the remote peer.
    ///
    /// If there is already an active session established with this peer, returns an Error
    /// as there can only be one active session per peer.
    /// Otherwise, creates and stores a new session over the provided `l2cap` channel.
    pub fn new_l2cap_connection(&mut self, id: PeerId, l2cap: Channel) -> Result<(), Error> {
        if self.is_active_session(&id) {
            return Err(format_err!("RFCOMM Session already exists with peer {:?}", id));
        }
        info!("Received new l2cap connection from peer {:?}", id);

        // Create a new RFCOMM Session with the provided `channel_opened_callback` which will be
        // called anytime an RFCOMM channel is created. Opened RFCOMM channels will be delivered
        // to the `clients` of the `RfcommServer`.
        let clients = self.clients.clone();
        let channel_opened_callback = Box::new(move |server_channel, channel| {
            let peer_id = id;
            let clients = clients.clone();
            async move { clients.deliver_channel(peer_id, server_channel, channel).await }.boxed()
        });
        let mut session = Session::create(id, l2cap, channel_opened_callback);
        let _ = session.iattach(&self.inspect, inspect::unique_name("peer_"));
        let closed_fut = session.finished();
        self.sessions.insert(id, session);

        // Task eagerly removes the Session from the set of active sessions upon termination.
        let detached_session = self.sessions.get(&id).expect("just inserted");
        fasync::Task::spawn(async move {
            let _ = closed_fut.await;
            detached_session.detach();
        })
        .detach();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use {
        bt_rfcomm::{frame::mux_commands::*, frame::*, Role, DLCI},
        fidl::{
            encoding::Decodable,
            endpoints::{create_proxy, create_proxy_and_stream},
        },
        fidl_fuchsia_bluetooth_bredr::ConnectionReceiverMarker,
        fuchsia_async as fasync,
        fuchsia_bluetooth::types::Channel,
        futures::{pin_mut, task::Poll, AsyncWriteExt, StreamExt},
        matches::assert_matches,
    };

    use crate::rfcomm::test_util::{expect_frame_received_by_peer, send_peer_frame};

    /// Builds the executor and a fresh `RfcommServer` used by the executor-driven tests.
    fn setup_rfcomm_manager() -> (fasync::Executor, RfcommServer) {
        let executor = fasync::Executor::new().unwrap();
        (executor, RfcommServer::new())
    }

    /// Exhausts the server channel space, then checks that freeing one channel makes
    /// allocation possible again.
    #[fasync::run_singlethreaded(test)]
    async fn test_allocate_server_channel() {
        let mut rfcomm = RfcommServer::new();

        // A fresh server has every ServerChannel available.
        let total_channels = ServerChannel::all().count();
        assert_eq!(rfcomm.available_server_channels().await, total_channels);

        // Reserving the first server channel should succeed.
        let (proxy, _server_end) = create_proxy::<ConnectionReceiverMarker>().unwrap();
        let first_channel =
            rfcomm.allocate_server_channel(proxy.clone()).await.expect("should allocate");

        // Reserve the remaining n-1 channels.
        for _ in 0..(total_channels - 1) {
            assert!(rfcomm.allocate_server_channel(proxy.clone()).await.is_some());
        }

        // The space is exhausted, so one more allocation must fail.
        assert_eq!(rfcomm.available_server_channels().await, 0);
        assert!(rfcomm.allocate_server_channel(proxy.clone()).await.is_none());

        // Free only the first channel that was handed out.
        let mut to_free = HashSet::new();
        to_free.insert(first_channel);
        rfcomm.free_server_channels(&to_free).await;

        // With one slot free again, a new client can be allocated.
        let (new_proxy, _new_server_end) = create_proxy::<ConnectionReceiverMarker>().unwrap();
        assert!(rfcomm.allocate_server_channel(new_proxy).await.is_some());
    }

    /// A new L2CAP connection creates an active session, which becomes inactive once the
    /// remote end of the channel is dropped.
    #[test]
    fn test_new_l2cap_connection() {
        let (mut exec, mut rfcomm) = setup_rfcomm_manager();

        let peer = PeerId(123);
        let (mut remote, local) = Channel::create();
        assert!(rfcomm.new_l2cap_connection(peer, local).is_ok());

        // Right after creation the Session should be active.
        assert!(rfcomm.is_active_session(&peer));

        // Simulate the peer writing RFCOMM data to the session - the write should complete.
        let payload = [0x00, 0x00, 0x00];
        let mut write_fut = remote.write(&payload[..]);
        match exec.run_until_stalled(&mut write_fut) {
            Poll::Ready(Ok(written)) => {
                assert_eq!(written, 3);
            }
            other => panic!("Expected write ready but got {:?}", other),
        }

        // Remote peer disconnects - run the executor so the background processing task
        // can observe the disconnection.
        drop(remote);
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // The session should no longer be active.
        assert!(!rfcomm.is_active_session(&peer));
        // Querying a second time gives the same answer.
        assert!(!rfcomm.is_active_session(&peer));
    }

    /// A peer-initiated RFCOMM channel on a reserved ServerChannel should be relayed to
    /// the profile client that reserved it.
    ///
    /// Flow: the client reserves a ServerChannel, the peer starts the session multiplexer,
    /// the peer opens a user DLCI for that ServerChannel, and the client then receives a
    /// `Connected` request.
    #[test]
    fn test_new_rfcomm_channel_is_relayed_to_client() {
        let (mut exec, mut rfcomm) = setup_rfcomm_manager();

        // Profile-client reserves a server channel.
        let (c, mut s) = create_proxy_and_stream::<ConnectionReceiverMarker>().unwrap();
        let first_channel = {
            let fut = rfcomm.allocate_server_channel(c.clone());
            pin_mut!(fut);
            match exec.run_until_stalled(&mut fut) {
                Poll::Ready(Some(sc)) => sc,
                x => panic!("Expected server channel but got {:?}", x),
            }
        };

        // Nothing has been relayed yet, so the client's request stream is still pending.
        let profile_client_fut = s.next();
        pin_mut!(profile_client_fut);
        assert!(exec.run_until_stalled(&mut profile_client_fut).is_pending());

        // Start up a session with remote peer.
        let id = PeerId(1);
        let (local, mut remote) = Channel::create();
        assert!(rfcomm.new_l2cap_connection(id, local).is_ok());
        assert!(rfcomm.is_active_session(&id));

        // Remote peer requests to start up session multiplexer.
        let sabm = Frame::make_sabm_command(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
        send_peer_frame(remote.as_ref(), sabm);
        // Expect to send a positive response to the peer.
        expect_frame_received_by_peer(&mut exec, &mut remote);

        // Remote peer requests to open an RFCOMM channel on the DLCI that corresponds to
        // the ServerChannel reserved above.
        let user_dlci = first_channel.to_dlci(Role::Responder).unwrap();
        let user_sabm = Frame::make_sabm_command(Role::Initiator, user_dlci);
        send_peer_frame(remote.as_ref(), user_sabm);
        // Expect to send a positive response to the peer.
        expect_frame_received_by_peer(&mut exec, &mut remote);

        // The Session should open a new RFCOMM channel for the provided `user_dlci`, and
        // the Channel should be relayed to the profile client.
        match exec.run_until_stalled(&mut profile_client_fut) {
            Poll::Ready(Some(Ok(bredr::ConnectionReceiverRequest::Connected { .. }))) => {}
            x => panic!("Expected connection but got {:?}", x),
        }
    }

    /// Exercises `Clients` directly: delivery fails for unregistered channels, succeeds
    /// for a registered client, and fails again once that client has disconnected.
    #[fasync::run_singlethreaded(test)]
    async fn test_register_and_deliver_inbound_channel_to_clients() {
        let clients = Clients::new();

        // Initial capacity is the full range of valid Server Channels (1 through 30, inclusive).
        let initial_space: usize = 30;
        assert_eq!(clients.available_space().await, initial_space);

        // Delivering an inbound channel for a ServerChannel nobody registered is an error.
        let unregistered_channel = ServerChannel::try_from(10).unwrap();
        let (first_local, _first_remote) = Channel::create();
        assert!(clients
            .deliver_channel(PeerId(1), unregistered_channel, first_local)
            .await
            .is_err());

        // Registering a new client should succeed and consume one slot.
        let (proxy, stream) = create_proxy_and_stream::<bredr::ConnectionReceiverMarker>().unwrap();
        let server_channel = clients.new_client(proxy).await.unwrap();
        assert_eq!(clients.available_space().await, initial_space - 1);

        // Delivering a channel to the registered client should succeed.
        let (second_local, _second_remote) = Channel::create();
        assert!(clients.deliver_channel(PeerId(1), server_channel, second_local).await.is_ok());

        // Client disconnects - delivering a new channel should fail.
        drop(stream);
        let (third_local, _third_remote) = Channel::create();
        assert!(clients.deliver_channel(PeerId(1), server_channel, third_local).await.is_err());
    }

    /// Issues a client Profile::Connect() request for peer `id` and polls the server end of
    /// the Profile protocol for it.
    ///
    /// Returns the responder for the request and the Future associated with the request.
    /// Panics if the connect request is not immediately available on the server end.
    #[track_caller]
    fn make_client_connect_request(
        exec: &mut fasync::Executor,
        id: PeerId,
    ) -> (
        bredr::ProfileConnectResponder,
        fidl::client::QueryResponseFut<Result<bredr::Channel, ErrorCode>>,
    ) {
        let (profile_proxy, mut request_stream) =
            create_proxy_and_stream::<bredr::ProfileMarker>().unwrap();
        let mut next_request = Box::pin(request_stream.next());
        let connect_fut =
            profile_proxy.connect(&mut id.into(), &mut bredr::ConnectParameters::new_empty());
        let responder = match exec.run_until_stalled(&mut next_request) {
            Poll::Ready(Some(Ok(bredr::ProfileRequest::Connect { responder, .. }))) => responder,
            other => panic!("Expected ready connect request but got: {:?}", other),
        };
        (responder, connect_fut)
    }

    /// A client request to open an outbound RFCOMM channel should resolve with a channel
    /// once the peer has answered each step of the exchange.
    ///
    /// The exchange driven below is: SABM for mux startup -> UA from the peer, parameter
    /// negotiation -> PN response from the peer, SABM for the user DLCI -> UA from the peer.
    #[test]
    fn test_request_outbound_connection_succeeds() {
        let (mut exec, mut rfcomm) = setup_rfcomm_manager();

        // Start up a session with remote peer.
        let id = PeerId(1);
        let (local, mut remote) = Channel::create();
        assert!(rfcomm.new_l2cap_connection(id, local).is_ok());

        // Simulate a client connect request.
        let (responder, connect_request_fut) = make_client_connect_request(&mut exec, id);
        pin_mut!(connect_request_fut);
        assert!(exec.run_until_stalled(&mut connect_request_fut).is_pending());
        // We expect the open channel request to be OK - still awaiting the channel.
        let server_channel = ServerChannel::try_from(9).unwrap();
        let expected_dlci = server_channel.to_dlci(Role::Responder).unwrap();
        let mut outbound_fut = Box::pin(rfcomm.open_rfcomm_channel(id, server_channel, responder));
        assert_matches!(exec.run_until_stalled(&mut outbound_fut), Poll::Ready(Ok(_)));
        // The client has not been answered yet; the channel is not open.
        assert!(exec.run_until_stalled(&mut connect_request_fut).is_pending());

        // Expect to send a frame to the peer - SABM for mux startup.
        expect_frame_received_by_peer(&mut exec, &mut remote);
        // Simulate peer responding positively.
        let ua = Frame::make_ua_response(Role::Unassigned, DLCI::MUX_CONTROL_DLCI);
        send_peer_frame(remote.as_ref(), ua);

        // Expect to send a frame to peer - parameter negotiation.
        expect_frame_received_by_peer(&mut exec, &mut remote);
        // Simulate peer responding positively with a PN response for `expected_dlci`.
        let data = MuxCommand {
            params: MuxCommandParams::ParameterNegotiation(ParameterNegotiationParams {
                dlci: expected_dlci,
                credit_based_flow_handshake: CreditBasedFlowHandshake::SupportedResponse,
                priority: 12,
                max_frame_size: 100,
                initial_credits: 1,
            }),
            command_response: CommandResponse::Response,
        };
        let pn_response = Frame::make_mux_command(Role::Responder, data);
        send_peer_frame(remote.as_ref(), pn_response);

        // Expect to send a frame to peer - SABM for channel opening.
        expect_frame_received_by_peer(&mut exec, &mut remote);
        // Simulate peer responding positively.
        let ua = Frame::make_ua_response(Role::Responder, expected_dlci);
        send_peer_frame(remote.as_ref(), ua);

        // The channel should be established and relayed to the client that requested it.
        assert_matches!(exec.run_until_stalled(&mut connect_request_fut), Poll::Ready(Ok(Ok(_))));
    }

    /// Opening an outbound channel to a peer that has no session must fail and must
    /// notify the requesting client of the failure.
    #[test]
    fn test_request_outbound_connection_invalid_peer() {
        let (mut exec, mut rfcomm) = setup_rfcomm_manager();

        // Simulate a client connect request for a peer that never established a session.
        let unknown_peer = PeerId(41);
        let (responder, client_fut) = make_client_connect_request(&mut exec, unknown_peer);
        pin_mut!(client_fut);
        assert!(exec.run_until_stalled(&mut client_fut).is_pending());

        // The open channel request is expected to fail right away.
        let server_channel = ServerChannel::try_from(8).unwrap();
        let mut open_fut =
            Box::pin(rfcomm.open_rfcomm_channel(unknown_peer, server_channel, responder));
        assert_matches!(exec.run_until_stalled(&mut open_fut), Poll::Ready(Err(_)));
        // The client should have been told about the failure through the responder.
        assert_matches!(exec.run_until_stalled(&mut client_fut), Poll::Ready(Ok(Err(_))));
    }
}
