blob: d9f5f71deb1f1a66c87d1c9d8b6ea286c5194192 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![feature(async_await, await_macro, futures_api)]
use {
fuchsia_async as fasync,
fuchsia_syslog::{fx_log_info, fx_log_warn, fx_vlog},
fuchsia_zircon::{self as zx, Duration, Time},
ready, select,
task::{LocalWaker, Poll, Waker},
use crate::types::{
Decodable, Encodable, ErrorCode, SignalIdentifier, SignalingHeader, SignalingMessageType,
TryFrom, TxLabel,
mod tests;
mod types;
pub use crate::types::{
ContentProtectionType, EndpointType, Error, MediaCodecType, MediaType, Result,
ServiceCapability, StreamEndpointId,
/// An AVDTP signaling peer can send commands to another peer, receive requests and send responses.
/// Media transport is not handled by this peer.
///
/// Requests from the distant peer are delivered through the request stream available through
/// take_request_stream(). Only one RequestStream can be active at a time. Only valid requests
/// are sent to the request stream - invalid formats are automatically rejected.
///
/// Responses are sent using responders that are included in the request stream from the connected
/// peer.
pub struct Peer {
// Shared state. The RequestStream and every in-flight CommandResponse hold clones of this Arc.
inner: Arc<PeerInner>,
impl Peer {
/// Create a new peer from a signaling channel socket.
///
/// Fails with the `zx::Status` reported by `fasync::Socket::from_socket` if the socket
/// cannot be wrapped for asynchronous use.
pub fn new(signaling: zx::Socket) -> result::Result<Peer, zx::Status> {
Ok(Peer {
inner: Arc::new(PeerInner {
signaling: fasync::Socket::from_socket(signaling)?,
// A fresh peer has no outstanding commands and a default (empty) request queue.
response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()),
incoming_requests: Mutex::<RequestQueue>::default(),
/// Take the event listener for this peer.
///
/// # Panics
///
/// Panics if the stream is already held.
pub fn take_request_stream(&self) -> RequestStream {
let mut lock = self.inner.incoming_requests.lock();
// RequestListener::None means nobody holds the stream. Marking it New makes any further
// call panic until the RequestStream is dropped, which resets the listener to None.
if let RequestListener::None = lock.listener {
lock.listener = RequestListener::New;
} else {
panic!("Request stream has already been taken");
RequestStream {
inner: self.inner.clone(),
/// Send a Stream End Point Discovery (Sec 8.6) command to the remote peer.
/// Asynchronously returns the reply as a vector of endpoint information.
/// Error will be RemoteRejected with the error code returned by the remote
/// if the remote peer rejected the command.
pub async fn discover(&self) -> Result<Vec<StreamInformation>> {
// A Discover command has no parameters, hence the empty payload.
let response: Result<DiscoverResponse> =
await!(self.send_command(SignalIdentifier::Discover, &[]));
// Strip the response wrapper down to the endpoint list; errors pass through unchanged.
match response {
Ok(response) => Ok(response.endpoints),
Err(e) => Err(e),
/// Send a Get Capabilities (Sec 8.7) command to the remote peer for the
/// given `stream_id`.
/// Asynchronously returns the reply which contains the ServiceCapabilities
/// reported.
/// In general, Get All Capabilities should be preferred to this command.
/// Error will be RemoteRejected with the error code reported by the remote
/// if the remote peer rejects the command.
pub async fn get_capabilities<'a>(
&'a self, stream_id: &'a StreamEndpointId,
) -> Result<Vec<ServiceCapability>> {
// The command's only parameter is the endpoint id in its message form.
let stream_params = &[stream_id.to_msg()];
let response: Result<GetCapabilitiesResponse> =
await!(self.send_command(SignalIdentifier::GetCapabilities, stream_params));
// Strip the response wrapper down to the capability list; errors pass through unchanged.
match response {
Ok(response) => Ok(response.capabilities),
Err(e) => Err(e),
/// Send a Get All Capabilities (Sec 8.8) command to the remote peer for the
/// given `stream_id`.
/// Asynchronously returns the reply which contains the ServiceCapabilities
/// reported.
/// Error will be RemoteRejected with the error code reported by the remote
/// if the remote peer rejects the command.
pub async fn get_all_capabilities<'a>(
&'a self, stream_id: &'a StreamEndpointId,
) -> Result<Vec<ServiceCapability>> {
// The command's only parameter is the endpoint id in its message form.
let stream_params = &[stream_id.to_msg()];
// The reply is decoded with the same response type that get_capabilities uses.
let response: Result<GetCapabilitiesResponse> =
await!(self.send_command(SignalIdentifier::GetAllCapabilities, stream_params));
match response {
Ok(response) => Ok(response.capabilities),
Err(e) => Err(e),
/// Send a Stream Configuration (Sec 8.9) command to the remote peer for the
/// given remote `stream_id`, communicating the association to a local
/// `local_stream_id` and the required stream `capabilities`.
/// Panics if `capabilities` is empty.
/// Returns Ok(()) if the command was accepted, and RemoteConfigRejected
/// if the remote refused.
/// An error from encoding a capability is returned before anything is sent.
pub async fn set_configuration<'a>(
&'a self, stream_id: &'a StreamEndpointId, local_stream_id: &'a StreamEndpointId,
capabilities: &'a [ServiceCapability],
) -> Result<()> {
assert!(!capabilities.is_empty(), "must set at least one capability");
let mut params: Vec<u8> = Vec::new();
// Size the buffer up front: two endpoint-id bytes plus every capability's encoded length.
params.resize(capabilities.iter().fold(2, |a, x| a + x.encoded_len()), 0);
// Parameter order: the remote endpoint id first, then our local endpoint id.
params[0] = stream_id.to_msg();
params[1] = local_stream_id.to_msg();
// Capabilities are packed back-to-back after the two id bytes.
let mut idx = 2;
for capability in capabilities {
capability.encode(&mut params[idx..])?;
idx += capability.encoded_len();
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::SetConfiguration, &params));
/// Send a Stream Reconfigure (Sec 8.11) command to the remote peer for the
/// given remote `stream_id`, to reconfigure the Application Service
/// capabilities in `capabilities`.
/// Note: Per the spec, only the Media Codec and Content Protection
/// capabilities will be accepted in this command.
/// Panics if there are no capabilities to configure.
/// Returns Ok(()) if the command was accepted, and RemoteConfigRejected
/// if the remote refused.
/// Returns Error::Encoding without sending anything if any capability is not an
/// application capability.
pub async fn reconfigure<'a>(
&'a self, stream_id: &'a StreamEndpointId, capabilities: &'a [ServiceCapability],
) -> Result<()> {
assert!(!capabilities.is_empty(), "must set at least one capability");
let mut params: Vec<u8> = Vec::new();
// Size the buffer up front: one endpoint-id byte plus every capability's encoded length.
params.resize(capabilities.iter().fold(1, |a, x| a + x.encoded_len()), 0);
params[0] = stream_id.to_msg();
// Capabilities are packed back-to-back after the id byte.
let mut idx = 1;
for capability in capabilities {
// Refuse locally rather than send a command carrying a non-application capability.
if !capability.is_application() {
return Err(Error::Encoding);
capability.encode(&mut params[idx..])?;
idx += capability.encoded_len();
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Reconfigure, &params));
/// Send an Open Stream Command (Sec 8.12) to the remote peer for the given
/// `stream_id`.
/// Returns Ok(()) if the command is accepted, and RemoteRejected if the
/// remote peer rejects the command with the code returned by the remote.
pub async fn open<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
// The command's only parameter is the endpoint id in its message form.
let stream_params = &[stream_id.to_msg()];
// A successful reply has no body, so it is decoded as a SimpleResponse.
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Open, stream_params));
/// Send a Start Stream Command (Sec 8.13) to the remote peer for all the
/// streams in `stream_ids`.
/// Returns Ok(()) if the command is accepted, and RemoteStreamRejected
/// with the stream endpoint id and error code reported by the remote if
/// the remote signals a failure.
pub async fn start<'a>(&'a self, stream_ids: &'a [StreamEndpointId]) -> Result<()> {
// Reserve room for one parameter entry per requested stream.
let mut stream_params = Vec::with_capacity(stream_ids.len());
for stream_id in stream_ids {
// A successful reply has no body, so it is decoded as a SimpleResponse.
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Start, &stream_params));
/// Send a Close Stream Command (Sec 8.14) to the remote peer for the given
/// `stream_id`.
/// Returns Ok(()) if the command is accepted, and RemoteRejected if the
/// remote peer rejects the command with the code returned by the remote.
pub async fn close<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
// The command's only parameter is the endpoint id in its message form.
let stream_params = &[stream_id.to_msg()];
// A successful reply has no body, so it is decoded as a SimpleResponse.
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Close, stream_params));
/// Send a Suspend Command (Sec 8.15) to the remote peer for all the
/// streams in `stream_ids`.
/// Returns Ok(()) if the command is accepted, and RemoteStreamRejected
/// with the stream endpoint id and error code reported by the remote if
/// the remote signals a failure.
pub async fn suspend<'a>(&'a self, stream_ids: &'a [StreamEndpointId]) -> Result<()> {
// Reserve room for one parameter entry per requested stream.
let mut stream_params = Vec::with_capacity(stream_ids.len());
for stream_id in stream_ids {
// A successful reply has no body, so it is decoded as a SimpleResponse.
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Suspend, &stream_params));
/// Send an Abort Command (Sec 8.16) to the remote peer for the given
/// `stream_id`.
/// Returns Ok(()) if the command is accepted, and RemoteRejected if the
/// remote peer rejects the command with the code returned by the remote.
pub async fn abort<'a>(&'a self, stream_id: &'a StreamEndpointId) -> Result<()> {
// The command's only parameter is the endpoint id in its message form.
let stream_params = &[stream_id.to_msg()];
// A successful reply has no body, so it is decoded as a SimpleResponse.
let response: Result<SimpleResponse> =
await!(self.send_command(SignalIdentifier::Abort, stream_params));
/// The maximum amount of time we will wait for a response to a signaling command.
fn command_timeout() -> Duration {
// Signaling response timeout, in milliseconds (three seconds).
const RTX_SIG_TIMER_MS: i64 = 3000;
/// Sends a signal on the socket and receives a future that will complete
/// when we get the expected response.
///
/// Resolves to Error::Timeout if `command_timeout()` elapses before a response arrives.
/// Otherwise resolves to the response decoded as `D`, or to the error produced while
/// receiving or decoding it.
async fn send_command<'a, D: Decodable>(
&'a self, signal: SignalIdentifier, payload: &'a [u8],
) -> Result<D> {
// Register interest first; the returned label ties the eventual response to this command.
let id = self.inner.add_response_waiter()?;
let header = SignalingHeader::new(id, signal, SignalingMessageType::Command);
let mut buf = vec![0; header.encoded_len()];
// Future that resolves to the raw response bytes for this label.
let mut response = CommandResponse {
id: header.label(),
inner: Some(self.inner.clone()),
let mut timeout = fasync::Timer::new(Time::after(Peer::command_timeout())).fuse();
// Whichever completes first wins: the timer, or the response to this command.
select! {
_ = timeout => Err(Error::Timeout),
r = response => {
let response_buf = r?;
decode_signaling_response(header.signal(), response_buf)
/// A request from the connected peer.
/// Each variant of this includes a responder which implements two functions:
/// - send(...) will send a response with the information provided.
/// - reject(...) will send a reject response with the given error code.
pub enum Request {
/// Stream endpoint discovery. The request has no parameters.
Discover {
responder: DiscoverResponder,
/// Capabilities query for a single stream endpoint.
GetCapabilities {
stream_id: StreamEndpointId,
responder: GetCapabilitiesResponder,
/// All-capabilities query for a single stream endpoint. Uses the same responder type as
/// GetCapabilities.
GetAllCapabilities {
stream_id: StreamEndpointId,
responder: GetCapabilitiesResponder,
/// Request to configure a stream endpoint with the given capabilities.
SetConfiguration {
local_stream_id: StreamEndpointId,
remote_stream_id: StreamEndpointId,
capabilities: Vec<ServiceCapability>,
responder: ConfigureResponder,
/// Request to open a stream endpoint.
Open {
stream_id: StreamEndpointId,
responder: SimpleResponder,
/// Request to reconfigure a stream endpoint. Only application capabilities get this far;
/// anything else is rejected while parsing.
Reconfigure {
local_stream_id: StreamEndpointId,
capabilities: Vec<ServiceCapability>,
responder: ConfigureResponder,
/// Request to start one or more stream endpoints.
Start {
stream_ids: Vec<StreamEndpointId>,
responder: StreamResponder,
/// Request to close a stream endpoint.
Close {
stream_id: StreamEndpointId,
responder: SimpleResponder,
/// Request to suspend one or more stream endpoints.
Suspend {
stream_ids: Vec<StreamEndpointId>,
responder: StreamResponder,
/// Request to abort a stream endpoint.
Abort {
stream_id: StreamEndpointId,
responder: SimpleResponder,
// TODO(jamuraa): add the rest of the requests
// Parses a request whose body must be exactly one stream endpoint id byte.
// On success it evaluates to Ok(Request::$request_variant { .. }) with a $responder_type
// built from the given signal, peer and transaction label.
macro_rules! parse_one_seid {
($body:ident, $signal:ident, $peer:ident, $id:ident, $request_variant:ident, $responder_type:ident) => {
// Any body length other than one byte takes the failure branch.
if $body.len() != 1 {
} else {
Ok(Request::$request_variant {
stream_id: StreamEndpointId::from_msg(&$body[0]),
responder: $responder_type {
signal: $signal,
peer: $peer,
id: $id,
impl Request {
/// Extracts the list of stream endpoint ids carried in a request body.
/// An empty body is rejected as RequestInvalid(BadLength).
fn get_req_seids(body: &[u8]) -> Result<Vec<StreamEndpointId>> {
if body.len() < 1 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
/// Decodes the back-to-back ServiceCapability entries in `encoded`.
///
/// Fewer than two bytes is rejected as RequestInvalid(BadLength). A RequestInvalid error
/// from decoding a capability is upgraded to RequestInvalidExtra, carrying the first byte
/// of the entry that failed. Any other decoding error is returned unchanged.
fn get_req_capabilities(encoded: &[u8]) -> Result<Vec<ServiceCapability>> {
if encoded.len() < 2 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
let mut caps = vec![];
// Offset of the next undecoded capability within `encoded`.
let mut loc = 0;
while loc < encoded.len() {
let cap = match ServiceCapability::decode(&encoded[loc..]) {
Ok(cap) => cap,
Err(Error::RequestInvalid(code)) => {
// encoded[loc] is the first byte of the capability that failed to decode.
return Err(Error::RequestInvalidExtra(code, encoded[loc]));
Err(e) => return Err(e),
// Advance past the capability just decoded.
loc += cap.encoded_len();
/// Turns the raw body of a received command into a typed Request that carries a responder
/// bound to `peer` and the transaction label `id`.
///
/// Malformed bodies give RequestInvalid or RequestInvalidExtra. Signals without a handler
/// here give UnimplementedMessage.
fn parse(
peer: Arc<PeerInner>, id: TxLabel, signal: SignalIdentifier, body: &[u8],
) -> Result<Request> {
match signal {
SignalIdentifier::Discover => {
// Discover Request has no body (Sec 8.6.1)
if body.len() > 0 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
Ok(Request::Discover {
responder: DiscoverResponder { peer: peer, id: id },
SignalIdentifier::GetCapabilities => parse_one_seid!(
SignalIdentifier::GetAllCapabilities => parse_one_seid!(
SignalIdentifier::SetConfiguration => {
// Two endpoint-id bytes plus at least two bytes of capability data.
if body.len() < 4 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
let requested = Request::get_req_capabilities(&body[2..])?;
// The first id byte names our endpoint, the second the sender's endpoint.
Ok(Request::SetConfiguration {
local_stream_id: StreamEndpointId::from_msg(&body[0]),
remote_stream_id: StreamEndpointId::from_msg(&body[1]),
capabilities: requested,
responder: ConfigureResponder { signal, peer, id },
SignalIdentifier::Reconfigure => {
// One endpoint-id byte plus at least two bytes of capability data.
if body.len() < 3 {
return Err(Error::RequestInvalid(ErrorCode::BadLength));
let requested = Request::get_req_capabilities(&body[1..])?;
// Reject the request if it carries any capability that is not an application one.
match requested.iter().find(|x| !x.is_application()) {
Some(x) => {
return Err(Error::RequestInvalidExtra(
None => (),
Ok(Request::Reconfigure {
local_stream_id: StreamEndpointId::from_msg(&body[0]),
capabilities: requested,
responder: ConfigureResponder { signal, peer, id },
SignalIdentifier::Open => {
parse_one_seid!(body, signal, peer, id, Open, SimpleResponder)
SignalIdentifier::Start => {
let seids = Request::get_req_seids(body)?;
Ok(Request::Start {
stream_ids: seids,
responder: StreamResponder { signal, peer, id },
SignalIdentifier::Close => {
parse_one_seid!(body, signal, peer, id, Close, SimpleResponder)
SignalIdentifier::Suspend => {
let seids = Request::get_req_seids(body)?;
Ok(Request::Suspend {
stream_ids: seids,
responder: StreamResponder { signal, peer, id },
SignalIdentifier::Abort => {
parse_one_seid!(body, signal, peer, id, Abort, SimpleResponder)
// Every other signal is reported as unimplemented.
_ => Err(Error::UnimplementedMessage),
/// A stream of requests from the remote peer.
/// Obtained from Peer::take_request_stream(); dropping it frees the listener slot again.
pub struct RequestStream {
// Shared peer state that the requests are read from.
inner: Arc<PeerInner>,
// The only field is an Arc, so the stream can be freely moved after being pinned.
impl Unpin for RequestStream {}
// Yields parsed requests. Requests that fail to parse are answered with a reject here and
// are never surfaced to the stream consumer. The stream ends when the peer disconnects.
impl Stream for RequestStream {
type Item = Result<Request>;
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.inner.poll_recv_request(lw)) {
Ok(UnparsedRequest(SignalingHeader { label, signal, .. }, body)) => {
match Request::parse(self.inner.clone(), label, signal, &body) {
// NOTE(review): each reject arm below returns Pending without visibly arranging a
// wake-up in this function; requests still queued may wait for the next socket
// event. Confirm this is intended.
Err(Error::RequestInvalid(code)) => {
self.inner.send_reject(label, signal, code)?;
return Poll::Pending;
Err(Error::RequestInvalidExtra(code, extra)) => {
// Reject parameters: the extra byte first, then the error code.
.send_reject_params(label, signal, &[extra, u8::from(&code)])?;
return Poll::Pending;
Err(Error::UnimplementedMessage) => {
.send_reject(label, signal, ErrorCode::NotSupportedCommand)?;
return Poll::Pending;
// A parsed request, or any other parse error, goes to the consumer as-is.
x => Some(x),
// A disconnected peer ends the stream instead of yielding an error.
Err(Error::PeerDisconnected) => None,
Err(e) => Some(Err(e)),
// Dropping the stream resets the listener to None, which is the state that
// Peer::take_request_stream() requires before it hands out a stream.
impl Drop for RequestStream {
fn drop(&mut self) {
self.inner.incoming_requests.lock().listener = RequestListener::None;
/// All information related to a stream. Part of the Discovery Response.
/// See Sec 8.6.2
#[derive(Debug, PartialEq)]
pub struct StreamInformation {
/// Identifier of the stream endpoint.
id: StreamEndpointId,
/// The in-use flag for the endpoint, carried as bit 0x02 of the first encoded byte.
in_use: bool,
/// Media type of the endpoint, carried in the upper four bits of the second encoded byte.
media_type: MediaType,
/// Endpoint type, carried in bit 3 of the second encoded byte.
endpoint_type: EndpointType,
impl StreamInformation {
/// Create a new StreamInformation from an ID.
/// This will only fail if the ID given is out of the range of valid SEIDs (0x01 - 0x3E)
pub fn new(
id: u8, in_use: bool, media_type: MediaType, endpoint_type: EndpointType,
) -> Result<StreamInformation> {
Ok(StreamInformation {
// The only fallible step: converting the raw id into a StreamEndpointId.
id: StreamEndpointId::try_from(id)?,
in_use: in_use,
media_type: media_type,
endpoint_type: endpoint_type,
/// The stream endpoint id this information describes.
pub fn id(&self) -> &StreamEndpointId {
// Decodes one endpoint entry from the first two bytes of `from`.
impl Decodable for StreamInformation {
fn decode(from: &[u8]) -> Result<Self> {
// An entry needs at least two bytes.
if from.len() < 2 {
return Err(Error::InvalidMessage);
// Byte 0 carries the endpoint id, plus the in-use flag in bit 0x02.
let id = StreamEndpointId::from_msg(&from[0]);
let in_use: bool = from[0] & 0x02 != 0;
// Byte 1 carries the media type in its upper four bits and the endpoint type in bit 3.
let media_type = MediaType::try_from(from[1] >> 4)?;
let endpoint_type = EndpointType::try_from((from[1] >> 3) & 0x1)?;
Ok(StreamInformation {
id: id,
in_use: in_use,
media_type: media_type,
endpoint_type: endpoint_type,
// Encoding is the inverse of the Decodable impl: two bytes, using the same bit positions.
impl Encodable for StreamInformation {
fn encoded_len(&self) -> usize {
fn encode(&self, into: &mut [u8]) -> Result<()> {
// Refuse to write into a buffer too small for the whole entry.
if into.len() < self.encoded_len() {
return Err(Error::Encoding);
// NOTE(review): the left operand of `|` on the next line (presumably the endpoint id
// byte) is absent in this copy of the file.
into[0] = | if self.in_use { 0x02 } else { 0x00 };
// Media type in the upper four bits, endpoint type in bit 3.
into[1] = u8::from(&self.media_type) << 4 | u8::from(&self.endpoint_type) << 3;
// Simple responses have no body data.
pub struct SimpleResponse {}
impl Decodable for SimpleResponse {
fn decode(from: &[u8]) -> Result<Self> {
// Any trailing bytes make the message invalid.
if from.len() > 0 {
return Err(Error::InvalidMessage);
Ok(SimpleResponse {})
// Decoded body of a Discover response: the endpoints the remote reported.
struct DiscoverResponse {
endpoints: Vec<StreamInformation>,
impl Decodable for DiscoverResponse {
fn decode(from: &[u8]) -> Result<Self> {
let mut endpoints = Vec::<StreamInformation>::new();
// Offset of the next undecoded endpoint entry.
let mut idx = 0;
// Entries are packed back-to-back; a decode failure aborts the whole response.
while idx < from.len() {
let endpoint = StreamInformation::decode(&from[idx..])?;
idx += endpoint.encoded_len();
Ok(DiscoverResponse {
endpoints: endpoints,
/// Responder for a Discover request. `send` and `reject` both consume the responder.
pub struct DiscoverResponder {
peer: Arc<PeerInner>,
id: TxLabel,
impl DiscoverResponder {
/// Sends the response to a discovery request.
/// At least one endpoint must be present, otherwise Error::Encoding is returned.
/// Will result in an Error::PeerWrite if the distant peer is disconnected.
pub fn send(self, endpoints: &[StreamInformation]) -> Result<()> {
if endpoints.len() == 0 {
// There shall be at least one SEP in a response (Sec 8.6.2)
return Err(Error::Encoding);
// Sized from the first endpoint, so every endpoint must encode to the same length.
let mut params = vec![0 as u8; endpoints.len() * endpoints[0].encoded_len()];
let mut idx = 0;
for endpoint in endpoints {
endpoint.encode(&mut params[idx..idx + endpoint.encoded_len()])?;
idx += endpoint.encoded_len();
.send_response(, SignalIdentifier::Discover, &params)
/// Rejects the discovery request with `error_code`.
pub fn reject(self, error_code: ErrorCode) -> Result<()> {
.send_reject(, SignalIdentifier::Discover, error_code)
/// Responder for GetCapabilities and GetAllCapabilities requests. `signal` records which of
/// the two is being answered.
pub struct GetCapabilitiesResponder {
peer: Arc<PeerInner>,
signal: SignalIdentifier,
id: TxLabel,
impl GetCapabilitiesResponder {
/// Sends the capabilities to the peer. Capabilities for which `in_response(signal)` is
/// false are left out of the reply.
pub fn send(self, capabilities: &[ServiceCapability]) -> Result<()> {
let included_iter = capabilities.iter().filter(|x| x.in_response(self.signal));
// First pass over a clone of the iterator computes the reply size.
let reply_len = included_iter.clone().fold(0, |a, b| a + b.encoded_len());
let mut reply = vec![0 as u8; reply_len];
// Second pass encodes each included capability right after the previous one.
let mut pos = 0;
for capability in included_iter {
let size = capability.encoded_len();
capability.encode(&mut reply[pos..pos + size])?;
pos += size;
self.peer.send_response(, self.signal, &reply)
/// Rejects the request with `error_code`.
pub fn reject(self, error_code: ErrorCode) -> Result<()> {
self.peer.send_reject(, self.signal, error_code)
// Decoded body of a Get Capabilities or Get All Capabilities response.
struct GetCapabilitiesResponse {
capabilities: Vec<ServiceCapability>,
impl Decodable for GetCapabilitiesResponse {
fn decode(from: &[u8]) -> Result<Self> {
let mut capabilities = Vec::<ServiceCapability>::new();
// Offset of the next undecoded capability.
let mut idx = 0;
// Capabilities are packed back-to-back; a decode failure aborts the whole response.
while idx < from.len() {
let capability = ServiceCapability::decode(&from[idx..])?;
idx = idx + capability.encoded_len();
Ok(GetCapabilitiesResponse {
capabilities: capabilities,
/// Responder for requests whose accept response has no body. Request::parse hands it out
/// for Open, Close and Abort.
pub struct SimpleResponder {
peer: Arc<PeerInner>,
signal: SignalIdentifier,
id: TxLabel,
impl SimpleResponder {
/// Accepts the request by sending a response with no parameters.
pub fn send(self) -> Result<()> {
self.peer.send_response(, self.signal, &[])
/// Rejects the request with `error_code`.
pub fn reject(self, error_code: ErrorCode) -> Result<()> {
self.peer.send_reject(, self.signal, error_code)
/// Responder for requests that name several streams. Request::parse hands it out for Start
/// and Suspend.
pub struct StreamResponder {
peer: Arc<PeerInner>,
signal: SignalIdentifier,
id: TxLabel,
impl StreamResponder {
/// Accepts the request by sending a response with no parameters.
pub fn send(self) -> Result<()> {
self.peer.send_response(, self.signal, &[])
/// Rejects the request, naming the offending `stream_id` along with `error_code`.
pub fn reject(self, stream_id: &StreamEndpointId, error_code: ErrorCode) -> Result<()> {
// Reject parameters: the stream endpoint id byte, then the error code byte.
&[stream_id.to_msg(), u8::from(&error_code)],
/// Responder for configuration requests. Request::parse hands it out for SetConfiguration
/// and Reconfigure.
pub struct ConfigureResponder {
peer: Arc<PeerInner>,
signal: SignalIdentifier,
id: TxLabel,
impl ConfigureResponder {
/// Accepts the configuration by sending a response with no parameters.
pub fn send(self) -> Result<()> {
self.peer.send_response(, self.signal, &[])
/// Rejects the configuration with `error_code`. `capability` names the offending
/// capability, or is None if no single capability is at fault.
pub fn reject(
self, capability: Option<&ServiceCapability>, error_code: ErrorCode,
) -> Result<()> {
let service_byte: u8 = match capability {
None => 0x0, // If no service category applies, see notes in Sec 8.11.3 or 8.9.3
Some(cap) => cap.to_category_byte(),
// Reject parameters: the service category byte, then the error code byte.
.send_reject_params(, self.signal, &[service_byte, u8::from(&error_code)])
// A command received from the remote, kept as its decoded header plus the raw body bytes
// until the request stream parses it into a Request.
struct UnparsedRequest(SignalingHeader, Vec<u8>);
impl UnparsedRequest {
fn new(header: SignalingHeader, body: Vec<u8>) -> UnparsedRequest {
UnparsedRequest(header, body)
// Incoming requests that have not yet been handed to the request stream, together with the
// state of whoever is listening for them.
#[derive(Debug, Default)]
struct RequestQueue {
// Whether the request stream has been taken, and the waker to notify if it has polled.
listener: RequestListener,
// Requests in arrival order: pushed at the back, popped from the front.
queue: VecDeque<UnparsedRequest>,
// Tracks who, if anyone, is listening for incoming requests.
enum RequestListener {
/// No one is listening.
/// Someone wants to listen but hasn't polled.
/// Someone is listening, and can be woken with the waker.
impl Default for RequestListener {
fn default() -> Self {
/// An enum representing an interest in the response to a command.
enum ResponseWaiter {
/// A new waiter which hasn't been polled yet.
/// A task waiting for a response, which can be woken with the waker.
/// A response that has been received, stored here until it's polled, at
/// which point it will be decoded.
/// It's still waiting on the response, but the receiver has decided they
/// don't care and we'll throw it out.
impl ResponseWaiter {
/// Check if a message has been received.
fn is_received(&self) -> bool {
if let ResponseWaiter::Received(_) = self {
} else {
/// Takes the received bytes out of the waiter.
/// Panics if the waiter is in any state other than Received.
fn unwrap_received(self) -> Vec<u8> {
if let ResponseWaiter::Received(buf) = self {
} else {
panic!("expected received buf")
/// Decodes a raw signaling response in `buf` into `D`.
///
/// Returns Error::InvalidHeader if the response is for a signal other than
/// `expected_signal`. A response that is not a ResponseAccept becomes one of the Remote*
/// rejection errors, chosen by signal and built from the bytes that follow the header.
fn decode_signaling_response<D: Decodable>(
expected_signal: SignalIdentifier, buf: Vec<u8>,
) -> Result<D> {
let header = SignalingHeader::decode(buf.as_slice())?;
if header.signal() != expected_signal {
return Err(Error::InvalidHeader);
if !header.is_type(SignalingMessageType::ResponseAccept) {
// The reject parameters start immediately after the header.
// NOTE(review): no length check on `buf` is visible before the indexing below; a reject
// shorter than expected would panic. Confirm it is validated elsewhere.
let params_idx = header.encoded_len();
match header.signal() {
SignalIdentifier::Start | SignalIdentifier::Suspend => {
// First parameter byte shifted right by two (presumably the SEID), then the error code.
return Err(Error::RemoteStreamRejected(
buf[params_idx] >> 2,
buf[params_idx + 1],
SignalIdentifier::SetConfiguration | SignalIdentifier::Reconfigure => {
return Err(Error::RemoteConfigRejected(
buf[params_idx + 1],
// Every other signal's reject carries the error code as its first parameter byte.
_ => return Err(Error::RemoteRejected(buf[params_idx])),
/// A future that polls for the response to a command we sent.
/// Resolves to the raw bytes of the response.
pub struct CommandResponse {
// Transaction label of the command whose response is awaited.
id: TxLabel,
// Some(x) if we're still waiting on the response.
inner: Option<Arc<PeerInner>>,
// Holds only a label and an optional Arc, so it can be freely moved after being pinned.
impl Unpin for CommandResponse {}
// Polls the shared peer state for the response carrying this future's label.
impl futures::Future for CommandResponse {
type Output = Result<Vec<u8>>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let this = &mut *self;
let res;
// `inner` is None once the response has been taken, so a later poll yields AlreadyReceived.
let client = this.inner.as_ref().ok_or(Error::AlreadyReceived)?;
res = client.poll_recv_response(&, lw);
// A successful response is the point at which this future gives up its peer reference.
if let Poll::Ready(Ok(_)) = res {
let inner = this
.expect("CommandResponse polled after completion");
// Peer::send_command polls a CommandResponse directly inside select!; presumably that is
// why FusedFuture is implemented here.
impl FusedFuture for CommandResponse {
fn is_terminated(&self) -> bool {
// Cleanup is needed only if the future is dropped while `inner` is still Some, i.e. before
// its response was received.
impl Drop for CommandResponse {
fn drop(&mut self) {
if let Some(inner) = &self.inner {
// State shared between a Peer, its RequestStream, its responders and its pending
// CommandResponse futures.
struct PeerInner {
/// The signaling channel
signaling: fasync::Socket,
/// A map of transaction ids that have been sent but the response has not
/// been received and/or processed yet.
///
/// Waiters are added with `add_response_waiter` and get removed when they are
/// polled or they are removed with `remove_waiter`
response_waiters: Mutex<Slab<ResponseWaiter>>,
/// A queue of requests that have been received and are waiting to
/// be responded to, along with the waker for the task that has
/// taken the request receiver (if it exists)
incoming_requests: Mutex<RequestQueue>,
impl PeerInner {
/// Add a response waiter, and return an id that can be used to send the
/// transaction. Responses then can be received using poll_recv_response
fn add_response_waiter(&self) -> Result<TxLabel> {
let key = self
// The key doubles as the transaction label. The conversion fails when the key is not a
// valid TxLabel, which is logged as label exhaustion.
let id = TxLabel::try_from(key as u8);
if id.is_err() {
fx_log_warn!(tag: "avdtp", "Transaction IDs are exhausted");
/// When a waiter isn't interested in the response anymore, we need to just
/// throw it out. This is called when the response future is dropped.
fn remove_response_interest(&self, id: &TxLabel) {
let mut lock = self.response_waiters.lock();
let idx = usize::from(id);
// A waiter that already holds its response is handled by the first branch. One still
// waiting is marked Discard so that recv_all drops the response when it arrives.
if lock[idx].is_received() {
} else {
lock[idx] = ResponseWaiter::Discard;
// Attempts to receive a new request by processing all packets on the socket.
// Resolves to an unprocessed request (header, body) if one was received.
// Resolves to an error if there was an error reading from the socket or if the peer
// disconnected.
fn poll_recv_request(&self, lw: &LocalWaker) -> Poll<Result<UnparsedRequest>> {
// Drain the socket first so the queue reflects everything received so far.
let is_closed = self.recv_all(lw)?;
let mut lock = self.incoming_requests.lock();
if let Some(request) = lock.queue.pop_front() {
} else {
// Nothing queued: remember this task's waker so recv_all can wake it for the next request.
lock.listener = RequestListener::Some(lw.clone().into_waker());
// An empty queue is handled differently depending on whether the channel has closed.
if is_closed {
} else {
// Attempts to receive a response to a request by processing all packets on the socket.
// Resolves to the bytes in the response body if one was received.
// Resolves to an error if there was an error reading from the socket, if the peer
// disconnected, or if the |label| is not being waited on.
fn poll_recv_response(&self, label: &TxLabel, lw: &LocalWaker) -> Poll<Result<Vec<u8>>> {
// Drain the socket first so any response that has arrived is stored in its waiter slot.
let is_closed = self.recv_all(lw)?;
let mut waiters = self.response_waiters.lock();
let idx = usize::from(label);
// We expect() below because the label above came from an internally-created object,
// so the waiters should always exist in the map.
if waiters
.expect("Polled unregistered waiter")
// We got our response.
// Removing the entry takes the waiter out of the slab.
let buf = waiters.remove(idx).unwrap_received();
} else {
// Set the waker to be notified when a response shows up.
*waiters.get_mut(idx).expect("Polled unregistered waiter") =
// A missing response is handled differently depending on whether the channel has closed.
if is_closed {
} else {
/// Poll for any packets on the signaling socket
/// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite
/// if there was a problem communicating on the socket.
///
/// Commands are queued as UnparsedRequests for the request stream. Responses are stored in
/// the matching response waiter. Either kind wakes the task waiting on it.
fn recv_all(&self, lw: &LocalWaker) -> Result<bool> {
let mut buf = Vec::<u8>::new();
// Keep reading datagrams until the socket has nothing more (Pending) or reports closure.
loop {
let packet_size = match self.signaling.poll_datagram(&mut buf, lw) {
Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
fx_vlog!(tag: "avdtp", 1, "Signaling peer closed");
return Ok(true);
Poll::Ready(Err(e)) => return Err(Error::PeerRead(e)),
Poll::Pending => return Ok(false),
Poll::Ready(Ok(size)) => size,
if packet_size == 0 {
// Detects General Reject condition and sends the response back.
// On other headers with errors, sends BAD_HEADER to the peer
// and attempts to continue.
let header = match SignalingHeader::decode(buf.as_slice()) {
Err(Error::InvalidSignalId(label, id)) => {
self.send_general_reject(label, id)?;
// Drop the bytes of the packet that was just rejected.
buf = buf.split_off(packet_size);
Err(_) => {
// Only possible other return is OutOfRange
// Returned only when the packet is too small, can't make a meaningful reject.
fx_log_info!(tag: "avdtp", "received unrejectable message");
buf = buf.split_off(packet_size);
Ok(x) => Ok(x),
// Commands from the remote get translated into requests.
if header.is_command() {
let mut lock = self.incoming_requests.lock();
// Everything after the header is the request body.
let body = buf.split_off(header.encoded_len());
lock.queue.push_back(UnparsedRequest::new(header, body));
// Only a listener that has already polled has a waker to notify.
if let RequestListener::Some(ref waker) = lock.listener {
} else {
// Should be a response to a command we sent
let mut waiters = self.response_waiters.lock();
let idx = usize::from(&header.label());
// Leave exactly this packet in `buf`; anything after it is kept in `rest`.
let rest = buf.split_off(packet_size);
// Discard marks a waiter whose future was dropped before the response arrived.
if let Some(&ResponseWaiter::Discard) = waiters.get(idx) {
} else if let Some(entry) = waiters.get_mut(idx) {
// Store the whole packet in the waiter, then check whether a task was waiting on it.
let old_entry = mem::replace(entry, ResponseWaiter::Received(buf));
if let ResponseWaiter::Waiting(waker) = old_entry {
} else {
fx_vlog!(tag: "avdtp", 1, "response for {:?} we did not send, dropping", header.label());
buf = rest;
// Note: we drop any TxLabel response we are not waiting for
// Wakes up an arbitrary task that has begun polling on the channel so that
// it will call recv_all and be registered as the new channel reader.
fn wake_any(&self) {
// Try to wake up response waiters first, rather than the event listener.
// The event listener is a stream, and so could be between poll_nexts,
// Response waiters should always be actively polled once
// they've begun being polled on a task.
let lock = self.response_waiters.lock();
// Only waiters in the Waiting state hold a waker that can be used.
for (_, response_waiter) in lock.iter() {
if let ResponseWaiter::Waiting(waker) = response_waiter {
// Fall back to the request listener, if it has registered a waker.
let lock = self.incoming_requests.lock();
if let RequestListener::Some(waker) = &lock.listener {
// Build and send a General Reject message (Section 8.18)
fn send_general_reject(&self, label: TxLabel, invalid_signal_id: u8) -> Result<()> {
// Build the packet ourselves rather than make SignalingHeader build a packet with an
// invalid signal id.
// Byte 0: transaction label in the upper four bits, with 0x01 in the low bits (presumably
// the General Reject message type). Byte 1: the offending signal id, masked to six bits.
let packet: &[u8; 2] = &[u8::from(&label) << 4 | 0x01, invalid_signal_id & 0x3F];
// Sends a ResponseAccept for `signal` with transaction label `label`, carrying `params`.
fn send_response(&self, label: TxLabel, signal: SignalIdentifier, params: &[u8]) -> Result<()> {
let header = SignalingHeader::new(label, signal, SignalingMessageType::ResponseAccept);
// The packet is sized to hold the encoded header followed by the parameters.
let mut packet = vec![0 as u8; header.encoded_len() + params.len()];
// Sends a reject for `signal` whose only parameter is the error code byte.
fn send_reject(
&self, label: TxLabel, signal: SignalIdentifier, error_code: ErrorCode,
) -> Result<()> {
self.send_reject_params(label, signal, &[u8::from(&error_code)])
// Sends a ResponseReject for `signal` with transaction label `label`, carrying `params`.
fn send_reject_params(
&self, label: TxLabel, signal: SignalIdentifier, params: &[u8],
) -> Result<()> {
let header = SignalingHeader::new(label, signal, SignalingMessageType::ResponseReject);
// The packet is sized to hold the encoded header followed by the parameters.
let mut packet = vec![0 as u8; header.encoded_len() + params.len()];
// Writes an already-encoded signaling packet to the socket.
// A failed write is reported as Error::PeerWrite.
fn send_signal(&self, data: &[u8]) -> Result<()> {
.map_err(|x| Error::PeerWrite(x))?;