blob: be4482ce5ffa1e1709ff747d7dad91362ef7eda7 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::Error,
bt_a2dp::permits::{Permit, Permits},
core::pin::Pin,
core::task::{Context, Poll},
fidl::prelude::*,
fidl_fuchsia_bluetooth_internal_a2dp::{
ControllerRequest, ControllerRequestStream, ControllerSuspendResponder,
StreamSuspenderRequestStream,
},
fuchsia_async as fasync,
fuchsia_bluetooth::types::PeerId,
fuchsia_component::server::{ServiceFs, ServiceObj},
futures::select,
futures::stream::{SelectAll, Stream},
futures::StreamExt,
std::cell::RefCell,
std::sync::{Arc, Weak},
tracing::{info, trace, warn},
};
/// An interface for managing the state of streaming connections with remote peers.
pub trait StreamController {
/// Manages the lifetime of a stream suspend request. When the `Token` is dropped, the streaming
/// suspension may be lifted.
type Token: Send + Unpin;
/// Suspend the stream associated with `id` until the Token is dropped. If `id` is None,
/// all streams will be suspended.
fn suspend(&self, id: Option<PeerId>) -> Result<Self::Token, Error>;
}
/// Owns the set of streaming Permits used to suspend & release A2DP streams.
#[derive(Clone)]
pub struct PermitsManager {
permits: Permits,
held: RefCell<Weak<Vec<Permit>>>,
}
impl From<Permits> for PermitsManager {
fn from(permits: Permits) -> Self {
Self { permits, held: RefCell::new(Weak::new()) }
}
}
impl StreamController for PermitsManager {
type Token = Arc<Vec<Permit>>;
// TODO(https://fxbug.dev/42156985): Use id to actually grab only the permits owned by PeerId
fn suspend(&self, _id: Option<PeerId>) -> Result<Self::Token, Error> {
if let Some(token) = self.held.borrow().upgrade() {
return Ok(token);
}
let token = Arc::new(self.permits.seize());
let _ = self.held.replace(Arc::downgrade(&token));
Ok(token)
}
}
/// A Token representing a suspension on an A2DP media stream.
struct SuspendToken<S: Stream, SC: StreamController> {
/// The identifier associated with the stream. An `id` of None represents a global suspension
/// on all streams.
id: Option<PeerId>,
/// The stream representing the lifetime of the FIDL client's suspend request.
stream: S,
/// The responder used to notify the FIDL client that the suspend request has terminated.
///
/// This will be set as long as the `stream` is active and will be consumed on stream
/// termination.
responder: Option<ControllerSuspendResponder>,
/// The underlying `StreamController` token used to manage the lifetime of the A2DP streaming
/// hold.
_token: <SC as StreamController>::Token,
}
impl<S, SC> Stream for SuspendToken<S, SC>
where
S: Stream + Unpin,
SC: StreamController,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
}
impl<S: Stream, SC: StreamController> Drop for SuspendToken<S, SC> {
fn drop(&mut self) {
// The FIDL client no longer needs stream suspension. Discard the `token` representing the
// suspension and notify the client that it has been processed.
info!("a2dp.Controller client dropped suspend request for {:?}", self.id);
let _ = self.responder.take().expect("responder exists").send();
}
}
/// Handles an A2DP stream suspend `request`.
///
/// Uses the provided `controller` to suspend the media stream.
///
/// On success, returns a `SuspendToken` representing the suspension, or None if failure.
fn handle_suspend_request<SC: StreamController>(
request: ControllerRequest,
controller: &SC,
) -> Option<SuspendToken<StreamSuspenderRequestStream, SC>> {
let ControllerRequest::Suspend { peer_id, token, responder } = request;
info!("Received a2dp.Controller FIDL request to suspend for peer {:?}", peer_id);
let id = peer_id.map(|id| (*id).into());
let stream = match token.into_stream() {
Ok(st) => st,
Err(e) => {
if !e.is_closed() {
warn!("StreamSuspender channel closed with unexpected error: {:?}", e);
}
let _ = responder.send();
return None;
}
};
// Request to suspend the stream via the `controller`. The lifetime of the returned Token
// will be tied to that of the FIDL client's suspend `stream`.
let token = match controller.suspend(id) {
Ok(token) => token,
Err(e) => {
// If the suspend request couldn't be processed, close the `StreamSuspender` channel and
// notify the FIDL client.
trace!("Couldn't suspend stream for {:?}: {:?}", id, e);
let _ = stream.control_handle().shutdown_with_epitaph(fuchsia_zircon::Status::INTERNAL);
let _ = responder.send();
return None;
}
};
// Notify FIDL client that suspend was successful.
if let Err(e) = stream.control_handle().send_on_suspended() {
trace!("Couldn't notify client of stream suspension: {:?}", e);
}
Some(SuspendToken { id, stream, responder: Some(responder), _token: token })
}
/// Processes FIDL requests from the `stream`.
///
/// The lifetime of this task is tied to the provided `stream`. Each Suspend request
/// uses the `controller` to modify the A2DP streaming state with a given peer.
async fn handle_stream_controller_connection<SC: StreamController>(
mut stream: ControllerRequestStream,
controller: SC,
) {
info!("a2dp.Controller client connected");
let mut suspend_tokens: SelectAll<SuspendToken<StreamSuspenderRequestStream, SC>> =
SelectAll::new();
loop {
select! {
suspend_request = stream.select_next_some() => {
let request = match suspend_request {
Err(e) => {
info!("a2dp.Controller request error: {}. Exiting", e);
break;
}
Ok(r) => r,
};
if let Some(token) = handle_suspend_request(request, &controller) {
suspend_tokens.push(token);
}
}
// The `StreamSuspender` protocol contains no methods.
_ = suspend_tokens.select_next_some() => {},
complete => break,
}
}
info!("a2dp.Controller connection finished");
}
/// Defines a FIDL service implementation for the `a2dp.Controller` capability and spawns a
/// task to process incoming requests. Protocol requests will be handled by the task and
/// use the provided `controller` to manipulate A2DP streaming state.
pub fn add_stream_controller_capability<SC: StreamController + Clone + Send + 'static>(
fs: &mut ServiceFs<ServiceObj<'_, ()>>,
controller: SC,
) {
let _ = fs.dir("svc").add_fidl_service({
move |s| {
fasync::Task::spawn(handle_stream_controller_connection(s, controller.clone())).detach()
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::format_err;
use fidl::client::QueryResponseFut;
use fidl::endpoints::{create_proxy, create_proxy_and_stream};
use fidl_fuchsia_bluetooth_internal_a2dp::{
ControllerMarker, ControllerProxy, StreamSuspenderEvent, StreamSuspenderMarker,
StreamSuspenderProxy,
};
use fuchsia_sync::Mutex;
use futures::channel::mpsc;
#[derive(Debug, PartialEq)]
enum Event {
// The `controller` has received a Suspend request.
Suspend(Option<PeerId>),
// The `StreamController::Token` associated with the suspend request has been dropped.
TokenDropped,
}
// A token that notifies the Sender when it has been dropped.
struct MockToken(mpsc::Sender<Event>);
impl Drop for MockToken {
fn drop(&mut self) {
let _ = self.0.try_send(Event::TokenDropped).unwrap();
}
}
/// A mock implementation of a `StreamController` interface implementation that echos events to
/// the `sender`.
#[derive(Clone)]
struct MockStreamController {
sender: mpsc::Sender<Event>,
// Whether the `StreamController::suspend` request should error.
should_error: bool,
}
impl MockStreamController {
fn new(should_error: bool) -> (Self, mpsc::Receiver<Event>) {
let (s, r) = mpsc::channel(0);
(Self { sender: s, should_error }, r)
}
}
impl StreamController for MockStreamController {
type Token = MockToken;
fn suspend(&self, id: Option<PeerId>) -> Result<Self::Token, Error> {
let mut s = self.sender.clone();
let _ = s.try_send(Event::Suspend(id.clone())).unwrap();
if self.should_error {
Err(format_err!("Suspend error"))
} else {
Ok(MockToken(s.clone()))
}
}
}
fn setup_server_and_mock_controller(
should_error: bool,
) -> (fasync::Task<()>, ControllerProxy, mpsc::Receiver<Event>) {
let (c, s) = create_proxy_and_stream::<ControllerMarker>().unwrap();
let (controller, test_events) = MockStreamController::new(should_error);
let _server_task =
fasync::Task::local(handle_stream_controller_connection(s, controller.clone()));
(_server_task, c, test_events)
}
async fn expect_fidl_event_for_client(client: &StreamSuspenderProxy) {
let mut event_stream = client.take_event_stream();
match event_stream.next().await {
Some(Ok(StreamSuspenderEvent::OnSuspended { .. })) => {}
x => panic!("Expected ready with event but got: {:?}", x),
}
}
async fn expect_event(listener: &mut mpsc::Receiver<Event>, expected: Event) {
match listener.next().await {
Some(event) => assert_eq!(event, expected),
x => panic!("Expected ready with event but got: {:?}", x),
}
}
async fn expect_pending(listener: &mut mpsc::Receiver<Event>) {
assert!(futures::poll!(listener.next()).is_pending());
}
/// Makes a client suspend request. Returns the Future associated with the request,
/// and the client end of the suspend token (to be held as the stream suspend request).
fn make_suspend_request(
controller_svc: &ControllerProxy,
id: Option<PeerId>,
) -> (QueryResponseFut<()>, StreamSuspenderProxy) {
let (c, s) = create_proxy::<StreamSuspenderMarker>().unwrap();
let peer_id: Option<fidl_fuchsia_bluetooth::PeerId> = id.map(|id| id.into());
let fidl_req = controller_svc
.suspend(peer_id.as_ref(), s)
.check()
.expect("FIDL suspend request should succeed");
(fidl_req, c)
}
#[fuchsia::test]
async fn suspend_and_release_is_handled_by_server() {
let (mut server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Initially, no events from server.
expect_pending(&mut test_events).await;
// Client wants to suspend stream for some peer.
let remote_id = PeerId(9);
let (fidl_request, client_token) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect it to be processed by the `server` and sent to the mock `controller`.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
// Also expect a FIDL event notifying the client that the stream was suspended.
expect_fidl_event_for_client(&client_token).await;
// Client no longer needs suspension - the server should detect this and drop the
// `StreamController::Token` it is holding for this client.
drop(client_token);
expect_event(&mut test_events, Event::TokenDropped).await;
// Async suspend request should resolve.
fidl_request.await.expect("FIDL suspend request should resolve");
// Even though there are no more tokens, the server task should still be active since
// the FIDL client still holds the `controller_svc` handle.
assert!(futures::poll!(&mut server).is_pending());
}
#[fuchsia::test]
async fn concurrent_suspend_requests_generate_two_suspend_events() {
let (_server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Client wants to suspend stream for some peer.
let remote_id = PeerId(13);
let (fidl_request1, client_token1) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect mock `controller` to receive the request + the client notified via a FIDL event.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token1).await;
// Another request to suspend the stream for the same peer.
let (fidl_request2, client_token2) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect mock `controller` to receive the request + the client notified via a FIDL event.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token2).await;
// Client #1 is done, doesn't need suspension - expect the server to drop the
// `StreamController::Token` associated with the request.
drop(client_token1);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request1.await.expect("FIDL suspend request should resolve");
// The other Token should be untouched - no other events.
expect_pending(&mut test_events).await;
// Client #2 is done - expect TokenDropped and suspend FIDL call to resolve.
drop(client_token2);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request2.await.expect("FIDL suspend request should resolve");
}
#[fuchsia::test]
async fn sequential_suspend_requests_generate_two_suspend_events() {
let (_server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Client wants to suspend stream for some peer.
let remote_id = PeerId(16);
let (fidl_request1, client_token1) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect mock `controller` to receive the request + the client notified via a FIDL event.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token1).await;
// Request #1 is done - Token should be dropped.
drop(client_token1);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request1.await.expect("FIDL suspend request should resolve");
// Another request to suspend the stream for the same peer is OK. Should generate events.
let (fidl_request2, client_token2) = make_suspend_request(&controller_svc, Some(remote_id));
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token2).await;
// Request #2 is done - Token should be dropped.
drop(client_token2);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request2.await.expect("FIDL suspend request should resolve");
}
#[fuchsia::test]
async fn events_generated_with_both_global_and_specific_stream_suspension() {
let (_server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Client wants a global suspension.
let (fidl_request1, client_token1) = make_suspend_request(&controller_svc, None);
// Expect mock `controller` to receive the request + the client notified via a FIDL event.
expect_event(&mut test_events, Event::Suspend(None)).await;
expect_fidl_event_for_client(&client_token1).await;
// Client wants a specific peer suspension.
let remote_id = PeerId(567);
let (fidl_request2, client_token2) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect mock `controller` to receive the request + the client notified via a FIDL event.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token2).await;
// Client request #2 is done - expect the Token dropping event.
drop(client_token2);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request2.await.expect("FIDL suspend request should resolve");
// Request #1 is done - expect the Token dropping event.
drop(client_token1);
expect_event(&mut test_events, Event::TokenDropped).await;
fidl_request1.await.expect("FIDL suspend request should resolve");
}
async fn expect_fidl_error_for_client(client: &StreamSuspenderProxy) {
let mut event_stream = client.take_event_stream();
match event_stream.next().await {
Some(Err(fidl::Error::ClientChannelClosed {
status: fuchsia_zircon::Status::INTERNAL,
..
})) => {}
x => panic!("Expected ready with INTERNAL error but got: {:?}", x),
}
}
#[fuchsia::test]
async fn stream_controller_suspend_error_closes_channel() {
let (_server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ true);
// Client wants to suspend stream for some peer.
let remote_id = PeerId(55);
let (fidl_request, client_token) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect mock `controller` to receive the suspend request, but it errors so the
// `client_token`protocol should close with zx::Status::INTERNAL. The FIDL call should
// also terminate.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_error_for_client(&client_token).await;
fidl_request.await.expect("FIDL suspend request should resolve");
}
/// Verifies that a token that is already closed is processed by the task and immediately
/// results in stream suspending + lifting.
#[fuchsia::test]
async fn closed_suspend_token_is_correctly_handled() {
let (_server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Client wants to suspend stream for some peer but closes token before sending the request.
let remote_id = PeerId(11);
let (c, s) = create_proxy::<StreamSuspenderMarker>().unwrap();
drop(c);
let fidl_request = controller_svc
.suspend(Some(remote_id.into()).as_ref(), s)
.check()
.expect("FIDL suspend request should succeed");
// Expect the suspend to be processed by the `server` and sent to the mock `controller`.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
// The server should immediately recognize that the token was dropped and should release.
expect_event(&mut test_events, Event::TokenDropped).await;
// Async suspend request should resolve.
fidl_request.await.expect("FIDL suspend request should resolve");
}
#[fuchsia::test]
async fn stream_controller_connection_handler_exits_when_exhausted() {
let (mut server, controller_svc, mut test_events) =
setup_server_and_mock_controller(/* should_error */ false);
// Client suspends for some peer.
let remote_id = PeerId(19);
let (fidl_request, client_token) = make_suspend_request(&controller_svc, Some(remote_id));
// Expect it to be processed by the `server` and sent to the mock `controller` and a FIDL
// event generated.
expect_event(&mut test_events, Event::Suspend(Some(remote_id))).await;
expect_fidl_event_for_client(&client_token).await;
// Client disconnects the Controller protocol - the server task should still be active
// since there are outstanding tokens to be processed.
drop(controller_svc);
assert!(futures::poll!(&mut server).is_pending());
// Client drops suspend token - the server should detect this and drop the
// `StreamController::Token` it is holding for this client.
drop(client_token);
expect_event(&mut test_events, Event::TokenDropped).await;
// Async suspend request should resolve.
fidl_request.await.expect("FIDL suspend request should resolve");
// The server task should then complete since both the `Controller` request stream and outstanding
// tokens have been exhausted.
server.await;
}
#[fuchsia::test]
fn permit_manager_seizes_permits() {
const TOTAL_PERMITS: usize = 2;
let permits = Permits::new(TOTAL_PERMITS);
let manager = PermitsManager::from(permits.clone());
let permit_holder = Arc::new(Mutex::new(None));
let revoke_from_holder_fn = {
// The permit we have holds a reference to this, so a strong ref to the holder
// here means dropping the holder doesn't drop the permit. Use a weak ref instead.
let holder = Arc::downgrade(&permit_holder);
move || {
holder
.upgrade()
.expect("holder must exist")
.lock()
.take()
.expect("should be holding Permit")
}
};
let permit = permits.get_revokable(revoke_from_holder_fn.clone());
*permit_holder.lock() = Some(permit.expect("permit"));
let token = manager.suspend(None).expect("suspend permits");
// Can't get a permit, token holds them all, and took the one we had.
assert!(permits.get_revokable(revoke_from_holder_fn.clone()).is_none());
assert!(permit_holder.lock().is_none(), "permit should have been revoked");
let token2 = manager.suspend(None).expect("second suspension");
drop(token);
// Can't get a permit, because token2 is still holding a token suspending everything.
assert!(permits.get_revokable(revoke_from_holder_fn.clone()).is_none());
// Can get another permit after the second client drops.
drop(token2);
let permit = permits.get_revokable(revoke_from_holder_fn).expect("permit");
*permit_holder.lock() = Some(permit);
drop(permit_holder);
drop(manager);
drop(permits);
}
}