blob: 3f70af1ac4b42e40f334836b80d7b9afe118c081 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![deny(warnings)]
use {
fuchsia_async::{self as fasync, Timer},
fuchsia_zircon::{Duration, Status, Time},
futures::{stream::Stream, task::LocalWaker, Poll},
parking_lot::Mutex,
std::{pin::Pin, sync::Arc, sync::Weak},
};
use crate::{
types::{
EndpointType, Error, ErrorCode, MediaType, Result, ServiceCapability, StreamEndpointId,
StreamInformation, TryFrom,
},
Peer, SimpleResponder,
};
#[derive(PartialEq)]
enum StreamState {
Idle,
Configured,
// An Open command has been accepted, but streams have not been established yet.
Opening,
Open,
Streaming,
Closing,
Aborting,
}
/// An AVDTP Transport Stream, which implements the Basic service
/// See Section 7.2
/// Audio frames are currently not delivered anywhere, and are counted and dropped.
/// TODO(jamuraa): setup a delivery mechanism that is compatible with Media
pub struct StreamEndpoint {
/// Local stream endpoint id. This should be unique per AVDTP Peer.
id: StreamEndpointId,
/// The type of endpoint this is (TSEP), Source or Sink.
endpoint_type: EndpointType,
/// The media type this stream represents.
media_type: MediaType,
/// Current state the stream is in. See Section 6.5 for an overview.
state: StreamState,
/// The media transport socket.
/// This should be Some(socket) when state is Open or Streaming.
transport: Option<Arc<fasync::Socket>>,
/// True when the MediaStream is held.
/// Prevents multiple threads from owning the media stream.
stream_held: Arc<Mutex<bool>>,
/// The capabilities of this endpoint.
capabilities: Vec<ServiceCapability>,
/// The remote stream endpoint id. None if the stream has never been configured.
remote_id: Option<StreamEndpointId>,
/// The current configuration of this endpoint. Empty if the stream has never been configured.
configuration: Vec<ServiceCapability>,
}
impl StreamEndpoint {
/// Make a new StreamEndpoint.
/// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E).
/// StreamEndpooints start in the Idle state.
pub fn new(
id: u8, media_type: MediaType, endpoint_type: EndpointType,
capabilities: Vec<ServiceCapability>,
) -> Result<StreamEndpoint> {
let seid = StreamEndpointId::try_from(id)?;
Ok(StreamEndpoint {
id: seid,
capabilities: capabilities,
media_type: media_type,
endpoint_type: endpoint_type,
state: StreamState::Idle,
transport: None,
stream_held: Arc::new(Mutex::new(false)),
remote_id: None,
configuration: vec![],
})
}
/// Attempt to Configure this stream using the capabilities given.
/// If the stream is not in an Idle state, fails with Err(InvalidState).
/// Used for the Stream Configuration procedure, see Section 6.9
pub fn configure(
&mut self, remote_id: &StreamEndpointId, capabilities: Vec<ServiceCapability>,
) -> Result<()> {
if self.state != StreamState::Idle {
return Err(Error::InvalidState);
}
self.remote_id = Some(remote_id.clone());
for cap in &capabilities {
if !self
.capabilities
.iter()
.any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
{
return Err(Error::OutOfRange);
}
}
self.configuration = capabilities;
self.state = StreamState::Configured;
Ok(())
}
/// Attempt to reconfigure this stream with the capabilities given.
/// If the capabilities are not valid to set, fails with Err(OutOfRange)
/// If the stream is not in the Open state, fails with Err(InvalidState)
/// Used for the Stream Reconfiguration procedure, see Section 6.15.
pub fn reconfigure(&mut self, mut capabilities: Vec<ServiceCapability>) -> Result<()> {
if self.state != StreamState::Open {
return Err(Error::InvalidState);
}
// Only application capabilities are allowed to be reconfigured. See Section 8.11.1
if capabilities.iter().any(|x| !x.is_application()) {
return Err(Error::OutOfRange);
}
// Should only replace the capabilities that have been reconfigured. See Section 8.11.2
let to_replace: std::vec::Vec<_> = capabilities
.iter()
.map(|x| std::mem::discriminant(x))
.collect();
self.capabilities.retain(|x| {
let disc = std::mem::discriminant(x);
!to_replace.contains(&disc)
});
self.capabilities.append(&mut capabilities);
Ok(())
}
/// When a L2CAP channel is received after an Open command is accepted, it should be
/// delivered via receive_channel.
/// Returns true if this Endpoint expects more channels to be established before
/// streaming is started.
/// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established,
/// closing |c|.
pub fn receive_channel(&mut self, c: fasync::Socket) -> Result<bool> {
if self.state != StreamState::Opening || self.transport.is_some() {
return Err(Error::InvalidState);
}
self.transport = Some(Arc::new(c));
self.stream_held = Arc::new(Mutex::new(false));
// TODO(jamuraa, NET-1674, NET-1675): Reporting and Recovery channels
self.state = StreamState::Open;
Ok(false)
}
/// Begin opening this stream. The stream must be in a Configured state.
/// See Stream Establishment, Section 6.11
pub fn establish(&mut self) -> Result<()> {
if self.state != StreamState::Configured || self.transport.is_some() {
return Err(Error::InvalidState);
}
self.state = StreamState::Opening;
Ok(())
}
/// Close this stream. This procedure checks that the media channels are closed.
/// If the channels are not closed in 3 seconds, it initates an abort prodecure with the
/// remote |peer| and returns the result of that.
pub async fn release<'a>(
&'a mut self, responder: SimpleResponder, peer: &'a Peer,
) -> Result<()> {
if self.state != StreamState::Open && self.state != StreamState::Streaming {
return responder.reject(ErrorCode::BadState);
}
self.state = StreamState::Closing;
responder.send()?;
// TODO(jamuraa): Replace this with a select! on this and a future that completes when
// the channels are all closed.
await!(Timer::new(Time::after(Duration::from_seconds(3))));
if let Some(sock) = self.transport.as_ref() {
// Check to see if the transport is still alive.
match sock.as_ref().as_ref().read(&mut []) {
Err(Status::PEER_CLOSED) => (),
_ => return await!(self.abort(Some(peer))),
};
}
// Closing returns this endpoint to the Idle state.
self.configuration.clear();
self.remote_id = None;
self.state = StreamState::Idle;
Ok(())
}
/// Start this stream. This can be done only from the Open State.
/// Used for the Stream Start procedure, See Section 6.12
pub fn start(&mut self) -> Result<()> {
if self.state != StreamState::Open {
return Err(Error::InvalidState);
}
self.state = StreamState::Streaming;
Ok(())
}
/// Suspend this stream. This can be done only from the Streaming state.
/// Used for the Stream Suspend procedure, See Section 6.14
pub fn suspend(&mut self) -> Result<()> {
if self.state != StreamState::Streaming {
return Err(Error::InvalidState);
}
self.state = StreamState::Open;
Ok(())
}
/// Abort this stream. This can be done from any state, and will always return the state
/// to Idle. If peer is not None, we are initiating this procedure and all our channels will
/// be closed.
pub async fn abort<'a>(&'a mut self, peer: Option<&'a Peer>) -> Result<()> {
if let Some(peer) = peer {
let seid = self.remote_id.as_ref().unwrap();
let _ = await!(peer.abort(seid));
self.state = StreamState::Aborting;
}
self.configuration.clear();
self.remote_id = None;
self.transport = None;
self.state = StreamState::Idle;
Ok(())
}
/// Capabilities of this StreamEndpoint.
/// Provides support for the Get Capabilities and Get All Capabilities signaling procedures.
/// See Sections 6.7 and 6.8
pub fn capabilities(&self) -> &Vec<ServiceCapability> {
&self.capabilities
}
/// Returns the local StreamEndpointId for this endpoint.
pub fn local_id(&self) -> &StreamEndpointId {
&self.id
}
/// Make a StreamInforamtion which represents the current state of this stream.
pub fn information(&self) -> StreamInformation {
StreamInformation::new(
self.id.clone(),
self.state != StreamState::Idle,
self.media_type.clone(),
self.endpoint_type.clone(),
)
}
/// Take the media stream, which transmits (or receives) any media for this StreamEndpoint.
/// Panics if the media stream is alraedy taken, or if the stream is not open.
pub fn take_transport(&mut self) -> MediaStream {
let mut lock = self.stream_held.lock();
assert!(
!*lock && self.transport.is_some(),
"Media stream has already been taken."
);
*lock = true;
MediaStream {
lock: self.stream_held.clone(),
stream: Arc::downgrade(self.transport.as_ref().unwrap()),
}
}
}
/// Represents the stream of media.
/// Currently just a weak pointer to the transport socket.
/// TODO(jamuraa): parse transport sockets, make this into a real asynchronous socket that
/// produces and/or takes media frames.
pub struct MediaStream {
lock: Arc<Mutex<bool>>,
stream: Weak<fasync::Socket>,
}
impl Drop for MediaStream {
fn drop(&mut self) {
let mut l = self.lock.lock();
*l = false;
}
}
impl Stream for MediaStream {
type Item = Result<Vec<u8>>;
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
let s = match self.stream.upgrade() {
None => return Poll::Ready(None),
Some(s) => s,
};
let mut res = Vec::<u8>::new();
match s.poll_datagram(&mut res, lw) {
Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::{expect_remote_recv, recv_remote, setup_peer},
types::MediaCodecType,
Request,
};
use fuchsia_zircon as zx;
use futures::stream::StreamExt;
const REMOTE_ID_VAL: u8 = 1;
const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
#[test]
fn make() {
let s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
);
assert!(s.is_ok());
let s = s.unwrap();
assert_eq!(&StreamEndpointId(1), s.local_id());
let info = s.information();
assert!(!info.in_use());
let no = StreamEndpoint::new(
0,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
);
assert!(no.is_err());
}
fn establish_stream(s: &mut StreamEndpoint) -> zx::Socket {
assert_eq!(Ok(()), s.establish());
let (remote, transport) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
assert_eq!(
Ok(false),
s.receive_channel(fasync::Socket::from_socket(transport).unwrap())
);
remote
}
#[test]
fn stream_configure_reconfigure() {
let _exec = fasync::Executor::new().expect("failed to create an executor");
let mut s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF],
},
],
)
.unwrap();
// Can't configure items that aren't in range.
assert_eq!(
Err(Error::OutOfRange),
s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting])
);
assert_eq!(
Ok(()),
s.configure(
&REMOTE_ID,
vec![
ServiceCapability::MediaTransport,
ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
}
]
)
);
// Note: we allow devices to be configured (and reconfigured) again when they are
// just configured, even though this is proabably not allowed per the spec.
// Can't configure while open
establish_stream(&mut s);
assert_eq!(
Err(Error::InvalidState),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
// Reconfiguring while open is fine though.
assert_eq!(
Ok(()),
s.reconfigure(vec![ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
}])
);
// Can't reconfigure non-application types
assert_eq!(
Err(Error::OutOfRange),
s.reconfigure(vec![ServiceCapability::MediaTransport])
);
// Can't configure or reconfigure while streaming
assert_eq!(Ok(()), s.start());
assert_eq!(
Err(Error::InvalidState),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
assert_eq!(
Err(Error::InvalidState),
s.reconfigure(vec![ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
}])
);
assert_eq!(Ok(()), s.suspend());
// Reconfigure should be fine again in open state.
assert_eq!(
Ok(()),
s.reconfigure(vec![ServiceCapability::MediaCodec {
media_type: MediaType::Audio,
codec_type: MediaCodecType::new(0x40),
codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
}])
);
// Configure is stil not allowed.
assert_eq!(
Err(Error::InvalidState),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
}
#[test]
fn stream_establishment() {
let _exec = fasync::Executor::new().expect("failed to create an executor");
let mut s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
)
.unwrap();
let (remote, transport) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
// Can't establish before configuring
assert_eq!(Err(Error::InvalidState), s.establish());
// Trying to receive a channel in the wrong state closes the channel
assert_eq!(
Err(Error::InvalidState),
s.receive_channel(fasync::Socket::from_socket(transport).unwrap())
);
let buf: &mut [u8] = &mut [0; 1];
assert_eq!(Err(zx::Status::PEER_CLOSED), remote.read(buf));
assert_eq!(
Ok(()),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
assert_eq!(Ok(()), s.establish());
// And we should be able to give a channel now.
let (_remote, transport) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
assert_eq!(
Ok(false),
s.receive_channel(fasync::Socket::from_socket(transport).unwrap())
);
}
fn setup_peer_for_release(exec: &mut fasync::Executor) -> (Peer, zx::Socket, SimpleResponder) {
let (peer, signaling) = setup_peer();
// Send a close from the other side to produce an event we can respond to.
signaling.write(&[0x40, 0x08, 0x04]).is_ok();
let mut req_stream = peer.take_request_stream();
let mut req_fut = req_stream.next();
let complete = exec.run_until_stalled(&mut req_fut);
let responder = match complete {
Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
_ => panic!("Expected a close request"),
};
(peer, signaling, responder)
}
#[test]
fn stream_release_without_abort() {
let mut exec = fasync::Executor::new().expect("failed to create an executor");
let mut s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
)
.unwrap();
assert_eq!(
Ok(()),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
let remote_transport = establish_stream(&mut s);
let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
let mut release_fut = Box::pin(s.release(responder, &peer));
let complete = exec.run_until_stalled(&mut release_fut);
// We should still be pending since the transport hasn't been closed.
assert!(complete.is_pending());
// Expect a "yes" response.
expect_remote_recv(&[0x42, 0x08], &signaling);
// Close the transport socket by dropping it.
drop(remote_transport);
// TODO(jamuraa): We need to wait until the timer expires for now.
exec.wake_next_timer();
assert_eq!(
Poll::Ready(Ok(())),
exec.run_until_stalled(&mut release_fut)
);
}
#[test]
fn stream_release_with_abort() {
let mut exec = fasync::Executor::new().expect("failed to create an executor");
let mut s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
)
.unwrap();
assert_eq!(
Ok(()),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
let _remote_transport = establish_stream(&mut s);
let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
let mut release_fut = Box::pin(s.release(responder, &peer));
let complete = exec.run_until_stalled(&mut release_fut);
// We should still be pending since the transport hasn't been closed.
assert!(complete.is_pending());
// Expect a "yes" response.
expect_remote_recv(&[0x42, 0x08], &signaling);
// TODO(jamuraa): We need to wait until the timer expires for now.
exec.wake_next_timer();
let complete = exec.run_until_stalled(&mut release_fut);
// Now we're wairing on response from the Abort
assert!(complete.is_pending());
// Should have got an abort
let received = recv_remote(&signaling).unwrap();
assert_eq!(0x0A, received[1]);
let txlabel = received[0] & 0xF0;
// Send a response
assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
assert_eq!(
Poll::Ready(Ok(())),
exec.run_until_stalled(&mut release_fut)
);
}
#[test]
fn start_and_suspend() {
let mut exec = fasync::Executor::new().expect("failed to create an executor");
let mut s = StreamEndpoint::new(
REMOTE_ID_VAL,
MediaType::Audio,
EndpointType::Sink,
vec![ServiceCapability::MediaTransport],
)
.unwrap();
// Can't start or suspend until configured and open.
assert_eq!(Err(Error::InvalidState), s.start());
assert_eq!(Err(Error::InvalidState), s.suspend());
assert_eq!(
Ok(()),
s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
);
assert_eq!(Err(Error::InvalidState), s.start());
assert_eq!(Err(Error::InvalidState), s.suspend());
assert_eq!(Ok(()), s.establish());
assert_eq!(Err(Error::InvalidState), s.start());
assert_eq!(Err(Error::InvalidState), s.suspend());
let (remote, transport) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
assert_eq!(
Ok(false),
s.receive_channel(fasync::Socket::from_socket(transport).unwrap())
);
// Should be able to start but not suspend now.
assert_eq!(Err(Error::InvalidState), s.suspend());
assert_eq!(Ok(()), s.start());
// Are started, so we should be able to suspend but not start again here.
assert_eq!(Err(Error::InvalidState), s.start());
assert_eq!(Ok(()), s.suspend());
// Now we're suspended, so we can start it again.
assert_eq!(Ok(()), s.start());
assert_eq!(Ok(()), s.suspend());
// After we close, we are back at idle and can't start / stop
let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
{
let mut release_fut = Box::pin(s.release(responder, &peer));
let complete = exec.run_until_stalled(&mut release_fut);
// We should still be pending since the transport hasn't been closed.
assert!(complete.is_pending());
// Expect a "yes" response.
expect_remote_recv(&[0x42, 0x08], &signaling);
// Close the transport socket by dropping it.
drop(remote);
// TODO(jamuraa): We need to wait until the timer expires for now.
exec.wake_next_timer();
assert_eq!(
Poll::Ready(Ok(())),
exec.run_until_stalled(&mut release_fut)
);
}
// Shouldn't be able to start or suspend again.
assert_eq!(Err(Error::InvalidState), s.start());
assert_eq!(Err(Error::InvalidState), s.suspend());
}
#[test]
fn media_delivery() {}
}