| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| fuchsia_async::{DurationExt, Task, TimeoutExt}, |
| fuchsia_bluetooth::types::{A2dpDirection, Channel}, |
| fuchsia_sync::Mutex, |
| fuchsia_zircon::{Duration, DurationNum, Status}, |
| futures::{io, stream::Stream, FutureExt}, |
| std::{ |
| fmt, |
| pin::Pin, |
| sync::Arc, |
| sync::RwLock, |
| sync::Weak, |
| task::{Context, Poll}, |
| }, |
| }; |
| |
| use crate::{ |
| types::{ |
| EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult, |
| ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation, |
| }, |
| Peer, SimpleResponder, |
| }; |
| |
| pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>; |
| |
| /// The state of a StreamEndpoint. |
| #[derive(PartialEq, Debug)] |
| pub enum StreamState { |
| Idle, |
| Configured, |
| // An Open command has been accepted, but streams have not been established yet. |
| Opening, |
| Open, |
| Streaming, |
| Closing, |
| Aborting, |
| } |
| |
| impl StreamState { |
| fn configured(&self) -> bool { |
| match self { |
| StreamState::Configured |
| | StreamState::Opening |
| | StreamState::Open |
| | StreamState::Streaming => true, |
| _ => false, |
| } |
| } |
| } |
| |
| /// An AVDTP StreamEndpoint. StreamEndpoints represent a particular capability of the application |
| /// to be a source of sink of media. Included here to aid negotiating the stream connection. |
| /// See Section 5.3 of the AVDTP 1.3 Specification for more information about the Stream Endpoint |
| /// Architecture. |
| pub struct StreamEndpoint { |
| /// Local stream endpoint id. This should be unique per AVDTP Peer. |
| id: StreamEndpointId, |
| /// The type of endpoint this is (TSEP), Source or Sink. |
| endpoint_type: EndpointType, |
| /// The media type this stream represents. |
| media_type: MediaType, |
| /// Current state the stream is in. See Section 6.5 for an overview. |
| state: StreamState, |
| /// The media transport channel |
| /// This should be Some(channel) when state is Open or Streaming. |
| transport: Option<Arc<RwLock<Channel>>>, |
| /// True when the MediaStream is held. |
| /// Prevents multiple threads from owning the media stream. |
| stream_held: Arc<Mutex<bool>>, |
| /// The capabilities of this endpoint. |
| capabilities: Vec<ServiceCapability>, |
| /// The remote stream endpoint id. None if the stream has never been configured. |
| remote_id: Option<StreamEndpointId>, |
| /// The current configuration of this endpoint. Empty if the stream has never been configured. |
| configuration: Vec<ServiceCapability>, |
| /// Callback that is run whenever the endpoint is updated |
| update_callback: Option<StreamEndpointUpdateCallback>, |
| } |
| |
| impl fmt::Debug for StreamEndpoint { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("StreamEndpoint") |
| .field("id", &self.id.0) |
| .field("endpoint_type", &self.endpoint_type) |
| .field("media_type", &self.media_type) |
| .field("state", &self.state) |
| .field("capabilities", &self.capabilities) |
| .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string())) |
| .field("configuration", &self.configuration) |
| .finish() |
| } |
| } |
| |
| impl StreamEndpoint { |
| /// Make a new StreamEndpoint. |
| /// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E). |
| /// StreamEndpoints start in the Idle state. |
| pub fn new( |
| id: u8, |
| media_type: MediaType, |
| endpoint_type: EndpointType, |
| capabilities: Vec<ServiceCapability>, |
| ) -> AvdtpResult<StreamEndpoint> { |
| let seid = StreamEndpointId::try_from(id)?; |
| Ok(StreamEndpoint { |
| id: seid, |
| capabilities, |
| media_type, |
| endpoint_type, |
| state: StreamState::Idle, |
| transport: None, |
| stream_held: Arc::new(Mutex::new(false)), |
| remote_id: None, |
| configuration: vec![], |
| update_callback: None, |
| }) |
| } |
| |
| pub fn as_new(&self) -> Self { |
| StreamEndpoint::new( |
| self.id.0, |
| self.media_type.clone(), |
| self.endpoint_type.clone(), |
| self.capabilities.clone(), |
| ) |
| .expect("as_new") |
| } |
| |
| /// Set the state to the given value and run the `update_callback` afterwards |
| fn set_state(&mut self, state: StreamState) { |
| self.state = state; |
| self.update_callback(); |
| } |
| |
| /// Pass update callback to StreamEndpoint that will be called anytime `StreamEndpoint` is |
| /// modified. |
| pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) { |
| self.update_callback = callback; |
| } |
| |
| fn update_callback(&self) { |
| if let Some(cb) = self.update_callback.as_ref() { |
| cb(self); |
| } |
| } |
| |
| /// Build a new StreamEndpoint from a StreamInformation and associated Capabilities. |
| /// This makes it easy to build from AVDTP Discover and GetCapabilities procedures. |
| /// StreamEndpooints start in the Idle state. |
| pub fn from_info( |
| info: &StreamInformation, |
| capabilities: Vec<ServiceCapability>, |
| ) -> StreamEndpoint { |
| StreamEndpoint { |
| id: info.id().clone(), |
| capabilities, |
| media_type: info.media_type().clone(), |
| endpoint_type: info.endpoint_type().clone(), |
| state: StreamState::Idle, |
| transport: None, |
| stream_held: Arc::new(Mutex::new(false)), |
| remote_id: None, |
| configuration: vec![], |
| update_callback: None, |
| } |
| } |
| |
| /// Attempt to Configure this stream using the capabilities given. |
| /// If the stream is not in an Idle state, fails with Err(InvalidState). |
| /// Used for the Stream Configuration procedure, see Section 6.9 |
| pub fn configure( |
| &mut self, |
| remote_id: &StreamEndpointId, |
| capabilities: Vec<ServiceCapability>, |
| ) -> Result<(), (ServiceCategory, ErrorCode)> { |
| if self.state != StreamState::Idle { |
| return Err((ServiceCategory::None, ErrorCode::BadState)); |
| } |
| self.remote_id = Some(remote_id.clone()); |
| for cap in &capabilities { |
| if !self |
| .capabilities |
| .iter() |
| .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y)) |
| { |
| return Err((cap.category(), ErrorCode::UnsupportedConfiguration)); |
| } |
| } |
| self.configuration = capabilities; |
| self.set_state(StreamState::Configured); |
| Ok(()) |
| } |
| |
| /// Attempt to reconfigure this stream with the capabilities given. If any capability is not |
| /// valid to set, fails with the first such category and InvalidCapabilities If the stream is |
| /// not in the Open state, fails with Err((None, BadState)) Used for the Stream Reconfiguration |
| /// procedure, see Section 6.15. |
| pub fn reconfigure( |
| &mut self, |
| mut capabilities: Vec<ServiceCapability>, |
| ) -> Result<(), (ServiceCategory, ErrorCode)> { |
| if self.state != StreamState::Open { |
| return Err((ServiceCategory::None, ErrorCode::BadState)); |
| } |
| // Only application capabilities are allowed to be reconfigured. See Section 8.11.1 |
| if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) { |
| return Err((cap.category(), ErrorCode::InvalidCapabilities)); |
| } |
| // Should only replace the capabilities that have been configured. See Section 8.11.2 |
| let to_replace: std::vec::Vec<_> = |
| capabilities.iter().map(|x| std::mem::discriminant(x)).collect(); |
| self.configuration.retain(|x| { |
| let disc = std::mem::discriminant(x); |
| !to_replace.contains(&disc) |
| }); |
| self.configuration.append(&mut capabilities); |
| self.update_callback(); |
| Ok(()) |
| } |
| |
| /// Get the current configuration of this stream. |
| /// If the stream is not configured, returns None. |
| /// Used for the Steam Get Configuration Procedure, see Section 6.10 |
| pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> { |
| if !self.state.configured() { |
| return None; |
| } |
| Some(&self.configuration) |
| } |
| |
| // 100 milliseconds chosen based on end of range testing, to allow for recovery after normal |
| // packet delivery continues. |
| const SRC_FLUSH_TIMEOUT: Duration = Duration::from_millis(100); |
| |
| /// When a L2CAP channel is received after an Open command is accepted, it should be |
| /// delivered via receive_channel. |
| /// Returns true if this Endpoint expects more channels to be established before |
| /// streaming is started. |
| /// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established, |
| /// closing |c|. |
| pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> { |
| if self.state != StreamState::Opening || self.transport.is_some() { |
| return Err(Error::InvalidState); |
| } |
| self.transport = Some(Arc::new(RwLock::new(c))); |
| self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT); |
| self.stream_held = Arc::new(Mutex::new(false)); |
| // TODO(jamuraa, https://fxbug.dev/42051664, https://fxbug.dev/42051776): Reporting and Recovery channels |
| self.set_state(StreamState::Open); |
| Ok(false) |
| } |
| |
| /// Begin opening this stream. The stream must be in a Configured state. |
| /// See Stream Establishment, Section 6.11 |
| pub fn establish(&mut self) -> Result<(), ErrorCode> { |
| if self.state != StreamState::Configured || self.transport.is_some() { |
| return Err(ErrorCode::BadState); |
| } |
| self.set_state(StreamState::Opening); |
| Ok(()) |
| } |
| |
| /// Waits for the MediaTransport Channel to be closed up to Duration. |
| /// Returns Err(Status::TIMED_OUT) if the channel didn't close. |
| pub async fn wait_for_channel_close(&self, timeout: Duration) -> Result<(), Status> { |
| if self.transport.is_none() { |
| return Ok(()); |
| } |
| // TODO: this variable triggered the `must_not_suspend` lint and may be held across an await |
| // If this is the case, it is an error. See https://fxbug.dev/42168913 for more details |
| let channel = |
| self.transport.as_ref().unwrap().try_read().map_err(|_e| Status::BAD_STATE)?; |
| let closed_fut = channel.closed(); |
| closed_fut.on_timeout(timeout.after_now(), || Err(Status::TIMED_OUT)).await |
| } |
| |
| /// Attempts to set audio direction priority of the MediaTransport channel based on |
| /// whether the stream is a source or sink endpoint if `active` is true. If `active` is |
| /// false, set the priority to Normal instead. Does nothing on failure. |
| pub fn try_priority(&self, active: bool) { |
| let priority = match (active, &self.endpoint_type) { |
| (false, _) => A2dpDirection::Normal, |
| (true, EndpointType::Source) => A2dpDirection::Source, |
| (true, EndpointType::Sink) => A2dpDirection::Sink, |
| }; |
| let fut = match self.transport.as_ref().unwrap().try_read() { |
| Err(_) => return, |
| Ok(channel) => channel.set_audio_priority(priority).map(|_| ()), |
| }; |
| Task::spawn(fut).detach(); |
| } |
| |
| /// Attempts to set the flush timeout for the MediaTransport channel, for source endpoints. |
| pub fn try_flush_timeout(&self, timeout: Duration) { |
| if self.endpoint_type != EndpointType::Source { |
| return; |
| } |
| let fut = match self.transport.as_ref().unwrap().try_write() { |
| Err(_) => return, |
| Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()), |
| }; |
| Task::spawn(fut).detach(); |
| } |
| |
| /// Close this stream. This procedure checks that the media channels are closed. |
| /// If the channels are not closed in 3 seconds, it initiates an abort procedure with the |
| /// remote |peer| and completes when that finishes. |
| pub async fn release<'a>( |
| &'a mut self, |
| responder: SimpleResponder, |
| peer: &'a Peer, |
| ) -> AvdtpResult<()> { |
| if self.state != StreamState::Open && self.state != StreamState::Streaming { |
| return responder.reject(ErrorCode::BadState); |
| } |
| self.set_state(StreamState::Closing); |
| responder.send()?; |
| let timeout = 3.seconds(); |
| if let Err(Status::TIMED_OUT) = self.wait_for_channel_close(timeout).await { |
| return Ok(self.abort(Some(peer)).await); |
| } |
| // Closing returns this endpoint to the Idle state. |
| self.configuration.clear(); |
| self.remote_id = None; |
| self.set_state(StreamState::Idle); |
| Ok(()) |
| } |
| |
| /// Returns the current state of this endpoint. |
| pub fn state(&self) -> &StreamState { |
| &self.state |
| } |
| |
| /// Start this stream. This can be done only from the Open State. |
| /// Used for the Stream Start procedure, See Section 6.12 |
| pub fn start(&mut self) -> Result<(), ErrorCode> { |
| if self.state != StreamState::Open { |
| return Err(ErrorCode::BadState); |
| } |
| self.try_priority(true); |
| self.set_state(StreamState::Streaming); |
| Ok(()) |
| } |
| |
| /// Suspend this stream. This can be done only from the Streaming state. |
| /// Used for the Stream Suspend procedure, See Section 6.14 |
| pub fn suspend(&mut self) -> Result<(), ErrorCode> { |
| if self.state != StreamState::Streaming { |
| return Err(ErrorCode::BadState); |
| } |
| self.set_state(StreamState::Open); |
| self.try_priority(false); |
| Ok(()) |
| } |
| |
| /// Abort this stream. This can be done from any state, and will always return the state |
| /// to Idle. If peer is not None, we are initiating this procedure and all our channels will |
| /// be closed. |
| pub async fn abort<'a>(&'a mut self, peer: Option<&'a Peer>) { |
| if let Some(peer) = peer { |
| if let Some(seid) = &self.remote_id { |
| let _ = peer.abort(&seid).await; |
| self.set_state(StreamState::Aborting); |
| } |
| } |
| self.configuration.clear(); |
| self.remote_id = None; |
| self.transport = None; |
| self.set_state(StreamState::Idle); |
| } |
| |
| /// Capabilities of this StreamEndpoint. |
| /// Provides support for the Get Capabilities and Get All Capabilities signaling procedures. |
| /// See Sections 6.7 and 6.8 |
| pub fn capabilities(&self) -> &Vec<ServiceCapability> { |
| &self.capabilities |
| } |
| |
| /// Returns the CodecType of this StreamEndpoint. |
| /// Returns None if there is no MediaCodec capability in the endpoint. |
| /// Note: a MediaCodec capability is required by all endpoints by the spec. |
| pub fn codec_type(&self) -> Option<&MediaCodecType> { |
| self.capabilities.iter().find_map(|cap| match cap { |
| ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type), |
| _ => None, |
| }) |
| } |
| |
| /// Returns the local StreamEndpointId for this endpoint. |
| pub fn local_id(&self) -> &StreamEndpointId { |
| &self.id |
| } |
| |
| /// Returns the remote StreamEndpointId for this endpoint, if it's configured. |
| pub fn remote_id(&self) -> Option<&StreamEndpointId> { |
| self.remote_id.as_ref() |
| } |
| |
| /// Returns the EndpointType of this endpoint |
| pub fn endpoint_type(&self) -> &EndpointType { |
| &self.endpoint_type |
| } |
| |
| /// Make a StreamInformation which represents the current state of this stream. |
| pub fn information(&self) -> StreamInformation { |
| StreamInformation::new( |
| self.id.clone(), |
| self.state != StreamState::Idle, |
| self.media_type.clone(), |
| self.endpoint_type.clone(), |
| ) |
| } |
| |
| /// Take the media transport channel, which transmits (or receives) any media for this |
| /// StreamEndpoint. Returns None if the channel is held already, or if the channel has not |
| /// been opened. |
| pub fn take_transport(&mut self) -> Option<MediaStream> { |
| let mut stream_held = self.stream_held.lock(); |
| if *stream_held || self.transport.is_none() { |
| return None; |
| } |
| |
| *stream_held = true; |
| |
| Some(MediaStream::new( |
| self.stream_held.clone(), |
| Arc::downgrade(self.transport.as_ref().unwrap()), |
| )) |
| } |
| } |
| |
| /// Represents a media transport stream. |
| /// If a sink, produces the bytes that have been delivered from the peer. |
| /// If a source, can send bytes using `send` |
| pub struct MediaStream { |
| in_use: Arc<Mutex<bool>>, |
| channel: Weak<RwLock<Channel>>, |
| } |
| |
| impl MediaStream { |
| pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self { |
| Self { in_use, channel } |
| } |
| |
| fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> { |
| self.channel |
| .upgrade() |
| .ok_or(io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection")) |
| } |
| |
| pub fn max_tx_size(&self) -> Result<usize, io::Error> { |
| match self.try_upgrade()?.try_read() { |
| Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")), |
| Ok(lock) => Ok(lock.max_tx_size()), |
| } |
| } |
| } |
| |
| impl Drop for MediaStream { |
| fn drop(&mut self) { |
| let mut l = self.in_use.lock(); |
| *l = false; |
| } |
| } |
| |
| impl Stream for MediaStream { |
| type Item = AvdtpResult<Vec<u8>>; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let arc_chan = match self.try_upgrade() { |
| Err(_e) => return Poll::Ready(None), |
| Ok(c) => c, |
| }; |
| let lock = match arc_chan.try_write() { |
| Err(_e) => return Poll::Ready(None), |
| Ok(lock) => lock, |
| }; |
| let mut pin_chan = Pin::new(lock); |
| match pin_chan.as_mut().poll_next(cx) { |
| Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))), |
| Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))), |
| Poll::Ready(None) => Poll::Ready(None), |
| Poll::Pending => Poll::Pending, |
| } |
| } |
| } |
| |
| impl io::AsyncWrite for MediaStream { |
| fn poll_write( |
| self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<Result<usize, io::Error>> { |
| let arc_chan = match self.try_upgrade() { |
| Err(e) => return Poll::Ready(Err(e)), |
| Ok(c) => c, |
| }; |
| let lock = match arc_chan.try_write() { |
| Err(_) => { |
| return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))) |
| } |
| Ok(lock) => lock, |
| }; |
| let mut pin_chan = Pin::new(lock); |
| pin_chan.as_mut().poll_write(cx, buf) |
| } |
| |
| fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
| let arc_chan = match self.try_upgrade() { |
| Err(e) => return Poll::Ready(Err(e)), |
| Ok(c) => c, |
| }; |
| let lock = match arc_chan.try_write() { |
| Err(_) => { |
| return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))) |
| } |
| Ok(lock) => lock, |
| }; |
| let mut pin_chan = Pin::new(lock); |
| pin_chan.as_mut().poll_flush(cx) |
| } |
| |
| fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
| let arc_chan = match self.try_upgrade() { |
| Err(e) => return Poll::Ready(Err(e)), |
| Ok(c) => c, |
| }; |
| let lock = match arc_chan.try_write() { |
| Err(_) => { |
| return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock"))) |
| } |
| Ok(lock) => lock, |
| }; |
| let mut pin_chan = Pin::new(lock); |
| pin_chan.as_mut().poll_close(cx) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::{ |
| tests::{expect_remote_recv, recv_remote, setup_peer}, |
| Request, |
| }; |
| |
| use assert_matches::assert_matches; |
| use fidl::endpoints::create_request_stream; |
| use fidl_fuchsia_bluetooth_bredr as bredr; |
| use fuchsia_async as fasync; |
| use fuchsia_zircon as zx; |
| use futures::io::AsyncWriteExt; |
| use futures::stream::StreamExt; |
| |
| const REMOTE_ID_VAL: u8 = 1; |
| const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL); |
| |
| #[test] |
| fn make() { |
| let s = StreamEndpoint::new( |
| REMOTE_ID_VAL, |
| MediaType::Audio, |
| EndpointType::Sink, |
| vec![ServiceCapability::MediaTransport], |
| ); |
| assert!(s.is_ok()); |
| let s = s.unwrap(); |
| assert_eq!(&StreamEndpointId(1), s.local_id()); |
| |
| let info = s.information(); |
| assert!(!info.in_use()); |
| |
| let no = StreamEndpoint::new( |
| 0, |
| MediaType::Audio, |
| EndpointType::Sink, |
| vec![ServiceCapability::MediaTransport], |
| ); |
| assert!(no.is_err()); |
| } |
| |
| fn establish_stream(s: &mut StreamEndpoint) -> Channel { |
| assert_matches!(s.establish(), Ok(())); |
| let (chan, remote) = Channel::create(); |
| assert_matches!(s.receive_channel(chan), Ok(false)); |
| remote |
| } |
| |
| #[test] |
| fn from_info() { |
| let seid = StreamEndpointId::try_from(5).unwrap(); |
| let info = |
| StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink); |
| let capabilities = vec![ServiceCapability::MediaTransport]; |
| |
| let endpoint = StreamEndpoint::from_info(&info, capabilities); |
| |
| assert_eq!(&seid, endpoint.local_id()); |
| assert_eq!(&false, endpoint.information().in_use()); |
| assert_eq!(1, endpoint.capabilities().len()); |
| } |
| |
| #[test] |
| fn codec_type() { |
| let s = StreamEndpoint::new( |
| REMOTE_ID_VAL, |
| MediaType::Audio, |
| EndpointType::Sink, |
| vec![ |
| ServiceCapability::MediaTransport, |
| ServiceCapability::MediaCodec { |
| media_type: MediaType::Audio, |
| codec_type: MediaCodecType::new(0x40), |
| codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data. |
| }, |
| ], |
| ) |
| .unwrap(); |
| |
| assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type()); |
| |
| let s = StreamEndpoint::new( |
| REMOTE_ID_VAL, |
| MediaType::Audio, |
| EndpointType::Sink, |
| vec![ServiceCapability::MediaTransport], |
| ) |
| .unwrap(); |
| |
| assert_eq!(None, s.codec_type()); |
| } |
| |
| fn test_endpoint(r#type: EndpointType) -> StreamEndpoint { |
| StreamEndpoint::new( |
| REMOTE_ID_VAL, |
| MediaType::Audio, |
| r#type, |
| vec![ |
| ServiceCapability::MediaTransport, |
| ServiceCapability::MediaCodec { |
| media_type: MediaType::Audio, |
| codec_type: MediaCodecType::new(0x40), |
| codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data. |
| }, |
| ], |
| ) |
| .unwrap() |
| } |
| |
| #[test] |
| fn stream_configure_reconfigure() { |
| let _exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| // Can't configure items that aren't in range. |
| assert_matches!( |
| s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]), |
| Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration)) |
| ); |
| |
| assert_matches!( |
| s.configure( |
| &REMOTE_ID, |
| vec![ |
| ServiceCapability::MediaTransport, |
| ServiceCapability::MediaCodec { |
| media_type: MediaType::Audio, |
| codec_type: MediaCodecType::new(0x40), |
| // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6) |
| codec_extra: vec![0x0C, 0x0D, 0x02, 0x51], |
| } |
| ] |
| ), |
| Ok(()) |
| ); |
| |
| // Note: we allow endpoints to be configured (and reconfigured) again when they |
| // are only configured, even though this is probably not allowed per the spec. |
| |
| // Can't configure while open |
| let _channel = establish_stream(&mut s); |
| |
| assert_matches!( |
| s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), |
| Err((_, ErrorCode::BadState)) |
| ); |
| |
| let reconfiguration = vec![ServiceCapability::MediaCodec { |
| media_type: MediaType::Audio, |
| codec_type: MediaCodecType::new(0x40), |
| // Reconfigure to yet another different codec_extra value. |
| codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F], |
| }]; |
| |
| // The new configuration should match the previous one, but with the reconfigured |
| // capabilities updated. |
| let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()]; |
| |
| // Reconfiguring while open is fine though. |
| assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(())); |
| |
| assert_eq!(Some(&new_configuration), s.get_configuration()); |
| |
| // Can't reconfigure non-application types |
| assert_matches!( |
| s.reconfigure(vec![ServiceCapability::MediaTransport]), |
| Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities)) |
| ); |
| |
| // Can't configure or reconfigure while streaming |
| assert_matches!(s.start(), Ok(())); |
| |
| assert_matches!( |
| s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), |
| Err((_, ErrorCode::BadState)) |
| ); |
| |
| assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState))); |
| |
| assert_matches!(s.suspend(), Ok(())); |
| |
| // Reconfigure should be fine again in open state. |
| assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(())); |
| |
| // Configure is still not allowed. |
| assert_matches!( |
| s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), |
| Err((_, ErrorCode::BadState)) |
| ); |
| } |
| |
| #[test] |
| fn stream_establishment() { |
| let _exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| let (remote, transport) = Channel::create(); |
| |
| // Can't establish before configuring |
| assert_matches!(s.establish(), Err(ErrorCode::BadState)); |
| |
| // Trying to receive a channel in the wrong state closes the channel |
| assert_matches!(s.receive_channel(transport), Err(Error::InvalidState)); |
| |
| let buf: &mut [u8] = &mut [0; 1]; |
| |
| assert_matches!(remote.as_ref().read(buf), Err(zx::Status::PEER_CLOSED)); |
| |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| |
| assert_matches!(s.establish(), Ok(())); |
| |
| // And we should be able to give a channel now. |
| let (_remote, transport) = Channel::create(); |
| assert_matches!(s.receive_channel(transport), Ok(false)); |
| } |
| |
| fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) { |
| let (peer, signaling) = setup_peer(); |
| // Send a close from the other side to produce an event we can respond to. |
| let _ = signaling.as_ref().write(&[0x40, 0x08, 0x04]).expect("signaling write"); |
| let mut req_stream = peer.take_request_stream(); |
| let mut req_fut = req_stream.next(); |
| let complete = exec.run_until_stalled(&mut req_fut); |
| let responder = match complete { |
| Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder, |
| _ => panic!("Expected a close request"), |
| }; |
| (peer, signaling, responder) |
| } |
| |
| #[test] |
| fn stream_release_without_abort() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| |
| let remote_transport = establish_stream(&mut s); |
| |
| let (peer, signaling, responder) = setup_peer_for_release(&mut exec); |
| |
| let mut release_fut = Box::pin(s.release(responder, &peer)); |
| let complete = exec.run_until_stalled(&mut release_fut); |
| |
| // We should still be pending since the transport hasn't been closed. |
| assert!(complete.is_pending()); |
| |
| // Expect a "yes" response. |
| expect_remote_recv(&[0x42, 0x08], &signaling); |
| |
| // Close the transport channel by dropping it. |
| drop(remote_transport); |
| |
| // After the transport is closed the release future should be complete. |
| assert_matches!(exec.run_until_stalled(&mut release_fut), Poll::Ready(Ok(()))); |
| } |
| |
| #[test] |
| fn test_mediastream() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| |
| // Before the stream is opened, we shouldn't be able to take the transport. |
| assert!(s.take_transport().is_none()); |
| |
| let remote_transport = establish_stream(&mut s); |
| |
| // Should be able to get the transport from the stream now. |
| let temp_stream = s.take_transport(); |
| assert!(temp_stream.is_some()); |
| |
| // But only once |
| assert!(s.take_transport().is_none()); |
| |
| // Until you drop the stream |
| drop(temp_stream); |
| |
| let media_stream = s.take_transport(); |
| assert!(media_stream.is_some()); |
| let mut media_stream = media_stream.unwrap(); |
| |
| // Max TX size is taken from the underlying channel. |
| assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX)); |
| |
| // Writing to the media stream should send it through the transport channel. |
| let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96]; |
| let mut write_fut = media_stream.write(hearts); |
| |
| assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8))); |
| |
| expect_remote_recv(hearts, &remote_transport); |
| |
| // Closing the media stream should close the channel. |
| let mut close_fut = media_stream.close(); |
| assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(()))); |
| // Note: there's no effect on the other end of the channel when a close occurs, |
| // until the channel is dropped. |
| |
| drop(s); |
| |
| // Reading from the remote end should fail. |
| let mut result = vec![0]; |
| assert_matches!( |
| remote_transport.as_ref().read(&mut result[..]), |
| Err(zx::Status::PEER_CLOSED) |
| ); |
| |
| // After the stream is gone, any write should return an Err |
| let mut write_fut = media_stream.write(&[0xDE, 0xAD]); |
| assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_))); |
| |
| // After the stream is gone, the stream should be fused done. |
| let mut next_fut = media_stream.next(); |
| assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None)); |
| |
| // And the Max TX should be an error. |
| assert_matches!(media_stream.max_tx_size(), Err(_)); |
| } |
| |
| #[test] |
| fn stream_release_with_abort() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| let _remote_transport = establish_stream(&mut s); |
| let (peer, signaling, responder) = setup_peer_for_release(&mut exec); |
| |
| let mut release_fut = Box::pin(s.release(responder, &peer)); |
| let complete = exec.run_until_stalled(&mut release_fut); |
| |
| // We should still be pending since the transport hasn't been closed. |
| assert!(complete.is_pending()); |
| |
| // Expect a "yes" response. |
| expect_remote_recv(&[0x42, 0x08], &signaling); |
| |
| let _ = exec.wake_next_timer(); |
| let complete = exec.run_until_stalled(&mut release_fut); |
| // Now we're waiting on response from the Abort |
| assert!(complete.is_pending()); |
| // Should have got an abort |
| let received = recv_remote(&signaling).unwrap(); |
| assert_eq!(0x0A, received[1]); |
| let txlabel = received[0] & 0xF0; |
| // Send a response |
| assert!(signaling.as_ref().write(&[txlabel | 0x02, 0x0A]).is_ok()); |
| |
| assert_matches!(exec.run_until_stalled(&mut release_fut), Poll::Ready(Ok(()))); |
| } |
| |
| #[test] |
| fn start_and_suspend() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| // Can't start or suspend until configured and open. |
| assert_matches!(s.start(), Err(ErrorCode::BadState)); |
| assert_matches!(s.suspend(), Err(ErrorCode::BadState)); |
| |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| |
| assert_matches!(s.start(), Err(ErrorCode::BadState)); |
| assert_matches!(s.suspend(), Err(ErrorCode::BadState)); |
| |
| assert_matches!(s.establish(), Ok(())); |
| |
| assert_matches!(s.start(), Err(ErrorCode::BadState)); |
| assert_matches!(s.suspend(), Err(ErrorCode::BadState)); |
| |
| let (remote, local) = zx::Socket::create_datagram(); |
| let (client_end, mut direction_request_stream) = |
| create_request_stream::<bredr::AudioDirectionExtMarker>().unwrap(); |
| let ext = bredr::Channel { |
| socket: Some(local), |
| channel_mode: Some(bredr::ChannelMode::Basic), |
| max_tx_sdu_size: Some(1004), |
| ext_direction: Some(client_end), |
| ..Default::default() |
| }; |
| let transport = Channel::try_from(ext).unwrap(); |
| assert_matches!(s.receive_channel(transport), Ok(false)); |
| |
| // Should be able to start but not suspend now. |
| assert_matches!(s.suspend(), Err(ErrorCode::BadState)); |
| assert_matches!(s.start(), Ok(())); |
| |
| match exec.run_until_stalled(&mut direction_request_stream.next()) { |
| Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority { |
| priority, |
| responder, |
| }))) => { |
| assert_eq!(bredr::A2dpDirectionPriority::Sink, priority); |
| responder.send(Ok(())).expect("response to send cleanly"); |
| } |
| x => panic!("Expected a item to be ready on the request stream, got {:?}", x), |
| }; |
| |
| // Are started, so we should be able to suspend but not start again here. |
| assert_matches!(s.start(), Err(ErrorCode::BadState)); |
| assert_matches!(s.suspend(), Ok(())); |
| |
| match exec.run_until_stalled(&mut direction_request_stream.next()) { |
| Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority { |
| priority, |
| responder, |
| }))) => { |
| assert_eq!(bredr::A2dpDirectionPriority::Normal, priority); |
| responder.send(Ok(())).expect("response to send cleanly"); |
| } |
| x => panic!("Expected a item to be ready on the request stream, got {:?}", x), |
| }; |
| |
| // Now we're suspended, so we can start it again. |
| assert_matches!(s.start(), Ok(())); |
| assert_matches!(s.suspend(), Ok(())); |
| |
| // After we close, we are back at idle and can't start / stop |
| let (peer, signaling, responder) = setup_peer_for_release(&mut exec); |
| |
| { |
| let mut release_fut = Box::pin(s.release(responder, &peer)); |
| let complete = exec.run_until_stalled(&mut release_fut); |
| |
| // We should still be pending since the transport hasn't been closed. |
| assert!(complete.is_pending()); |
| |
| // Expect a "yes" response. |
| expect_remote_recv(&[0x42, 0x08], &signaling); |
| |
| // Close the transport channel by dropping it. |
| drop(remote); |
| |
| // After the channel is closed we should be done. |
| assert_matches!(exec.run_until_stalled(&mut release_fut), Poll::Ready(Ok(()))); |
| } |
| |
| // Shouldn't be able to start or suspend again. |
| assert_matches!(s.start(), Err(ErrorCode::BadState)); |
| assert_matches!(s.suspend(), Err(ErrorCode::BadState)); |
| } |
| |
| fn receive_l2cap_params_channel( |
| s: &mut StreamEndpoint, |
| ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) { |
| assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(())); |
| assert_matches!(s.establish(), Ok(())); |
| |
| let (remote, local) = zx::Socket::create_datagram(); |
| let (client_end, l2cap_params_requests) = |
| create_request_stream::<bredr::L2capParametersExtMarker>().unwrap(); |
| let ext = bredr::Channel { |
| socket: Some(local), |
| channel_mode: Some(bredr::ChannelMode::Basic), |
| max_tx_sdu_size: Some(1004), |
| ext_l2cap: Some(client_end), |
| ..Default::default() |
| }; |
| let transport = Channel::try_from(ext).unwrap(); |
| assert_matches!(s.receive_channel(transport), Ok(false)); |
| (remote, l2cap_params_requests) |
| } |
| |
| #[test] |
| fn sets_flush_timeout_for_source_transports() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Source); |
| let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s); |
| |
| // Should request to set the flush timeout. |
| match exec.run_until_stalled(&mut l2cap_params_requests.next()) { |
| Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters { |
| request, |
| responder, |
| }))) => { |
| assert_eq!( |
| Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()), |
| request.flush_timeout |
| ); |
| responder.send(&request).expect("response to send cleanly"); |
| } |
| x => panic!("Expected a item to be ready on the request stream, got {:?}", x), |
| }; |
| } |
| |
| #[test] |
| fn no_flush_timeout_for_sink_transports() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s); |
| |
| // Should NOT request to set the flush timeout. |
| match exec.run_until_stalled(&mut l2cap_params_requests.next()) { |
| Poll::Pending => {} |
| x => panic!("Expected no request to set flush timeout, got {:?}", x), |
| }; |
| } |
| |
| #[test] |
| fn get_configuration() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| |
| // Can't get configuration if we aren't configured. |
| assert!(s.get_configuration().is_none()); |
| |
| let config = vec![ |
| ServiceCapability::MediaTransport, |
| ServiceCapability::MediaCodec { |
| media_type: MediaType::Audio, |
| codec_type: MediaCodecType::new(0), |
| // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6) |
| codec_extra: vec![0x60, 0x0D, 0x02, 0x55], |
| }, |
| ]; |
| |
| assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(())); |
| |
| match s.get_configuration() { |
| Some(c) => assert_eq!(&config, c), |
| x => panic!("Expected Ok from get_configuration but got {:?}", x), |
| }; |
| |
| { |
| // Abort this stream, putting it back to the idle state. |
| let mut abort_fut = Box::pin(s.abort(None)); |
| let complete = exec.run_until_stalled(&mut abort_fut); |
| assert_matches!(complete, Poll::Ready(())); |
| } |
| |
| assert!(s.get_configuration().is_none()); |
| } |
| |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| |
| /// Create a callback that tracks how many times it has been called |
| fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) { |
| let call_count = Arc::new(AtomicUsize::new(0)); |
| let call_count_reader = call_count.clone(); |
| let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| { |
| let _ = call_count.fetch_add(1, Ordering::SeqCst); |
| }); |
| (Some(count_cb), call_count_reader) |
| } |
| |
| /// Test that the update callback is run at least once for all methods that mutate the state of |
| /// the StreamEndpoint. This is done through an atomic counter in the callback that increments |
| /// when the callback is run. |
| /// |
| /// Note that the _results_ of calling these mutating methods on the state of StreamEndpoint are |
| /// not validated here. They are validated in other tests. |
| #[test] |
| fn update_callback() { |
| let mut exec = fasync::TestExecutor::new(); |
| let mut s = test_endpoint(EndpointType::Sink); |
| let (cb, call_count) = call_count_callback(); |
| s.set_update_callback(cb); |
| |
| s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]) |
| .expect("Configure to succeed in test"); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| s.establish().expect("Establish to succeed in test"); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| let (_, transport) = Channel::create(); |
| assert_eq!( |
| s.receive_channel(transport).expect("Receive channel to succeed in test"), |
| false |
| ); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| s.start().expect("Start to succeed in test"); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| s.suspend().expect("Suspend to succeed in test"); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| s.reconfigure(vec![]).expect("Reconfigure to succeed in test"); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| call_count.store(0, Ordering::SeqCst); // clear call count |
| |
| { |
| // Abort this stream, putting it back to the idle state. |
| let mut abort_fut = Box::pin(s.abort(None)); |
| let _ = exec.run_until_stalled(&mut abort_fut); |
| assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once"); |
| } |
| } |
| } |