blob: 54703dfe66d003b9d13a21969fd792fefe197773 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![feature(async_await, await_macro, futures_api)]
use {
fuchsia_async as fasync,
fuchsia_syslog::{fx_log_info, fx_log_warn, fx_vlog},
fuchsia_zircon::{self as zx, Duration, Time},
futures::{
future::FusedFuture,
ready, select,
stream::Stream,
task::{LocalWaker, Poll, Waker},
FutureExt,
},
parking_lot::Mutex,
slab::Slab,
std::{collections::VecDeque, marker::Unpin, mem, pin::Pin, result, sync::Arc},
};
#[cfg(test)]
mod tests;
mod stream_endpoint;
mod types;
use crate::types::{
Decodable, Encodable, SignalIdentifier, SignalingHeader, SignalingMessageType, TryFrom, TxLabel,
};
pub use crate::{
stream_endpoint::{MediaStream, StreamEndpoint},
types::{
ContentProtectionType, EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result,
ServiceCapability, StreamEndpointId, StreamInformation,
},
};
/// An AVDTP signaling peer: it can send commands to another peer, receive requests, and send
/// responses. Media transport is not handled by this peer.
///
/// Requests from the distant peer are delivered through the request stream obtained from
/// `take_request_stream()`. Only one `RequestStream` can be active at a time. Only valid requests
/// are delivered to the request stream - requests with an invalid format are automatically
/// rejected.
///
/// Responses are sent using the responders that are included in each request yielded by the
/// request stream.
#[derive(Debug)]
pub struct Peer {
// State shared with the `RequestStream`, the responders and in-flight command futures.
inner: Arc<PeerInner>,
}
impl Peer {
/// Create a new peer from a signaling channel socket.
pub fn new(signaling: zx::Socket) -> result::Result<Peer, zx::Status> {
Ok(Peer {
inner: Arc::new(PeerInner {
signaling: fasync::Socket::from_socket(signaling)?,
response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
incoming_requests: Mutex::<RequestQueue>::default(),
}),
})
}
/// Take the stream of incoming requests for this peer.
///
/// # Panics
///
/// Panics if the request stream is already held by someone else.
pub fn take_request_stream(&self) -> RequestStream {
    {
        let mut requests = self.inner.incoming_requests.lock();
        let already_taken = match requests.listener {
            RequestListener::None => false,
            _ => true,
        };
        if already_taken {
            panic!("Request stream has already been taken");
        }
        // Mark the stream as claimed; the waker is filled in on the first poll.
        requests.listener = RequestListener::New;
    }
    RequestStream { inner: self.inner.clone() }
}
/// Send a Stream End Point Discovery (Sec 8.6) command to the remote peer.
/// Asynchronously returns the reply as a vector of endpoint information.
/// The error will be RemoteRejected with the error code returned by the
/// remote if the remote peer rejected the command.
pub async fn discover(&self) -> Result<Vec<StreamInformation>> {
    let discovered: DiscoverResponse =
        await!(self.send_command(SignalIdentifier::Discover, &[]))?;
    Ok(discovered.endpoints)
}
/// Send a Get Capabilities (Sec 8.7) command to the remote peer for the
/// given `stream_id`.
/// Asynchronously returns the ServiceCapabilities reported in the reply.
/// In general, Get All Capabilities should be preferred to this command.
/// The error will be RemoteRejected with the error code reported by the
/// remote if the remote peer rejects the command.
pub async fn get_capabilities<'a>(
    &'a self, stream_id: &'a StreamEndpointId,
) -> Result<Vec<ServiceCapability>> {
    // The only parameter of this command is the stream endpoint id.
    let seid_param = [stream_id.to_msg()];
    let reply: Result<GetCapabilitiesResponse> =
        await!(self.send_command(SignalIdentifier::GetCapabilities, &seid_param));
    reply.map(|r| r.capabilities)
}
/// Send a Get All Capabilities (Sec 8.8) command to the remote peer for the
/// given `stream_id`.
/// Asynchronously returns the ServiceCapabilities reported in the reply.
/// The error will be RemoteRejected with the error code reported by the
/// remote if the remote peer rejects the command.
pub async fn get_all_capabilities<'a>(
    &'a self, stream_id: &'a StreamEndpointId,
) -> Result<Vec<ServiceCapability>> {
    // The only parameter of this command is the stream endpoint id.
    let seid_param = [stream_id.to_msg()];
    let reply: Result<GetCapabilitiesResponse> =
        await!(self.send_command(SignalIdentifier::GetAllCapabilities, &seid_param));
    reply.map(|r| r.capabilities)
}
/// Send a Stream Configuration (Sec 8.9) command to the remote peer for the
/// given remote `stream_id`, communicating the association to a local
/// `local_stream_id` and the required stream `capabilities`.
///
/// Returns Ok(()) if the command was accepted, and RemoteConfigRejected
/// if the remote refused.
///
/// # Panics
///
/// Panics if `capabilities` is empty.
pub async fn set_configuration<'a>(
    &'a self, stream_id: &'a StreamEndpointId, local_stream_id: &'a StreamEndpointId,
    capabilities: &'a [ServiceCapability],
) -> Result<()> {
    assert!(!capabilities.is_empty(), "must set at least one capability");
    // Two stream id bytes followed by every encoded capability.
    let capabilities_len: usize = capabilities.iter().map(|c| c.encoded_len()).sum();
    let mut params = vec![0u8; 2 + capabilities_len];
    params[0] = stream_id.to_msg();
    params[1] = local_stream_id.to_msg();
    let mut offset = 2;
    for capability in capabilities.iter() {
        capability.encode(&mut params[offset..])?;
        offset += capability.encoded_len();
    }
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::SetConfiguration, &params))?;
    Ok(())
}
/// Send a Stream Reconfigure (Sec 8.11) command to the remote peer for the
/// given remote `stream_id`, to reconfigure the Application Service
/// capabilities in `capabilities`.
/// Note: Per the spec, only the Media Codec and Content Protection
/// capabilities will be accepted in this command; any capability that is
/// not an application capability makes this return Error::Encoding without
/// sending anything.
///
/// Returns Ok(()) if the command was accepted, and RemoteConfigRejected
/// if the remote refused.
///
/// # Panics
///
/// Panics if there are no capabilities to configure.
pub async fn reconfigure<'a>(
    &'a self, stream_id: &'a StreamEndpointId, capabilities: &'a [ServiceCapability],
) -> Result<()> {
    assert!(!capabilities.is_empty(), "must set at least one capability");
    // One stream id byte followed by every encoded capability.
    let capabilities_len: usize = capabilities.iter().map(|c| c.encoded_len()).sum();
    let mut params = vec![0u8; 1 + capabilities_len];
    params[0] = stream_id.to_msg();
    let mut offset = 1;
    for capability in capabilities.iter() {
        if !capability.is_application() {
            return Err(Error::Encoding);
        }
        capability.encode(&mut params[offset..])?;
        offset += capability.encoded_len();
    }
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Reconfigure, &params))?;
    Ok(())
}
/// Send an Open Stream Command (Sec 8.12) to the remote peer for the given
/// `stream_id`.
/// Returns Ok(()) if the command is accepted, and RemoteRejected with the
/// code returned by the remote if the remote peer rejects the command.
pub async fn open<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
    let seid_param = [stream_id.to_msg()];
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Open, &seid_param))?;
    Ok(())
}
/// Send a Start Stream Command (Sec 8.13) to the remote peer for all the
/// streams in `stream_ids`.
/// Returns Ok(()) if the command is accepted, and RemoteStreamRejected
/// with the stream endpoint id and error code reported by the remote if
/// the remote signals a failure.
pub async fn start<'a>(&'a self, stream_ids: &'a [StreamEndpointId]) -> Result<()> {
    // One parameter byte per stream endpoint id.
    let seid_params: Vec<u8> = stream_ids.iter().map(|seid| seid.to_msg()).collect();
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Start, &seid_params))?;
    Ok(())
}
/// Send a Close Stream Command (Sec 8.14) to the remote peer for the given
/// `stream_id`.
/// Returns Ok(()) if the command is accepted, and RemoteRejected with the
/// code returned by the remote if the remote peer rejects the command.
pub async fn close<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
    let seid_param = [stream_id.to_msg()];
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Close, &seid_param))?;
    Ok(())
}
/// Send a Suspend Command (Sec 8.15) to the remote peer for all the
/// streams in `stream_ids`.
/// Returns Ok(()) if the command is accepted, and RemoteStreamRejected
/// with the stream endpoint id and error code reported by the remote if
/// the remote signals a failure.
pub async fn suspend<'a>(&'a self, stream_ids: &'a [StreamEndpointId]) -> Result<()> {
    // One parameter byte per stream endpoint id.
    let seid_params: Vec<u8> = stream_ids.iter().map(|seid| seid.to_msg()).collect();
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Suspend, &seid_params))?;
    Ok(())
}
/// Send an Abort (Sec 8.16) to the remote peer for the given `stream_id`.
/// Returns Ok(()) if the command is accepted, and Err(Timeout) if the remote
/// timed out. The remote peer is not allowed to reject this command, and
/// commands that have an invalid `stream_id` will time out instead.
pub async fn abort<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
    let seid_param = [stream_id.to_msg()];
    let _accepted: SimpleResponse =
        await!(self.send_command(SignalIdentifier::Abort, &seid_param))?;
    Ok(())
}
/// The maximum amount of time we will wait for a response to a signaling command.
/// `send_command` resolves to `Error::Timeout` once this much time has elapsed
/// without a response.
fn command_timeout() -> Duration {
// Timeout expressed in milliseconds (3 seconds).
const RTX_SIG_TIMER_MS: i64 = 3000;
Duration::from_millis(RTX_SIG_TIMER_MS)
}
/// Sends a command on the signaling socket and waits for the matching response.
///
/// A transaction label is reserved for the command, the header and `payload`
/// are written to the socket, and the returned future resolves when either
/// the response with that label arrives (it is then decoded as `D`) or the
/// command timeout elapses (`Error::Timeout`).
///
/// If the command cannot be encoded or written, the reserved label is released
/// immediately: nothing went out on the wire, so no response could ever arrive
/// to reclaim it, and the small pool of transaction labels would otherwise be
/// drained by repeated send failures.
async fn send_command<'a, D: Decodable>(
    &'a self, signal: SignalIdentifier, payload: &'a [u8],
) -> Result<D> {
    let id = self.inner.add_response_waiter()?;
    let header = SignalingHeader::new(id, signal, SignalingMessageType::Command);
    let sent: Result<()> = {
        let mut buf = vec![0; header.encoded_len()];
        let mut outcome = header.encode(buf.as_mut_slice()).map(|_| ());
        if outcome.is_ok() {
            buf.extend_from_slice(payload);
            outcome = self.inner.send_signal(buf.as_slice());
        }
        outcome
    };
    if let Err(e) = sent {
        // Release the waiter reserved above; it was never handed to a CommandResponse,
        // so nothing else will clean it up.
        let mut waiters = self.inner.response_waiters.lock();
        let idx = usize::from(&header.label());
        if waiters.contains(idx) {
            waiters.remove(idx);
        }
        return Err(e);
    }
    // From here on, dropping `response` takes care of the waiter bookkeeping.
    let mut response = CommandResponse {
        id: header.label(),
        inner: Some(self.inner.clone()),
    };
    let mut timeout = fasync::Timer::new(Time::after(Peer::command_timeout())).fuse();
    select! {
        _ = timeout => Err(Error::Timeout),
        r = response => {
            let response_buf = r?;
            decode_signaling_response(header.signal(), response_buf)
        },
    }
}
}
/// A request from the connected peer.
/// Each variant of this includes a responder which implements two functions:
/// - send(...) will send a response with the information provided.
/// - reject(...) will send a reject response with the given error code.
///
/// Each variant is produced by `Request::parse` from the command with the
/// signal identifier of the same name.
#[derive(Debug)]
pub enum Request {
/// Stream endpoint discovery; the command carries no parameters.
Discover {
responder: DiscoverResponder,
},
/// Capabilities query for a single stream endpoint.
GetCapabilities {
stream_id: StreamEndpointId,
responder: GetCapabilitiesResponder,
},
/// All-capabilities query for a single stream endpoint.
GetAllCapabilities {
stream_id: StreamEndpointId,
responder: GetCapabilitiesResponder,
},
/// Stream configuration: `local_stream_id` is the first stream id in the
/// command, `remote_stream_id` the second, followed by the capabilities.
SetConfiguration {
local_stream_id: StreamEndpointId,
remote_stream_id: StreamEndpointId,
capabilities: Vec<ServiceCapability>,
responder: ConfigureResponder,
},
/// Open a single stream endpoint.
Open {
stream_id: StreamEndpointId,
responder: SimpleResponder,
},
/// Stream reconfiguration; only application capabilities are delivered here,
/// commands with other capabilities are rejected during parsing.
Reconfigure {
local_stream_id: StreamEndpointId,
capabilities: Vec<ServiceCapability>,
responder: ConfigureResponder,
},
/// Start one or more streams.
Start {
stream_ids: Vec<StreamEndpointId>,
responder: StreamResponder,
},
/// Close a single stream endpoint.
Close {
stream_id: StreamEndpointId,
responder: SimpleResponder,
},
/// Suspend one or more streams.
Suspend {
stream_ids: Vec<StreamEndpointId>,
responder: StreamResponder,
},
/// Abort a single stream endpoint.
Abort {
stream_id: StreamEndpointId,
responder: SimpleResponder,
},
// TODO(jamuraa): add the rest of the requests
}
// Builds the `Result<Request>` for a command whose body is exactly one stream
// endpoint id byte.
//
// Arguments are identifiers in scope at the call site: the body slice, the
// signal, the shared peer and the transaction label, followed by the
// `Request` variant to build and the responder type to place in it.
// Expands to `Err(RequestInvalid(BadLength))` when the body is not exactly one
// byte long.
macro_rules! parse_one_seid {
($body:ident, $signal:ident, $peer:ident, $id:ident, $request_variant:ident, $responder_type:ident) => {
if $body.len() != 1 {
Err(Error::RequestInvalid(ErrorCode::BadLength))
} else {
Ok(Request::$request_variant {
stream_id: StreamEndpointId::from_msg(&$body[0]),
responder: $responder_type {
signal: $signal,
peer: $peer,
id: $id,
},
})
}
};
}
impl Request {
/// Interprets every byte of `body` as a stream endpoint id.
/// An empty body is rejected with RequestInvalid(BadLength).
fn get_req_seids(body: &[u8]) -> Result<Vec<StreamEndpointId>> {
    if body.is_empty() {
        return Err(Error::RequestInvalid(ErrorCode::BadLength));
    }
    let seids: Vec<StreamEndpointId> =
        body.iter().map(|byte| StreamEndpointId::from_msg(byte)).collect();
    Ok(seids)
}
fn get_req_capabilities(encoded: &[u8]) -> Result<Vec<ServiceCapability>> {
if encoded.len() < 2 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
}
let mut caps = vec![];
let mut loc = 0;
while loc < encoded.len() {
let cap = match ServiceCapability::decode(&encoded[loc..]) {
Ok(cap) => cap,
Err(Error::RequestInvalid(code)) => {
return Err(Error::RequestInvalidExtra(code, encoded[loc]));
}
Err(e) => return Err(e),
};
loc += cap.encoded_len();
caps.push(cap);
}
Ok(caps)
}
/// Turns a received command (`signal` plus its `body`) into a `Request` whose
/// responder answers on `peer` with transaction label `id`.
///
/// Malformed bodies produce RequestInvalid / RequestInvalidExtra errors and
/// signals without a parser here produce UnimplementedMessage.
fn parse(
    peer: Arc<PeerInner>, id: TxLabel, signal: SignalIdentifier, body: &[u8],
) -> Result<Request> {
    match signal {
        SignalIdentifier::Discover => {
            // Discover Request has no body (Sec 8.6.1)
            if !body.is_empty() {
                return Err(Error::RequestInvalid(ErrorCode::BadLength));
            }
            let responder = DiscoverResponder { peer, id };
            Ok(Request::Discover { responder })
        }
        SignalIdentifier::GetCapabilities => {
            parse_one_seid!(body, signal, peer, id, GetCapabilities, GetCapabilitiesResponder)
        }
        SignalIdentifier::GetAllCapabilities => {
            parse_one_seid!(body, signal, peer, id, GetAllCapabilities, GetCapabilitiesResponder)
        }
        SignalIdentifier::SetConfiguration => {
            // Two stream ids plus at least one capability.
            if body.len() < 4 {
                return Err(Error::RequestInvalid(ErrorCode::BadLength));
            }
            let capabilities = Request::get_req_capabilities(&body[2..])?;
            let local_stream_id = StreamEndpointId::from_msg(&body[0]);
            let remote_stream_id = StreamEndpointId::from_msg(&body[1]);
            Ok(Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities,
                responder: ConfigureResponder { signal, peer, id },
            })
        }
        SignalIdentifier::Reconfigure => {
            // One stream id plus at least one capability.
            if body.len() < 3 {
                return Err(Error::RequestInvalid(ErrorCode::BadLength));
            }
            let capabilities = Request::get_req_capabilities(&body[1..])?;
            // Only application capabilities may be reconfigured.
            if let Some(offending) = capabilities.iter().find(|cap| !cap.is_application()) {
                return Err(Error::RequestInvalidExtra(
                    ErrorCode::InvalidCapabilities,
                    offending.to_category_byte(),
                ));
            }
            let local_stream_id = StreamEndpointId::from_msg(&body[0]);
            Ok(Request::Reconfigure {
                local_stream_id,
                capabilities,
                responder: ConfigureResponder { signal, peer, id },
            })
        }
        SignalIdentifier::Open => {
            parse_one_seid!(body, signal, peer, id, Open, SimpleResponder)
        }
        SignalIdentifier::Start => {
            let stream_ids = Request::get_req_seids(body)?;
            let responder = StreamResponder { signal, peer, id };
            Ok(Request::Start { stream_ids, responder })
        }
        SignalIdentifier::Close => {
            parse_one_seid!(body, signal, peer, id, Close, SimpleResponder)
        }
        SignalIdentifier::Suspend => {
            let stream_ids = Request::get_req_seids(body)?;
            let responder = StreamResponder { signal, peer, id };
            Ok(Request::Suspend { stream_ids, responder })
        }
        SignalIdentifier::Abort => {
            parse_one_seid!(body, signal, peer, id, Abort, SimpleResponder)
        }
        _ => Err(Error::UnimplementedMessage),
    }
}
}
/// A stream of requests from the remote peer.
///
/// Obtained from `Peer::take_request_stream()`. Dropping the stream releases the
/// listener slot so that a new stream can be taken.
#[derive(Debug)]
pub struct RequestStream {
// Peer state shared with the `Peer` that created this stream.
inner: Arc<PeerInner>,
}
impl Unpin for RequestStream {}
impl Stream for RequestStream {
type Item = Result<Request>;
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.inner.poll_recv_request(lw)) {
Ok(UnparsedRequest(SignalingHeader { label, signal, .. }, body)) => {
match Request::parse(self.inner.clone(), label, signal, &body) {
Err(Error::RequestInvalid(code)) => {
self.inner.send_reject(label, signal, code)?;
return Poll::Pending;
}
Err(Error::RequestInvalidExtra(code, extra)) => {
self.inner
.send_reject_params(label, signal, &[extra, u8::from(&code)])?;
return Poll::Pending;
}
Err(Error::UnimplementedMessage) => {
self.inner
.send_reject(label, signal, ErrorCode::NotSupportedCommand)?;
return Poll::Pending;
}
x => Some(x),
}
}
Err(Error::PeerDisconnected) => None,
Err(e) => Some(Err(e)),
})
}
}
impl Drop for RequestStream {
    /// Releases the listener slot and wakes another task so that something
    /// keeps reading the signaling socket.
    fn drop(&mut self) {
        {
            let mut requests = self.inner.incoming_requests.lock();
            requests.listener = RequestListener::None;
        }
        // The lock must be released before this call: wake_any locks the same queue.
        self.inner.wake_any();
    }
}
// Simple responses have no body data.
#[derive(Debug)]
pub struct SimpleResponse {}
impl Decodable for SimpleResponse {
fn decode(from: &[u8]) -> Result<Self> {
if from.len() > 0 {
return Err(Error::InvalidMessage);
}
Ok(SimpleResponse {})
}
}
/// The body of an accepted Discover response: a list of stream endpoints.
#[derive(Debug)]
struct DiscoverResponse {
    endpoints: Vec<StreamInformation>,
}

impl Decodable for DiscoverResponse {
    /// Decodes stream information entries back to back until `from` is used up.
    fn decode(from: &[u8]) -> Result<Self> {
        let mut endpoints = Vec::new();
        let mut offset = 0;
        while offset < from.len() {
            let info = StreamInformation::decode(&from[offset..])?;
            offset += info.encoded_len();
            endpoints.push(info);
        }
        Ok(DiscoverResponse { endpoints })
    }
}
/// Responder for a Discover request.
#[derive(Debug)]
pub struct DiscoverResponder {
    peer: Arc<PeerInner>,
    id: TxLabel,
}

impl DiscoverResponder {
    /// Sends the response to a discovery request.
    /// At least one endpoint must be present, otherwise Error::Encoding is
    /// returned and nothing is sent.
    /// Will result in an Error::PeerWrite if the distant peer is disconnected.
    pub fn send(self, endpoints: &[StreamInformation]) -> Result<()> {
        if endpoints.is_empty() {
            // There shall be at least one SEP in a response (Sec 8.6.2)
            return Err(Error::Encoding);
        }
        // Size the buffer from every endpoint's own encoded length rather than
        // assuming they all match the first one.
        let total_len: usize = endpoints.iter().map(|endpoint| endpoint.encoded_len()).sum();
        let mut params = vec![0u8; total_len];
        let mut idx = 0;
        for endpoint in endpoints {
            let len = endpoint.encoded_len();
            endpoint.encode(&mut params[idx..idx + len])?;
            idx += len;
        }
        self.peer
            .send_response(self.id, SignalIdentifier::Discover, &params)
    }

    /// Rejects the discovery request with the given `error_code`.
    pub fn reject(self, error_code: ErrorCode) -> Result<()> {
        self.peer
            .send_reject(self.id, SignalIdentifier::Discover, error_code)
    }
}
/// Responder for the Get Capabilities and Get All Capabilities requests.
#[derive(Debug)]
pub struct GetCapabilitiesResponder {
    peer: Arc<PeerInner>,
    signal: SignalIdentifier,
    id: TxLabel,
}

impl GetCapabilitiesResponder {
    /// Sends the subset of `capabilities` that belongs in a response to the
    /// signal being answered.
    pub fn send(self, capabilities: &[ServiceCapability]) -> Result<()> {
        let signal = self.signal;
        let included: Vec<&ServiceCapability> =
            capabilities.iter().filter(|cap| cap.in_response(signal)).collect();
        let reply_len: usize = included.iter().map(|cap| cap.encoded_len()).sum();
        let mut reply = vec![0u8; reply_len];
        let mut offset = 0;
        for cap in included {
            let len = cap.encoded_len();
            cap.encode(&mut reply[offset..offset + len])?;
            offset += len;
        }
        self.peer.send_response(self.id, signal, &reply)
    }

    /// Rejects the request with the given `error_code`.
    pub fn reject(self, error_code: ErrorCode) -> Result<()> {
        let GetCapabilitiesResponder { peer, signal, id } = self;
        peer.send_reject(id, signal, error_code)
    }
}
/// The body of an accepted (All) Capabilities response.
#[derive(Debug)]
struct GetCapabilitiesResponse {
    capabilities: Vec<ServiceCapability>,
}

impl Decodable for GetCapabilitiesResponse {
    /// Decodes service capabilities back to back until `from` is used up.
    fn decode(from: &[u8]) -> Result<Self> {
        let mut capabilities = Vec::new();
        let mut offset = 0;
        while offset < from.len() {
            let decoded = ServiceCapability::decode(&from[offset..])?;
            offset += decoded.encoded_len();
            capabilities.push(decoded);
        }
        Ok(GetCapabilitiesResponse { capabilities })
    }
}
/// Responder for requests whose accept response has no body.
#[derive(Debug)]
pub struct SimpleResponder {
    peer: Arc<PeerInner>,
    signal: SignalIdentifier,
    id: TxLabel,
}

impl SimpleResponder {
    /// Accepts the request with an empty response.
    pub fn send(self) -> Result<()> {
        let SimpleResponder { peer, signal, id } = self;
        peer.send_response(id, signal, &[])
    }

    /// Rejects the request with the given `error_code`.
    pub fn reject(self, error_code: ErrorCode) -> Result<()> {
        let SimpleResponder { peer, signal, id } = self;
        peer.send_reject(id, signal, error_code)
    }
}
/// Responder for requests that address several streams at once; a reject
/// names the stream that caused the failure.
#[derive(Debug)]
pub struct StreamResponder {
    peer: Arc<PeerInner>,
    signal: SignalIdentifier,
    id: TxLabel,
}

impl StreamResponder {
    /// Accepts the request with an empty response.
    pub fn send(self) -> Result<()> {
        let StreamResponder { peer, signal, id } = self;
        peer.send_response(id, signal, &[])
    }

    /// Rejects the request, reporting `stream_id` followed by `error_code`.
    pub fn reject(self, stream_id: &StreamEndpointId, error_code: ErrorCode) -> Result<()> {
        let seid_byte = stream_id.to_msg();
        let code_byte = u8::from(&error_code);
        self.peer
            .send_reject_params(self.id, self.signal, &[seid_byte, code_byte])
    }
}
/// Responder for the configuration requests; a reject names the service
/// category that caused the failure.
#[derive(Debug)]
pub struct ConfigureResponder {
    peer: Arc<PeerInner>,
    signal: SignalIdentifier,
    id: TxLabel,
}

impl ConfigureResponder {
    /// Accepts the request with an empty response.
    pub fn send(self) -> Result<()> {
        let ConfigureResponder { peer, signal, id } = self;
        peer.send_response(id, signal, &[])
    }

    /// Rejects the request, reporting the category of `capability` followed
    /// by `error_code`.
    pub fn reject(
        self, capability: Option<&ServiceCapability>, error_code: ErrorCode,
    ) -> Result<()> {
        // 0x0 is used if no service category applies, see notes in Sec 8.11.3 or 8.9.3
        let service_byte: u8 = capability.map_or(0x0, |cap| cap.to_category_byte());
        let code_byte = u8::from(&error_code);
        self.peer
            .send_reject_params(self.id, self.signal, &[service_byte, code_byte])
    }
}
/// A command received from the remote peer that has not been parsed yet:
/// its signaling header and the bytes that followed the header.
#[derive(Debug)]
struct UnparsedRequest(SignalingHeader, Vec<u8>);

impl UnparsedRequest {
    /// Pairs a decoded `header` with the remaining `body` bytes.
    fn new(header: SignalingHeader, body: Vec<u8>) -> Self {
        UnparsedRequest(header, body)
    }
}
/// Requests received from the remote peer that have not been delivered to the
/// request stream yet, together with the state of that stream's listener.
#[derive(Debug, Default)]
struct RequestQueue {
// Who, if anyone, should be woken when a request is queued.
listener: RequestListener,
// Requests in arrival order; new ones are pushed at the back and delivered from the front.
queue: VecDeque<UnparsedRequest>,
}
/// The state of the (single) listener for incoming requests.
#[derive(Debug)]
enum RequestListener {
/// No one is listening.
None,
/// Someone wants to listen but hasn't polled.
New,
/// Someone is listening, and can be woken with the waker.
Some(Waker),
}
impl Default for RequestListener {
// A fresh queue starts with no listener.
fn default() -> Self {
RequestListener::None
}
}
/// Tracks the interest in the response to a command that was sent.
#[derive(Debug)]
enum ResponseWaiter {
    /// A new waiter which hasn't been polled yet.
    WillPoll,
    /// A task waiting for a response, which can be woken with the waker.
    Waiting(Waker),
    /// A response that has been received, stored here until it's polled, at
    /// which point it will be decoded.
    Received(Vec<u8>),
    /// It's still waiting on the response, but the receiver has decided they
    /// don't care and we'll throw it out.
    Discard,
}

impl ResponseWaiter {
    /// Whether a response has been stored in this waiter.
    fn is_received(&self) -> bool {
        match self {
            ResponseWaiter::Received(_) => true,
            _ => false,
        }
    }

    /// Takes the stored response out of the waiter.
    ///
    /// Panics if no response has been received.
    fn unwrap_received(self) -> Vec<u8> {
        match self {
            ResponseWaiter::Received(buf) => buf,
            _ => panic!("expected received buf"),
        }
    }
}
fn decode_signaling_response<D: Decodable>(
expected_signal: SignalIdentifier, buf: Vec<u8>,
) -> Result<D> {
let header = SignalingHeader::decode(buf.as_slice())?;
if header.signal() != expected_signal {
return Err(Error::InvalidHeader);
}
if !header.is_type(SignalingMessageType::ResponseAccept) {
let params_idx = header.encoded_len();
match header.signal() {
SignalIdentifier::Start | SignalIdentifier::Suspend => {
return Err(Error::RemoteStreamRejected(
buf[params_idx] >> 2,
buf[params_idx + 1],
));
}
SignalIdentifier::SetConfiguration | SignalIdentifier::Reconfigure => {
return Err(Error::RemoteConfigRejected(
buf[params_idx],
buf[params_idx + 1],
));
}
_ => return Err(Error::RemoteRejected(buf[params_idx])),
};
}
D::decode(&buf[header.encoded_len()..])
}
/// A future that polls for the response to a command we sent.
///
/// Resolves to the raw bytes of the response. Dropping it before a response was
/// delivered tells the peer that the response is no longer of interest.
#[derive(Debug)]
pub struct CommandResponse {
// The transaction label the command was sent with.
id: TxLabel,
// Some(x) if we're still waiting on the response.
inner: Option<Arc<PeerInner>>,
}
impl Unpin for CommandResponse {}
impl futures::Future for CommandResponse {
type Output = Result<Vec<u8>>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let this = &mut *self;
let res;
{
let client = this.inner.as_ref().ok_or(Error::AlreadyReceived)?;
res = client.poll_recv_response(&this.id, lw);
}
if let Poll::Ready(Ok(_)) = res {
let inner = this
.inner
.take()
.expect("CommandResponse polled after completion");
inner.wake_any();
}
res
}
}
impl FusedFuture for CommandResponse {
// The future is finished once the response was delivered, which is when `poll`
// clears `inner`.
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}
impl Drop for CommandResponse {
    /// If the response was never delivered, withdraws the interest in it and
    /// wakes another task so that something keeps reading the socket.
    fn drop(&mut self) {
        match self.inner.as_ref() {
            Some(peer) => {
                peer.remove_response_interest(&self.id);
                peer.wake_any();
            }
            None => {}
        }
    }
}
/// The state shared between a `Peer`, its request stream, the responders and
/// the futures waiting for command responses.
#[derive(Debug)]
struct PeerInner {
/// The signaling channel
signaling: fasync::Socket,
/// A map of transaction ids that have been sent but the response has not
/// been received and/or processed yet.
///
/// Waiters are added with `add_response_waiter` and get removed when they are
/// polled after a response arrived, or through `remove_response_interest`
/// (immediately if the response was already received, otherwise once the
/// response shows up).
response_waiters: Mutex<Slab<ResponseWaiter>>,
/// A queue of requests that have been received and are waiting to
/// be responded to, along with the waker for the task that has
/// taken the request receiver (if it exists)
incoming_requests: Mutex<RequestQueue>,
}
impl PeerInner {
/// Add a response waiter, and return an id that can be used to send the
/// transaction. Responses can then be received using poll_recv_response.
///
/// Fails, leaving no waiter behind, when every transaction label is in use.
fn add_response_waiter(&self) -> Result<TxLabel> {
    // Insert and (if needed) roll back under a single lock acquisition.
    let mut waiters = self.response_waiters.lock();
    let key = waiters.insert(ResponseWaiter::WillPoll);
    // Saturate instead of truncating: a plain `as u8` cast would wrap a large
    // key around onto a small, valid-looking label that belongs to another waiter.
    let narrowed = key.min(usize::from(u8::max_value())) as u8;
    let id = TxLabel::try_from(narrowed);
    if id.is_err() {
        fx_log_warn!(tag: "avdtp", "Transaction IDs are exhausted");
        waiters.remove(key);
    }
    id
}
/// Called when the response future is dropped: the waiter isn't interested
/// in the response anymore, so it has to be thrown out.
///
/// A response that already arrived is removed right away; otherwise the
/// waiter is marked to be discarded when the response shows up.
fn remove_response_interest(&self, id: &TxLabel) {
    let idx = usize::from(id);
    let mut waiters = self.response_waiters.lock();
    let already_received = waiters[idx].is_received();
    if already_received {
        waiters.remove(idx);
    } else {
        waiters[idx] = ResponseWaiter::Discard;
    }
}
// Attempts to receive a new request by processing all packets on the socket.
// Resolves to an unprocessed request (header, body) if one was received.
// Resolves to an error if there was an error reading from the socket or if the peer
// disconnected.
fn poll_recv_request(&self, lw: &LocalWaker) -> Poll<Result<UnparsedRequest>> {
let is_closed = self.recv_all(lw)?;
let mut lock = self.incoming_requests.lock();
if let Some(request) = lock.queue.pop_front() {
Poll::Ready(Ok(request))
} else {
lock.listener = RequestListener::Some(lw.clone().into_waker());
if is_closed {
Poll::Ready(Err(Error::PeerDisconnected))
} else {
Poll::Pending
}
}
}
// Attempts to receive a response to a request by processing all packets on the socket.
// Resolves to the bytes in the response body if one was received.
// Resolves to an error if there was an error reading from the socket, if the peer
// disconnected, or if the |label| is not being waited on.
fn poll_recv_response(&self, label: &TxLabel, lw: &LocalWaker) -> Poll<Result<Vec<u8>>> {
let is_closed = self.recv_all(lw)?;
let mut waiters = self.response_waiters.lock();
let idx = usize::from(label);
// We expect() below because the label above came from an internally-created object,
// so the waiters should always exist in the map.
if waiters
.get(idx)
.expect("Polled unregistered waiter")
.is_received()
{
// We got our response.
let buf = waiters.remove(idx).unwrap_received();
Poll::Ready(Ok(buf))
} else {
// Set the waker to be notified when a response shows up.
*waiters.get_mut(idx).expect("Polled unregistered waiter") =
ResponseWaiter::Waiting(lw.clone().into_waker());
if is_closed {
Poll::Ready(Err(Error::PeerDisconnected))
} else {
Poll::Pending
}
}
}
/// Poll for any packets on the signaling socket, dispatching each one as it is read:
/// commands are queued as unparsed requests and responses are handed to the waiter
/// registered for their transaction label.
/// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite
/// if there was a problem communicating on the socket.
fn recv_all(&self, lw: &LocalWaker) -> Result<bool> {
// Scratch buffer for the packet being processed; every branch below leaves it
// empty again before the next read.
let mut buf = Vec::<u8>::new();
loop {
let packet_size = match self.signaling.poll_datagram(&mut buf, lw) {
Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
fx_vlog!(tag: "avdtp", 1, "Signaling peer closed");
return Ok(true);
}
Poll::Ready(Err(e)) => return Err(Error::PeerRead(e)),
// Nothing more to read for now.
Poll::Pending => return Ok(false),
Poll::Ready(Ok(size)) => size,
};
if packet_size == 0 {
continue;
}
// Detects the General Reject condition (unknown signal id) and sends the
// reject back. Packets whose header fails to decode for any other reason
// are logged and dropped, and processing continues with the next packet.
let header = match SignalingHeader::decode(buf.as_slice()) {
Err(Error::InvalidSignalId(label, id)) => {
self.send_general_reject(label, id)?;
buf = buf.split_off(packet_size);
continue;
}
Err(_) => {
// Only possible other return is OutOfRange
// Returned only when the packet is too small, can't make a meaningful reject.
fx_log_info!(tag: "avdtp", "received unrejectable message");
buf = buf.split_off(packet_size);
continue;
}
Ok(x) => Ok(x),
}?;
// Commands from the remote get translated into requests.
if header.is_command() {
let mut lock = self.incoming_requests.lock();
// Everything after the header is the body of the request.
let body = buf.split_off(header.encoded_len());
buf.clear();
lock.queue.push_back(UnparsedRequest::new(header, body));
if let RequestListener::Some(ref waker) = lock.listener {
waker.wake();
}
} else {
// Should be a response to a command we sent
let mut waiters = self.response_waiters.lock();
let idx = usize::from(&header.label());
let rest = buf.split_off(packet_size);
if let Some(&ResponseWaiter::Discard) = waiters.get(idx) {
// The receiver gave up on this response; free the slot.
waiters.remove(idx);
} else if let Some(entry) = waiters.get_mut(idx) {
// Store the whole packet (header included) and wake the waiting task, if any.
let old_entry = mem::replace(entry, ResponseWaiter::Received(buf));
if let ResponseWaiter::Waiting(waker) = old_entry {
waker.wake();
}
} else {
fx_vlog!(tag: "avdtp", 1, "response for {:?} we did not send, dropping", header.label());
}
buf = rest;
// Note: we drop any TxLabel response we are not waiting for
}
}
}
// Wakes up an arbitrary task that has begun polling on the channel so that
// it will call recv_all and be registered as the new channel reader.
fn wake_any(&self) {
    // Try to wake up response waiters first, rather than the event listener.
    // The event listener is a stream, and so could be between poll_nexts,
    // Response waiters should always be actively polled once
    // they've begun being polled on a task.
    {
        let waiters = self.response_waiters.lock();
        let first_waiting = waiters
            .iter()
            .filter_map(|(_, waiter)| match waiter {
                ResponseWaiter::Waiting(waker) => Some(waker),
                _ => None,
            })
            .next();
        if let Some(waker) = first_waiting {
            waker.wake();
            return;
        }
    }
    // No command is waiting; fall back to the request listener, if there is one.
    let requests = self.incoming_requests.lock();
    if let RequestListener::Some(waker) = &requests.listener {
        waker.wake();
    }
}
// Build and send a General Reject message (Section 8.18)
fn send_general_reject(&self, label: TxLabel, invalid_signal_id: u8) -> Result<()> {
    // Build the packet ourselves rather than make SignalingHeader build a packet with an
    // invalid signal id.
    // First byte: the transaction label in the upper four bits, 0x01 in the lower bits.
    let label_and_type = (u8::from(&label) << 4) | 0x01;
    // Second byte: the offending signal id, limited to its six bits.
    let signal_byte = invalid_signal_id & 0x3F;
    let packet = [label_and_type, signal_byte];
    self.send_signal(&packet)
}
// Sends an accept response for `signal` with transaction `label`, followed by `params`.
fn send_response(&self, label: TxLabel, signal: SignalIdentifier, params: &[u8]) -> Result<()> {
    let header = SignalingHeader::new(label, signal, SignalingMessageType::ResponseAccept);
    let header_len = header.encoded_len();
    let mut packet = vec![0u8; header_len + params.len()];
    header.encode(packet.as_mut_slice())?;
    // The parameters follow directly after the header.
    packet[header_len..].copy_from_slice(params);
    self.send_signal(&packet)
}
// Sends a reject response whose only parameter is `error_code`.
fn send_reject(
    &self, label: TxLabel, signal: SignalIdentifier, error_code: ErrorCode,
) -> Result<()> {
    let code_byte = u8::from(&error_code);
    self.send_reject_params(label, signal, &[code_byte])
}
// Sends a reject response for `signal` with transaction `label`, followed by `params`.
fn send_reject_params(
    &self, label: TxLabel, signal: SignalIdentifier, params: &[u8],
) -> Result<()> {
    let header = SignalingHeader::new(label, signal, SignalingMessageType::ResponseReject);
    let header_len = header.encoded_len();
    let mut packet = vec![0u8; header_len + params.len()];
    header.encode(packet.as_mut_slice())?;
    // The parameters follow directly after the header.
    packet[header_len..].copy_from_slice(params);
    self.send_signal(&packet)
}
fn send_signal(&self, data: &[u8]) -> Result<()> {
self.signaling
.as_ref()
.write(data)
.map_err(|x| Error::PeerWrite(x))?;
Ok(())
}
}