// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![feature(futures_api, async_await, await_macro)]
#![recursion_limit = "256"]

use {
    bt_avdtp as avdtp,
    failure::{format_err, Error, ResultExt},
    fidl_fuchsia_bluetooth_bredr::*,
    fidl_fuchsia_media::AUDIO_ENCODING_SBC,
    fuchsia_async as fasync,
    fuchsia_syslog::{self, fx_log_info, fx_log_warn},
    fuchsia_zircon as zx,
    futures::{
        channel::mpsc::{self as mpsc, Receiver, Sender},
        select, FutureExt, StreamExt,
    },
    parking_lot::RwLock,
    std::{collections::hash_map::Entry, collections::HashMap, string::String, sync::Arc},
};

mod player;

/// When true, the service will display a byte count while streaming.
const DEBUG_STREAM_STATS: bool = false;

/// Make the SDP definition for the A2DP sink service.
fn make_profile_service_definition() -> ServiceDefinition {
    ServiceDefinition {
        service_class_uuids: vec![String::from("110B")], // Audio Sink UUID
        protocol_descriptors: vec![
            ProtocolDescriptor {
                protocol: ProtocolIdentifier::L2Cap,
                params: vec![DataElement {
                    type_: DataElementType::UnsignedInteger,
                    size: 2,
                    data: DataElementData::Integer(PSM_AVDTP),
                }],
            },
            ProtocolDescriptor {
                protocol: ProtocolIdentifier::Avdtp,
                params: vec![DataElement {
                    type_: DataElementType::UnsignedInteger,
                    size: 2,
                    data: DataElementData::Integer(0x0103), // Indicate v1.3
                }],
            },
        ],
        profile_descriptors: vec![ProfileDescriptor {
            profile_id: ServiceClassProfileIdentifier::AdvancedAudioDistribution,
            major_version: 1,
            minor_version: 3,
        }],
        additional_protocol_descriptors: None,
        information: vec![Information {
            language: "en".to_string(),
            name: Some("A2DP".to_string()),
            description: Some("Advanced Audio Distribution Profile".to_string()),
            provider: Some("Fuchsia".to_string()),
        }],
        additional_attributes: None,
    }
}

// Defined in the Bluetooth Assigned Numbers for Audio/Video applications
// https://www.bluetooth.com/specifications/assigned-numbers/audio-video
const AUDIO_CODEC_SBC: u8 = 0;
// Arbitrarily chosen ID for the SBC stream endpoint.
const SBC_SEID: u8 = 6;

/// Controls a stream endpoint and the media decoding task which is associated with it.
struct Stream {
    /// The AVDTP endpoint that this stream is associated with.
    endpoint: avdtp::StreamEndpoint,
    /// The encoding that media sent to this endpoint should be encoded with.
    /// This should be an encoding constant from fuchsia.media like AUDIO_ENCODING_SBC.
    /// See //garnet/public/fidl/fuchsia.media/stream_type.fidl for valid encodings.
    encoding: String,
    /// Some(sender) when a stream task is started.  Signaling on this sender will
    /// end the media streaming task.
    suspend_sender: Option<Sender<()>>,
}

impl Stream {
    fn new(endpoint: avdtp::StreamEndpoint, encoding: String) -> Stream {
        Stream {
            endpoint,
            encoding,
            suspend_sender: None,
        }
    }

    /// Attempt to start the media decoding task.
    fn start(&mut self) -> Result<(), avdtp::ErrorCode> {
        if self.suspend_sender.is_some() {
            return Err(avdtp::ErrorCode::BadState);
        }
        let (send, receive) = mpsc::channel(1);
        self.suspend_sender = Some(send);
        fuchsia_async::spawn(decode_media_stream(
            self.endpoint.take_transport(),
            self.encoding.clone(),
            receive,
        ));
        Ok(())
    }

    /// Signals to the media decoding task to end.
    fn stop(&mut self) -> Result<(), avdtp::ErrorCode> {
        match self.suspend_sender.take() {
            None => Err(avdtp::ErrorCode::BadState),
            Some(mut sender) => sender.try_send(()).or(Err(avdtp::ErrorCode::BadState)),
        }
    }
}

/// A collection of streams that can be indexed by their EndpointId to their
/// endpoint and the codec to use for this endpoint.
struct Streams(HashMap<avdtp::StreamEndpointId, Stream>);

impl Streams {
    /// A new empty set of endpoints.
    fn new() -> Streams {
        Streams(HashMap::new())
    }

    /// Builds a set of endpoints from the available codecs.
    fn build() -> avdtp::Result<Streams> {
        let mut s = Streams::new();
        // TODO(BT-533): detect codecs, add streams for each codec
        let sbc_stream = avdtp::StreamEndpoint::new(
            SBC_SEID,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Sink,
            vec![
                avdtp::ServiceCapability::MediaTransport,
                avdtp::ServiceCapability::MediaCodec {
                    media_type: avdtp::MediaType::Audio,
                    codec_type: avdtp::MediaCodecType::new(AUDIO_CODEC_SBC),
                    // SBC Codec Specific Information Elements:
                    // These are the mandatory support in sink.
                    // Byte 0:
                    //  - Sampling Frequencies: 44.1kHz, 48.0kHz
                    //  - Channel modes: All (MONO, DUAL CHANNEL, STEREO, JOINT STEREO)
                    // Byte 1:
                    //  - Block length: all (4, 8, 12, 16)
                    //  - Subbands: all (4, 8)
                    //  - Allocation Method: all (SNR and loudness)
                    // Byte 2-3: Minimum and maximum bitpool value. This is just the minimum to the max.
                    // TODO(jamuraa): there should be a way to build this data in a structured way (bt-a2dp?)
                    codec_extra: vec![0x3F, 0xFF, 2, 250],
                },
            ],
        )?;
        s.insert(sbc_stream, AUDIO_ENCODING_SBC.to_string());
        Ok(s)
    }

    /// Adds a stream, indexing it by the endoint id, associated with an encoding,
    /// replacing any other stream with the same endpoint id.
    fn insert(&mut self, stream: avdtp::StreamEndpoint, codec: String) {
        self.0
            .insert(stream.local_id().clone(), Stream::new(stream, codec));
    }

    /// Retrievees a mutable reference to the endpoint with the `id`.
    fn get_endpoint(&mut self, id: &avdtp::StreamEndpointId) -> Option<&mut avdtp::StreamEndpoint> {
        self.0.get_mut(id).map(|x| &mut x.endpoint)
    }

    /// Retrieves a mutable reference to the Stream referenced by `id`, if the stream exists,
    /// otherwise returns Err(BadAcpSeid).
    fn get_mut(&mut self, id: &avdtp::StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
        self.0.get_mut(id).ok_or(avdtp::ErrorCode::BadAcpSeid)
    }

    /// Returns the information on all known streams.
    fn information(&self) -> Vec<avdtp::StreamInformation> {
        self.0.values().map(|x| x.endpoint.information()).collect()
    }
}

/// Discovers any remote streams and reports their information to the log.
async fn discover_remote_streams(peer: Arc<avdtp::Peer>) {
    let streams = await!(peer.discover()).expect("Failed to discover source streams");
    fx_log_info!("Discovered {} streams", streams.len());
    for info in streams {
        match await!(peer.get_all_capabilities(info.id())) {
            Ok(capabilities) => {
                fx_log_info!("Stream {:?}", info);
                for cap in capabilities {
                    fx_log_info!("  - {:?}", cap);
                }
            }
            Err(e) => fx_log_info!("Stream {} discovery failed: {:?}", info.id(), e),
        };
    }
}

/// RemotePeer handles requests from the AVDTP layer, and provides responses as appropriate based
/// on the current state of the A2DP streams available.
/// Each remote A2DP source interacts with its own set of stream endpoints.
struct RemotePeer {
    /// AVDTP peer communicating to this.
    peer: Arc<avdtp::Peer>,
    /// Some(id) if this peer has just opened a stream but the StreamEndpoint hasn't signaled the
    /// end of channel opening yet. Per AVDTP Sec 6.11 only up to one stream can be in this state.
    opening: Option<avdtp::StreamEndpointId>,
    /// The stream endpoint collection for this peer.
    streams: Streams,
}

type RemotesMap = HashMap<String, RemotePeer>;

impl RemotePeer {
    fn new(peer: avdtp::Peer) -> RemotePeer {
        RemotePeer {
            peer: Arc::new(peer),
            opening: None,
            streams: Streams::build().unwrap(),
        }
    }

    /// Provides a reference to the AVDTP peer.
    fn peer(&self) -> Arc<avdtp::Peer> {
        self.peer.clone()
    }

    /// Provide a new established L2CAP channel to this remote peer.
    /// This function should be called whenever the remote assoiated with this peer opens an
    /// L2CAP channel after the first.
    fn receive_channel(&mut self, channel: zx::Socket) -> Result<(), Error> {
        let stream = match &self.opening {
            None => Err(format_err!("No stream opening.")),
            Some(id) => self
                .streams
                .get_endpoint(&id)
                .ok_or(format_err!("endpoint doesn't exist")),
        }?;
        if !stream.receive_channel(fasync::Socket::from_socket(channel)?)? {
            self.opening = None;
        }
        fx_log_info!("connected transport channel to seid {}", stream.local_id());
        Ok(())
    }

    /// Start an asynchronous task to handle any requests from the AVDTP peer.
    /// This task completes when the remote end closes the signaling connection.
    /// This remote peer should be active in the `remotes` map with an id of `device_id`.
    /// When the signaling connection is closed, the task deactivates the remote, removing it
    /// from the `remotes` map.
    fn start_requests_task(&mut self, remotes: Arc<RwLock<RemotesMap>>, device_id: String) {
        let mut request_stream = self.peer.take_request_stream();
        fuchsia_async::spawn(
            async move {
                while let Some(r) = await!(request_stream.next()) {
                    match r {
                        Err(e) => fx_log_info!("Request Error on {}: {:?}", device_id, e),
                        Ok(request) => {
                            let mut peer;
                            {
                                let mut wremotes = remotes.write();
                                peer = wremotes.remove(&device_id).unwrap();
                            }
                            let fut = peer.handle_request(request);
                            if let Err(e) = await!(fut) {
                                fx_log_warn!("{} Error handling request: {:?}", device_id, e);
                            }
                            remotes.write().insert(device_id.clone(), peer);
                        }
                    }
                }
                fx_log_info!("Peer {} disconnected", device_id);
                remotes.write().remove(&device_id);
            },
        );
    }

    /// Handle a single request event from the avdtp peer.
    async fn handle_request(&mut self, r: avdtp::Request) -> avdtp::Result<()> {
        fx_log_info!("Handling {:?} from peer..", r);
        match r {
            avdtp::Request::Discover { responder } => responder.send(&self.streams.information()),
            avdtp::Request::GetCapabilities {
                responder,
                stream_id,
            }
            | avdtp::Request::GetAllCapabilities {
                responder,
                stream_id,
            } => match self.streams.get_endpoint(&stream_id) {
                None => responder.reject(avdtp::ErrorCode::BadAcpSeid),
                Some(stream) => responder.send(stream.capabilities()),
            },
            avdtp::Request::Open {
                responder,
                stream_id,
            } => match self.streams.get_endpoint(&stream_id) {
                None => responder.reject(avdtp::ErrorCode::BadAcpSeid),
                Some(stream) => match stream.establish() {
                    Ok(()) => {
                        self.opening = Some(stream_id);
                        responder.send()
                    }
                    Err(_) => responder.reject(avdtp::ErrorCode::BadState),
                },
            },
            avdtp::Request::Close {
                responder,
                stream_id,
            } => match self.streams.get_endpoint(&stream_id) {
                None => responder.reject(avdtp::ErrorCode::BadAcpSeid),
                Some(stream) => await!(stream.release(responder, &self.peer)),
            },
            avdtp::Request::SetConfiguration {
                responder,
                local_stream_id,
                remote_stream_id,
                capabilities,
            } => {
                let stream = match self.streams.get_endpoint(&local_stream_id) {
                    None => return responder.reject(None, avdtp::ErrorCode::BadAcpSeid),
                    Some(stream) => stream,
                };
                // TODO(BT-695): Confirm the MediaCodec parameters are OK
                match stream.configure(&remote_stream_id, capabilities) {
                    Ok(_) => responder.send(),
                    Err(e) => {
                        // Only happens when this is already configured.
                        responder.reject(None, avdtp::ErrorCode::SepInUse)?;
                        Err(e)
                    }
                }
            }
            avdtp::Request::Reconfigure {
                responder,
                local_stream_id,
                capabilities,
            } => {
                let stream = match self.streams.get_endpoint(&local_stream_id) {
                    None => return responder.reject(None, avdtp::ErrorCode::BadAcpSeid),
                    Some(stream) => stream,
                };
                // TODO(jamuraa): Actually tweak the codec parameters.
                match stream.reconfigure(capabilities) {
                    Ok(_) => responder.send(),
                    Err(e) => {
                        responder.reject(None, avdtp::ErrorCode::BadState)?;
                        Err(e)
                    }
                }
            }
            avdtp::Request::Start {
                responder,
                stream_ids,
            } => {
                for seid in stream_ids {
                    if let Err(code) = self.streams.get_mut(&seid).and_then(|x| x.start()) {
                        return responder.reject(&seid, code);
                    }
                }
                responder.send()
            }
            avdtp::Request::Suspend {
                responder,
                stream_ids,
            } => {
                for seid in stream_ids {
                    if let Err(code) = self.streams.get_mut(&seid).and_then(|x| x.stop()) {
                        return responder.reject(&seid, code);
                    }
                }
                responder.send()
            }
            avdtp::Request::Abort {
                responder,
                stream_id,
            } => {
                let stream = match self.streams.get_endpoint(&stream_id) {
                    None => return Ok(()),
                    Some(stream) => stream,
                };
                await!(stream.abort(None))?;
                self.opening = self.opening.take().filter(|id| id != &stream_id);
                let _ = self.streams.get_mut(&stream_id).and_then(|x| x.stop());
                responder.send()
            }
        }
    }
}

/// Decodes a media stream by starting a Player and transferring media stream packets from AVDTP
/// to the player.  Restarts the player on player errors.
/// Ends when signaled from `end_signal`, or when the media transport stream is closed.
async fn decode_media_stream(
    mut stream: avdtp::MediaStream, encoding: String, mut end_signal: Receiver<()>,
) -> () {
    let mut total_bytes = 0;
    let mut player = match await!(player::Player::new(encoding.clone())) {
        Ok(v) => v,
        Err(e) => {
            fx_log_info!("Can't setup stream source for Media: {:?}", e);
            return;
        }
    };
    loop {
        select! {
            item = stream.next().fuse() => {
                if item.is_none() {
                    fx_log_info!("Media transport closed");
                    return;
                }
                match item.unwrap() {
                    Ok(pkt) => {
                        match player.push_payload(&pkt.as_slice()) {
                            Err(e) => {
                                fx_log_info!("can't push packet: {:?}", e);
                            }
                            _ => (),
                        };
                        total_bytes += pkt.len();
                        if !player.playing() {
                            player.play().unwrap_or_else(|e| fx_log_info!("Problem playing: {:}", e));
                        }
                        // TODO(BT-696): Report rx stats to the hub.
                        if DEBUG_STREAM_STATS {
                            eprint!(
                                "Media Packet received: +{} bytes = {} \r",
                                pkt.len(),
                                total_bytes
                                );
                        }
                    }
                    Err(e) => {
                        fx_log_info!("Error in media stream: {:?}", e);
                        return;
                    }
                }
            },
            evt = player.next_event().fuse() => {
                fx_log_info!("Player Event happened: {:?}", evt);
                if evt.is_none() {
                    fx_log_info!("Rebuilding Player: {:?}", evt);
                    // The player died somehow? Attempt to rebuild the player.
                    player = match await!(player::Player::new(encoding.clone())) {
                        Ok(v) => v,
                        Err(e) => {
                            fx_log_info!("Can't rebuild player: {:?}", e);
                            return;
                        }
                    };
                }
            }
            _ = end_signal.next().fuse() => {
                fx_log_info!("Stream ending on end signal");
                return;
            }
        }
    }
}

#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
    fuchsia_syslog::init_with_tags(&["a2dp-sink"]).expect("Can't init logger");
    let profile_svc = fuchsia_app::client::connect_to_service::<ProfileMarker>()
        .context("Failed to connect to Bluetooth profile service")?;

    let mut service_def = make_profile_service_definition();
    let (status, service_id) = await!(profile_svc.add_service(
        &mut service_def,
        SecurityLevel::EncryptionOptional,
        false
    ))?;

    fx_log_info!("Registered Service ID {}", service_id);

    if let Some(e) = status.error {
        return Err(format_err!("Couldn't add A2DP sink service: {:?}", e));
    }

    let remotes: Arc<RwLock<RemotesMap>> = Arc::new(RwLock::new(HashMap::new()));

    let mut evt_stream = profile_svc.take_event_stream();
    while let Some(evt) = await!(evt_stream.next()) {
        match evt {
            Err(e) => return Err(e.into()),
            Ok(ProfileEvent::OnConnected {
                device_id,
                service_id: _,
                channel,
                protocol,
            }) => {
                fx_log_info!(
                    "Connection from {}: {:?} {:?}!",
                    device_id,
                    channel,
                    protocol
                );
                match remotes.write().entry(device_id.clone()) {
                    Entry::Occupied(mut entry) => {
                        if let Err(e) = entry.get_mut().receive_channel(channel) {
                            fx_log_warn!("{} failed to connect channel: {}", device_id, e);
                        }
                    }
                    Entry::Vacant(entry) => {
                        fx_log_info!("Adding new peer for {}", device_id);
                        let peer = match avdtp::Peer::new(channel) {
                            Ok(peer) => peer,
                            Err(e) => {
                                fx_log_warn!("Error adding signaling peer {}: {:?}", device_id, e);
                                continue;
                            }
                        };
                        let remote = entry.insert(RemotePeer::new(peer));
                        // Spawn tasks to handle this remote
                        remote.start_requests_task(remotes.clone(), device_id);
                        fuchsia_async::spawn(discover_remote_streams(remote.peer()));
                    }
                }
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    /// Test that the Streams specialized hashmap works as expected, storing
    /// the stream based on the SEID and retrieving the right pieces from
    /// the accessors.
    fn test_streams() {
        let mut streams = Streams::new();

        // An endpoint for testing
        let s = avdtp::StreamEndpoint::new(
            1,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Sink,
            vec![avdtp::ServiceCapability::MediaTransport],
        )
        .unwrap();

        let id = s.local_id().clone();
        let information = s.information();
        let encoding = AUDIO_ENCODING_SBC.to_string();

        assert!(streams.get_endpoint(&id).is_none());

        let res = streams.get_mut(&id);

        assert!(res.is_err());
        assert_eq!(avdtp::ErrorCode::BadAcpSeid, res.err().unwrap());

        streams.insert(s, encoding.clone());

        assert!(streams.get_endpoint(&id).is_some());
        assert_eq!(&id, streams.get_endpoint(&id).unwrap().local_id());

        assert_eq!([information], streams.information().as_slice());

        let res = streams.get_mut(&id);

        assert!(res.as_ref().unwrap().suspend_sender.is_none());
        assert_eq!(encoding, res.as_ref().unwrap().encoding);
    }
}
