// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use anyhow::{format_err, Error};
use bt_avdtp as avdtp;
use fidl_fuchsia_bluetooth_bredr::{
    self as bredr, ChannelParameters, ProfileDescriptor, ProfileProxy,
};
use fuchsia_async as fasync;
use fuchsia_bluetooth::{
    detachable_map::{DetachableMap, DetachableWeak},
    inspect::DebugExt,
    types::{Channel, PeerId},
};
use fuchsia_inspect::{self as inspect, NumericProperty, Property};
use fuchsia_inspect_derive::{AttachError, Inspect};
use fuchsia_sync::Mutex;
use fuchsia_zircon as zx;
use futures::{
    channel::{mpsc, oneshot},
    stream::{Stream, StreamExt},
    task::{Context, Poll},
    Future, FutureExt, TryFutureExt,
};
use std::{
    collections::{hash_map::Entry, HashMap, HashSet},
    pin::Pin,
    sync::Arc,
};
use tracing::{info, warn};

use crate::codec::CodecNegotiation;
use crate::stream::StreamsBuilder;
use crate::{peer::Peer, permits::Permits};

/// Statistics node for tracking various information about a peer that has been encountered.
/// Typically used as an inspect tree node.
struct PeerStats {
    id: PeerId,
    inspect_node: inspect::Node,
    /// The number of times that this peer has been successfully connected to since discovery.
    connection_count: inspect::UintProperty,
}

impl PeerStats {
    fn new(id: PeerId) -> Self {
        Self { id, inspect_node: Default::default(), connection_count: Default::default() }
    }

    fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
        self.inspect_node.record_string("descriptor", descriptor.debug());
    }

    fn record_connected(&mut self) {
        let _ = self.connection_count.add(1);
    }
}

impl Inspect for &mut PeerStats {
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        self.inspect_node = parent.create_child(name.as_ref());
        self.inspect_node.record_string("id", self.id.to_string());
        self.connection_count = self.inspect_node.create_uint("connection_count", 0);
        Ok(())
    }
}

#[derive(Default)]
struct DiscoveredPeers {
    /// The peers that we have discovered, with their descriptors and potential preferred
    /// endpoint directions. Because the same peer can be discovered multiple times, with
    /// potentially different endpoints, we maintain a set of advertised directions.
    descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
    /// Holds the child nodes which include the ids and profile descriptors for inspect.
    stats: HashMap<PeerId, PeerStats>,
    /// Inspect node, usually at "discovered" in the tree.
    inspect_node: inspect::Node,
}

impl DiscoveredPeers {
    fn insert(
        &mut self,
        id: PeerId,
        descriptor: ProfileDescriptor,
        directions: HashSet<avdtp::EndpointType>,
    ) {
        match self.descriptors.entry(id) {
            Entry::Occupied(mut entry) => {
                entry.get_mut().0 = descriptor;
                entry.get_mut().1.extend(&directions);
            }
            Entry::Vacant(entry) => {
                let _ = entry.insert((descriptor, directions));
            }
        };
        self.stats
            .entry(id)
            .or_insert({
                let mut new_stats = PeerStats::new(id);
                let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
                new_stats
            })
            .set_descriptor(&descriptor);
    }

    fn connected(&mut self, id: PeerId) {
        if let Some(stats) = self.stats.get_mut(&id) {
            stats.record_connected();
        }
    }

    /// Returns the descriptor and preferred endpoint direction associated with the peer `id`.
    fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
        self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
    }
}

impl Inspect for &mut DiscoveredPeers {
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        self.inspect_node = parent.create_child(name.as_ref());
        Ok(())
    }
}

/// Given a set of endpoint `directions`, returns the preferred direction or None
/// if both Sink and Source are specified.
fn find_preferred_direction(
    directions: &HashSet<avdtp::EndpointType>,
) -> Option<avdtp::EndpointType> {
    if directions.len() == 1 {
        directions.iter().next().cloned()
    } else {
        // Otherwise, either there are no A2DP services or both Sink & Source are specified
        // in which case there is no preferred direction.
        None
    }
}

/// Make an outgoing connection to a peer.
async fn connect_peer(
    proxy: ProfileProxy,
    id: PeerId,
    channel_params: ChannelParameters,
) -> Result<Channel, Error> {
    info!(%id, "Connecting to peer");
    let connect_fut = proxy.connect(
        &id.into(),
        &bredr::ConnectParameters::L2cap(bredr::L2capParameters {
            psm: Some(bredr::PSM_AVDTP),
            parameters: Some(channel_params),
            ..Default::default()
        }),
    );
    let channel = match connect_fut.await {
        Err(e) => {
            warn!(%id, ?e, "FIDL error on connect");
            return Err(e.into());
        }
        Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
        Ok(Ok(channel)) => channel,
    };

    let channel = channel
        .try_into()
        .map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
    Ok(channel)
}

/// ConnectedPeers manages the set of connected peers based on discovery, new connection, and
/// peer session lifetime.
pub struct ConnectedPeers {
    /// The set of connected peers.
    connected: DetachableMap<PeerId, Peer>,
    /// Tasks for peers that we are attempting to connect to.
    /// Used to ensure only one outgoing attempt exists at once.
    connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// ProfileDescriptors from discovering the peer, stored here even if the peer is disconnected
    discovered: Mutex<DiscoveredPeers>,
    /// Streams builder, provides a set of streams and negotiation when a peer is connected
    streams_builder: StreamsBuilder,
    /// The permits that each peer uses to validate that we can start a stream.
    permits: Permits,
    /// Profile Proxy, used to connect new transport sockets.
    profile: ProfileProxy,
    /// Cobalt logger to use and hand out to peers, if we are using one.
    metrics: bt_metrics::MetricsLogger,
    /// The 'peers' node of the inspect tree. All connected peers own a child node of this node.
    inspect: inspect::Node,
    /// Inspect node for which is the current preferred peer direction.
    inspect_peer_direction: inspect::StringProperty,
    /// Listeners for new connected peers
    connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
    /// Task handles for newly connected peer stream starts.
    // TODO(https://fxbug.dev/42146917): Completed tasks aren't garbage-collected yet.
    start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// Preferred direction for new peers.  This is the direction we prefer the peer's endpoint to
    /// be, i.e. if we prefer Sink, locally we are Source.
    preferred_peer_direction: Mutex<avdtp::EndpointType>,
}

impl ConnectedPeers {
    pub fn new(
        streams_builder: StreamsBuilder,
        permits: Permits,
        profile: ProfileProxy,
        metrics: bt_metrics::MetricsLogger,
    ) -> Self {
        Self {
            connected: DetachableMap::new(),
            connection_attempts: Mutex::new(HashMap::new()),
            discovered: Default::default(),
            streams_builder,
            profile,
            permits,
            inspect: inspect::Node::default(),
            inspect_peer_direction: inspect::StringProperty::default(),
            metrics,
            connected_peer_senders: Default::default(),
            start_stream_tasks: Default::default(),
            preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
        }
    }

    pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
        self.connected.get(id)
    }

    pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
        self.get_weak(id).and_then(|p| p.upgrade())
    }

    pub fn is_connected(&self, id: &PeerId) -> bool {
        self.connected.contains_key(id)
    }

    /// Attempts to start streaming on `peer` by collecting the remote streaming endpoint
    /// information, selecting a compatible peer using `negotiation` and starting the stream.
    /// Does nothing and returns Ok(()) if the peer is already streaming or will start streaming
    /// on it's own.
    async fn start_streaming(
        peer: &DetachableWeak<PeerId, Peer>,
        negotiation: CodecNegotiation,
    ) -> Result<(), anyhow::Error> {
        let remote_streams = {
            let strong = peer.upgrade().ok_or(format_err!("Disconnected"))?;
            if strong.streaming_active() {
                return Ok(());
            }
            strong.collect_capabilities()
        }
        .await?;

        let (negotiated, remote_seid) =
            negotiation.select(&remote_streams).ok_or(format_err!("No compatible stream found"))?;

        let strong = peer.upgrade().ok_or(format_err!("Disconnected"))?;
        if strong.streaming_active() {
            let peer_id = peer.key();
            info!(%peer_id, "Not starting streaming, it's already started");
            return Ok(());
        }
        strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
    }

    pub fn found(
        &self,
        id: PeerId,
        desc: ProfileDescriptor,
        preferred_directions: HashSet<avdtp::EndpointType>,
    ) {
        self.discovered.lock().insert(id, desc.clone(), preferred_directions);
        if let Some(peer) = self.get(&id) {
            let _ = peer.set_descriptor(desc);
        }
    }

    pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
        *self.preferred_peer_direction.lock() = direction;
        self.inspect_peer_direction.set(&format!("{direction:?}"));
    }

    pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
        *self.preferred_peer_direction.lock()
    }

    pub fn try_connect(
        &self,
        id: PeerId,
        channel_params: ChannelParameters,
    ) -> impl Future<Output = Result<Option<Channel>, Error>> {
        let proxy = self.profile.clone();
        let connected = self.is_connected(&id);
        let (sender, recv) = oneshot::channel();
        let recv =
            recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
        if connected {
            if let Err(e) = sender.send(Ok(None)) {
                warn!(%id, ?e, "Failed to notify already-connected");
            }
            return recv;
        }
        let mut attempts = self.connection_attempts.lock();
        if let Some(previous_connect_task) = attempts.remove(&id) {
            // We are the only place that can poll the connect task, check if it finished.
            if previous_connect_task.now_or_never().is_none() {
                warn!(%id, "Cancelling previous connect attempt");
            }
        }
        let connect_task = fasync::Task::spawn(async move {
            if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
                warn!(%id, ?e, "Failed to send channel connect result");
            }
        });
        let _ = attempts.insert(id, connect_task);
        recv
    }

    /// Accept a channel that is connected to the peer `id`.
    /// If `initiator_delay` is set, attempt to start a stream after the specified delay.
    /// `initiator_delay` has no effect if the peer already has a control channel.
    /// Returns a weak peer pointer (even if it was previously connected) if successful.
    pub async fn connected(
        &self,
        id: PeerId,
        channel: Channel,
        initiator_delay: Option<zx::Duration>,
    ) -> Result<DetachableWeak<PeerId, Peer>, Error> {
        if let Some(weak) = self.get_weak(&id) {
            let peer = weak.upgrade().ok_or(format_err!("Disconnected connecting transport"))?;
            if let Err(e) = peer.receive_channel(channel) {
                warn!(%id, %e, "failed to connect channel");
                return Err(e.into());
            }
            return Ok(weak);
        }

        let entry = self.connected.lazy_entry(&id);

        info!(%id, "peer connected");
        let avdtp_peer = avdtp::Peer::new(channel);

        let mut peer = Peer::create(
            id,
            avdtp_peer,
            self.streams_builder.peer_streams(&id, None).await?,
            Some(self.permits.clone()),
            self.profile.clone(),
            self.metrics.clone(),
        );

        self.discovered.lock().connected(id);

        let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
            let _ = peer.set_descriptor(desc);
            dir
        } else {
            None
        };

        if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
            warn!(%id, ?e, "Couldn't attach inspect");
        }

        let closed_fut = peer.closed();
        let peer = match entry.try_insert(peer) {
            Err(_peer) => {
                warn!(%id, "Peer connected while we were setting up");
                return self.get_weak(&id).ok_or(format_err!("Peer missing"));
            }
            Ok(weak_peer) => weak_peer,
        };

        if let Some(delay) = initiator_delay {
            let peer = peer.clone();
            let peer_id = peer.key().clone();
            // Bias the codec negotiation with the peer's preferred direction that was discovered
            // from the SDP service search.
            let negotiation = self
                .streams_builder
                .negotiation(
                    &id,
                    None,
                    peer_preferred_direction.unwrap_or(self.preferred_peer_direction()),
                )
                .await?;
            let start_stream_task = fuchsia_async::Task::local(async move {
                let delay_sec = delay.into_millis() as f64 / 1000.0;
                info!(id = %peer.key(), "dwelling {delay_sec}s for peer initiation");
                fasync::Timer::new(fasync::Time::after(delay)).await;

                if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
                    info!(id = %peer.key(), ?e, "Peer start streaming failed");
                    peer.detach();
                }
            });
            if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
                info!(%peer_id, "Replacing previous start stream dwell");
            }
        }

        // Remove the peer when we disconnect.
        fasync::Task::local(async move {
            closed_fut.await;
            peer.detach();
        })
        .detach();

        let peer = self.get_weak(&id).ok_or(format_err!("Peer missing"))?;
        self.notify_connected(&peer);
        Ok(peer)
    }

    /// Notify the listeners that a new peer has been connected to.
    fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
        let mut senders = self.connected_peer_senders.lock();
        senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
    }

    /// Get a stream that produces peers that have been connected.
    pub fn connected_stream(&self) -> PeerConnections {
        let (sender, receiver) = mpsc::channel(0);
        self.connected_peer_senders.lock().push(sender);
        PeerConnections { stream: receiver }
    }
}

impl Inspect for &mut ConnectedPeers {
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        self.inspect = parent.create_child(name.as_ref());
        let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
        self.inspect_peer_direction =
            self.inspect.create_string("preferred_peer_direction", peer_dir_str);
        self.streams_builder.iattach(&self.inspect, "streams_builder")?;
        self.discovered.lock().iattach(&self.inspect, "discovered")
    }
}

/// Provides a stream of peers that have been connected to. This stream produces an item whenever
/// an A2DP peer has been connected.  It will produce None when no more peers will be connected.
pub struct PeerConnections {
    stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
}

impl Stream for PeerConnections {
    type Item = DetachableWeak<PeerId, Peer>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use bt_avdtp::{Request, ServiceCapability};
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_bluetooth_bredr::{
        AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
    };
    use futures::future::BoxFuture;
    use std::pin::pin;

    use crate::codec::MediaCodecConfig;
    use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
    use crate::media_types::*;

    fn run_to_stalled(exec: &mut fasync::TestExecutor) {
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
    }

    fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
        let remote_avdtp = avdtp::Peer::new(remote);
        let mut remote_requests = remote_avdtp.take_request_stream();

        // Should be able to actually communicate via the peer.
        let avdtp = peer.avdtp();
        let discover_fut = avdtp.discover();

        let mut discover_fut = pin!(discover_fut);

        assert!(exec.run_until_stalled(&mut discover_fut).is_pending());

        let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
            x => panic!("Expected a Ready Discovery request but got {:?}", x),
        };

        let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");

        let information = avdtp::StreamInformation::new(
            endpoint_id,
            false,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Source,
        );

        responder.send(&[information]).expect("Sending response should have worked");

        let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
            Poll::Ready(Ok(infos)) => infos,
            x => panic!("Expected a Ready response but got {:?}", x),
        };
    }

    fn setup_connected_peer_test(
    ) -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
        let exec = fasync::TestExecutor::new();
        let (proxy, stream) =
            create_proxy_and_stream::<ProfileMarker>().expect("Profile proxy should be created");
        let id = PeerId(1);

        let peers = ConnectedPeers::new(
            StreamsBuilder::default(),
            Permits::new(1),
            proxy,
            bt_metrics::MetricsLogger::default(),
        );

        (exec, id, peers, stream)
    }

    #[fuchsia::test]
    fn connect_creates_peer() {
        let (mut exec, id, peers, _stream) = setup_connected_peer_test();

        let (remote, channel) = Channel::create();

        let peer = exec
            .run_singlethreaded(peers.connected(id, channel, None))
            .expect("peer should connect");
        let peer = peer.upgrade().expect("peer should be connected");

        exercise_avdtp(&mut exec, remote, &peer);
    }

    #[fuchsia::test]
    fn connect_notifies_streams() {
        let (mut exec, id, peers, _stream) = setup_connected_peer_test();

        let (remote, channel) = Channel::create();

        let mut peer_stream = peers.connected_stream();
        let mut peer_stream_two = peers.connected_stream();

        let peer = exec
            .run_singlethreaded(peers.connected(id, channel, None))
            .expect("peer should connect");
        let peer = peer.upgrade().expect("peer should be connected");

        // Peers should have been notified of the new peer
        let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id);
        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id);

        exercise_avdtp(&mut exec, remote, &peer);

        // If you drop one stream, the other one should still produce.
        drop(peer_stream);

        let id2 = PeerId(2);
        let (remote2, channel2) = Channel::create();
        let peer2 = exec
            .run_singlethreaded(peers.connected(id2, channel2, None))
            .expect("peer should connect");
        let peer2 = peer2.upgrade().expect("peer two should be connected");

        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id2);

        exercise_avdtp(&mut exec, remote2, &peer2);
    }

    #[fuchsia::test]
    fn find_preferred_direction_returns_correct_endpoints() {
        let empty = HashSet::new();
        assert_eq!(find_preferred_direction(&empty), None);

        let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
        assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));

        let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
        assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));

        let both = HashSet::from_iter(
            vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
        );
        assert_eq!(find_preferred_direction(&both), None);
    }

    // Expected chosen ID for the AAC stream endpoint.
    const AAC_SEID: u8 = 8;
    // Expected chosen ID for the SBC sink stream endpoint.
    const SBC_SINK_SEID: u8 = 9;
    // Expected chosen ID for the SBC source stream endpoint.
    const SBC_SOURCE_SEID: u8 = 10;

    fn aac_sink_codec() -> avdtp::ServiceCapability {
        AacCodecInfo::new(
            AacObjectType::MANDATORY_SNK,
            AacSamplingFrequency::MANDATORY_SNK,
            AacChannels::MANDATORY_SNK,
            true,
            0, // 0 = Unknown constant bitrate support (A2DP Sec. 4.5.2.4)
        )
        .unwrap()
        .into()
    }

    fn sbc_sink_codec() -> avdtp::ServiceCapability {
        SbcCodecInfo::new(
            SbcSamplingFrequency::MANDATORY_SNK,
            SbcChannelMode::MANDATORY_SNK,
            SbcBlockCount::MANDATORY_SNK,
            SbcSubBands::MANDATORY_SNK,
            SbcAllocation::MANDATORY_SNK,
            SbcCodecInfo::BITPOOL_MIN,
            SbcCodecInfo::BITPOOL_MAX,
        )
        .unwrap()
        .into()
    }

    fn sbc_source_codec() -> avdtp::ServiceCapability {
        SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ48000HZ,
            SbcChannelMode::JOINT_STEREO,
            SbcBlockCount::MANDATORY_SRC,
            SbcSubBands::MANDATORY_SRC,
            SbcAllocation::MANDATORY_SRC,
            SbcCodecInfo::BITPOOL_MIN,
            SbcCodecInfo::BITPOOL_MAX,
        )
        .unwrap()
        .into()
    }

    #[derive(Clone)]
    struct FakeBuilder {
        capability: avdtp::ServiceCapability,
        direction: avdtp::EndpointType,
    }

    impl MediaTaskBuilder for FakeBuilder {
        fn configure(
            &self,
            _peer_id: &PeerId,
            codec_config: &MediaCodecConfig,
        ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
            if self.capability.codec_type() == Some(codec_config.codec_type()) {
                return Ok(Box::new(FakeRunner {}));
            }
            Err(MediaTaskError::Other(String::from("Unsupported configuring")))
        }

        fn direction(&self) -> bt_avdtp::EndpointType {
            self.direction
        }

        fn supported_configs(
            &self,
            _peer_id: &PeerId,
            _offload: Option<AudioOffloadExtProxy>,
        ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
            futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
        }
    }

    struct FakeRunner {}

    impl MediaTaskRunner for FakeRunner {
        fn start(
            &mut self,
            _stream: avdtp::MediaStream,
            _offload: Option<AudioOffloadExtProxy>,
        ) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
            Err(MediaTaskError::Other(String::from("unimplemented starting")))
        }
    }

    /// Sets up a test in which we expect to select a stream and connect to a peer.
    /// Returns the executor, connected peers (under test), request stream for profile interaction,
    /// and an SBC and AAC Sink service capability.
    fn setup_negotiation_test() -> (
        fasync::TestExecutor,
        ConnectedPeers,
        ProfileRequestStream,
        ServiceCapability,
        ServiceCapability,
    ) {
        let exec = fasync::TestExecutor::new_with_fake_time();
        exec.set_fake_time(fasync::Time::from_nanos(1_000_000));
        let (proxy, stream) =
            create_proxy_and_stream::<ProfileMarker>().expect("Profile proxy should be created");

        let aac_sink_codec = aac_sink_codec();
        let sbc_sink_codec = sbc_sink_codec();
        let aac_sink_builder = FakeBuilder {
            capability: aac_sink_codec.clone(),
            direction: avdtp::EndpointType::Sink,
        };
        let sbc_sink_builder = FakeBuilder {
            capability: sbc_sink_codec.clone(),
            direction: avdtp::EndpointType::Sink,
        };
        let sbc_source_builder =
            FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };

        let mut streams_builder = StreamsBuilder::default();
        streams_builder.add_builder(aac_sink_builder);
        streams_builder.add_builder(sbc_sink_builder);
        streams_builder.add_builder(sbc_source_builder);

        let peers = ConnectedPeers::new(
            streams_builder,
            Permits::new(1),
            proxy,
            bt_metrics::MetricsLogger::default(),
        );

        (exec, peers, stream, sbc_sink_codec, aac_sink_codec)
    }

    #[fuchsia::test]
    fn streaming_start_with_streaming_peer_is_noop() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::Duration::from_seconds(1);

        let mut remote_requests = remote.take_request_stream();

        // This starts the task in the background waiting.
        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // Before the delay expires, the peer starts the stream.

        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
        let mut set_config_fut = pin!(set_config_fut);
        match exec.run_until_stalled(&mut set_config_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
        };

        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
        // wait for the delay to expire now.

        exec.set_fake_time(fasync::Time::after(delay) + zx::Duration::from_micros(1));
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // Shouldn't start a discovery, since the stream is scheduled to start already.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
    }

    fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
        let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
        let info = avdtp::StreamInformation::new(
            remote_sbc_seid.clone(),
            false,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Source,
        );
        (remote_sbc_seid, info)
    }

    fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
        let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
        let info = avdtp::StreamInformation::new(
            remote_aac_seid.clone(),
            false,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Source,
        );
        (remote_aac_seid, info)
    }

    fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
        let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
        let info = avdtp::StreamInformation::new(
            remote_sbc_seid.clone(),
            false,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Sink,
        );
        (remote_sbc_seid, info)
    }

    /// Expects an AVDTP Discovery request on the `requests` stream. Responds to
    /// the request with the provided `response` endpoints.
    fn expect_peer_discovery(
        exec: &mut fasync::TestExecutor,
        requests: &mut avdtp::RequestStream,
        response: Vec<avdtp::StreamInformation>,
    ) {
        match exec.run_until_stalled(&mut requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
                responder.send(&response).expect("response succeeds");
            }
            x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
        };
    }

    #[fuchsia::test]
    fn streaming_start_configure_while_discovery() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::Duration::from_seconds(1);

        let mut remote_requests = remote.take_request_stream();

        // This starts the task in the background waiting.
        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // The delay expires, and the discovery is start!
        exec.set_fake_time(fasync::Time::after(delay) + zx::Duration::from_micros(1));
        let _ = exec.wake_expired_timers();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![sbc_source_endpoint().1, aac_source_endpoint().1],
        );

        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
        let mut set_config_fut = pin!(set_config_fut);
        match exec.run_until_stalled(&mut set_config_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
        };

        // Can finish the collection process, but not attempt to configure or start a stream.
        loop {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
                        .expect("respond succeeds");
                }
                Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
                Poll::Pending => break,
            }
        }
    }

    /// Tests connection initiation selects the appropriate stream endpoint based
    /// on a biased codec negotiation that is set from the peer's discovered services.
    #[fuchsia::test]
    fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();

        // System biases towards the Source direction (called when the AudioMode FIDL changes).
        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);

        // New fake peer discovered with some descriptor - the peer's SDP entry shows Sink.
        let remote = avdtp::Peer::new(remote);
        let desc = ProfileDescriptor {
            profile_id: ServiceClassProfileIdentifier::AdvancedAudioDistribution,
            major_version: 1,
            minor_version: 2,
        };
        let preferred_direction = vec![avdtp::EndpointType::Sink];
        let delay = zx::Duration::from_seconds(1);
        peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));

        let connected_fut = peers.connected(id, channel, Some(delay));
        let mut connected_fut = std::pin::pin!(connected_fut);
        let _ = exec
            .run_until_stalled(&mut connected_fut)
            .expect("is ready")
            .expect("connect control channel is ok");
        // run the start task until it's stalled.
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();

        // Should wait for the specified amount of time.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());

        exec.set_fake_time(fasync::Time::after(delay + zx::Duration::from_micros(1)));
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        // Even though the peer supports both SBC Sink and Source, we expect to negotiate and start
        // on the Sink endpoint since that is the peer's preferred one.
        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
        );
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // We expect the set configuration to apply to the remote peer's Sink SEID and the
                // local Source SEID.
                assert_eq!(peer_sbc_sink_seid, local_stream_id);
                let local_sbc_source_seid: avdtp::StreamEndpointId =
                    SBC_SOURCE_SEID.try_into().unwrap();
                assert_eq!(local_sbc_source_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }

    /// Tests connection initiation selects the appropriate stream endpoint based
    /// on a biased codec negotiation that is set from by the system (in practice, the AudioMode
    /// FIDL). This case typically occurs when a peer advertises both sink and source, and therefore
    /// has no preference for the endpoint direction.
    #[fuchsia::test]
    fn connect_initiation_uses_biased_codec_negotiation_by_system() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();

        // System biases towards the Source direction (called when the AudioMode FIDL changes).
        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);

        // New fake peer discovered with separate Sink and Source entries.
        let remote = avdtp::Peer::new(remote);
        let desc = ProfileDescriptor {
            profile_id: ServiceClassProfileIdentifier::AdvancedAudioDistribution,
            major_version: 1,
            minor_version: 2,
        };
        peers.found(
            id,
            desc.clone(),
            HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
        );
        peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));

        let delay = zx::Duration::from_seconds(1);
        let connect_fut = peers.connected(id, channel, Some(delay));
        let mut connect_fut = std::pin::pin!(connect_fut);
        let _ = exec
            .run_until_stalled(&mut connect_fut)
            .expect("ready")
            .expect("connect control channel is ok");
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();
        // Should wait for the specified amount of time.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
        exec.set_fake_time(fasync::Time::after(delay + zx::Duration::from_micros(1)));
        let _ = exec.wake_expired_timers();
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // Because the peer advertises both Sink and Source, we fall back to the system-biased
        // direction, which is Source for the peer.
        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
        );
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // We expect the set configuration to apply to the remote peer's Source SEID and the
                // local Sink SEID.
                assert_eq!(peer_sbc_source_seid, local_stream_id);
                let local_sbc_sink_seid: avdtp::StreamEndpointId =
                    SBC_SINK_SEID.try_into().unwrap();
                assert_eq!(local_sbc_sink_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }

    #[fuchsia::test]
    fn connect_initiation_uses_negotiation() {
        let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::Duration::from_seconds(1);

        let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        let _ = exec
            .run_until_stalled(&mut connect_fut)
            .expect("ready")
            .expect("connect control channel is ok");

        // run the start task until it's stalled.
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();

        // Should wait for the specified amount of time.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());

        exec.set_fake_time(fasync::Time::after(delay + zx::Duration::from_micros(1)));
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // Should discover remote streams, negotiate, and start.
        let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
        let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_endpoint, peer_aac_endpoint],
        );
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_seid => sbc_codec.clone(),
                        id if id == peer_aac_seid => aac_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // Should set the aac stream, matched with local AAC seid.
                assert_eq!(peer_aac_seid, local_stream_id);
                let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
                assert_eq!(local_aac_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }

    #[fuchsia::test]
    fn connected_peers_inspect() {
        let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();

        let inspect = inspect::Inspector::default();
        peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");

        assert_data_tree!(inspect, root: {
            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});

        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);

        assert_data_tree!(inspect, root: {
            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});

        // Connect a peer, it should show up in the tree.
        let (_remote, channel) = Channel::create();
        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());

        assert_data_tree!(inspect, root: {
            peers: {
                discovered: contains {},
                preferred_peer_direction: "Source",
                streams_builder: contains {},
                peer_0: { id: "0000000000000001", local_streams: contains {} }
            }
        });
    }

    #[fuchsia::test]
    fn try_connect_cancels_previous_attempt() {
        let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();

        let mut connect_fut = peers.try_connect(id, ChannelParameters::default());

        // Should get a request to connect, which we will stall and not respond to.
        let responder = match exec.run_singlethreaded(profile_stream.next()) {
            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
            x => panic!("Expected Profile connect, got {x:?}"),
        };

        // Trying to connect again should cancel the first try, and send another connect.
        let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
        let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
            x => panic!("Expected Profile connect, got {x:?}"),
        };

        let first_result = exec.run_singlethreaded(&mut connect_fut);
        let _ = first_result.expect_err("Should have an error from first attempt");

        // Responding on the first connect shouldn't do anything at this point.
        responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();

        exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");

        let (_remote, local) = Channel::create();
        responder_two.send(Ok(local.try_into().unwrap())).unwrap();

        let second_result = exec.run_singlethreaded(&mut connect_again_fut);
        let _ = second_result.expect("should receive the channel");
    }

    #[fuchsia::test]
    fn connected_peers_peer_disconnect_removes_peer() {
        let (mut exec, id, peers, _stream) = setup_connected_peer_test();

        let (remote, channel) = Channel::create();

        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
        run_to_stalled(&mut exec);

        // Disconnect the signaling channel, peer should be gone.
        drop(remote);

        run_to_stalled(&mut exec);

        assert!(peers.get(&id).is_none());
    }

    #[fuchsia::test]
    fn connected_peers_reconnect_works() {
        let (mut exec, id, peers, _stream) = setup_connected_peer_test();

        let (remote, channel) = Channel::create();
        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
        run_to_stalled(&mut exec);

        // Disconnect the signaling channel, peer should be gone.
        drop(remote);

        run_to_stalled(&mut exec);

        assert!(peers.get(&id).is_none());

        // Connect another peer with the same ID
        let (_remote, channel) = Channel::create();

        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
        run_to_stalled(&mut exec);

        // Should be connected.
        assert!(peers.get(&id).is_some());
    }
}
