blob: 52dcbfd97ae7cb94511872924dc7d427c8009e87 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::log_error::log_error_discard_result;
use crate::service::ServiceEvent;
use crate::{Result, CHANNEL_BUFFER_SIZE};
use failure::ResultExt;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_mediasession::{
ControllerControlHandle, ControllerEvent, ControllerEventStream, ControllerMarker,
ControllerProxy, ControllerRequest, PlaybackState,
};
use fuchsia_async as fasync;
use futures::{
channel::mpsc::{channel, Receiver, Sender},
future, select, Future, FutureExt, SinkExt, StreamExt, TryFutureExt, TryStreamExt,
};
/// `Session` multiplexes the `fuchsia.mediasession.Controller` implementation of
/// a published media session.
pub struct Session {
id: u64,
event_broadcaster: EventBroadcaster,
request_forwarder: RequestForwarder,
service_event_sink: Sender<ServiceEvent>,
}
impl Session {
/// Creates a new `Session` which multiplexes `controller_proxy`.
///
/// `Session` should not be used after it sends a `SessionClosed`
/// `ServiceEvent`.
pub fn new(
id: u64, controller_proxy: ControllerProxy, service_event_sink: Sender<ServiceEvent>,
) -> Result<Self> {
let event_stream = controller_proxy.take_event_stream();
Ok(Self {
id,
event_broadcaster: EventBroadcaster::new(id, service_event_sink.clone(), event_stream),
request_forwarder: RequestForwarder::new(controller_proxy),
service_event_sink,
})
}
pub fn id(&self) -> u64 {
self.id
}
pub async fn serve(self, mut fidl_requests: Receiver<ServerEnd<ControllerMarker>>) {
let (forwarder_handle, forwarder_registration) = future::AbortHandle::new_pair();
let request_sink = self.request_forwarder.start(forwarder_registration);
let mut service_event_sink = self.service_event_sink;
let session_id = self.id;
let mut listener_sink = self.event_broadcaster.start(
async move {
forwarder_handle.abort();
trylog!(
await!(service_event_sink.send(ServiceEvent::SessionClosed(session_id)))
.context("Sending Session epitaph.")
);
},
);
// Connect all new clients to the request forwarding and event broadcasting tasks.
while let Some(new_client) = await!(fidl_requests.next()) {
let request_sink = request_sink.clone();
let (request_stream, control_handle) = trylog!(new_client
.into_stream_and_control_handle()
.context("Converting client to request stream."));
// Send the control handle to the event broadcasting task.
trylog!(await!(listener_sink.send(control_handle)));
// Forward the request stream to the request serving task.
fasync::spawn(
request_stream
.filter_map(|result| {
future::ready(match result {
Ok(event) => Some(Ok(event)),
Err(_) => None,
})
})
.forward(request_sink)
.map(log_error_discard_result),
);
}
}
}
/// Forwards requests from all proxied clients to the backing
/// `fuchsia.mediasession.Controller` implementation.
struct RequestForwarder {
controller_proxy: ControllerProxy,
}
impl RequestForwarder {
fn new(controller_proxy: ControllerProxy) -> Self {
Self { controller_proxy }
}
fn start(self, abort_registration: future::AbortRegistration) -> Sender<ControllerRequest> {
let (request_sink, request_stream) = channel(CHANNEL_BUFFER_SIZE);
let serve_fut = future::Abortable::new(self.serve(request_stream), abort_registration)
.unwrap_or_else(|_| {
// This is just an abort message; ignore it.
// This happens when the backing implementation disconnects and is not an error.
});
fasync::spawn(serve_fut);
request_sink
}
async fn serve(self, mut request_stream: Receiver<ControllerRequest>) {
while let Some(request) = await!(request_stream.next()) {
trylog!(self.serve_request_with_controller(request));
}
}
/// Forwards a single request to the `ControllerProxy`.
fn serve_request_with_controller(&self, request: ControllerRequest) -> Result<()> {
match request {
ControllerRequest::Play { .. } => self.controller_proxy.play()?,
ControllerRequest::Pause { .. } => self.controller_proxy.pause()?,
ControllerRequest::Stop { .. } => self.controller_proxy.stop()?,
ControllerRequest::SeekToPosition { position, .. } => {
self.controller_proxy.seek_to_position(position)?
}
ControllerRequest::SkipForward { skip_amount, .. } => {
self.controller_proxy.skip_forward(skip_amount)?
}
ControllerRequest::SkipReverse { skip_amount, .. } => {
self.controller_proxy.skip_reverse(skip_amount)?
}
ControllerRequest::NextItem { .. } => self.controller_proxy.next_item()?,
ControllerRequest::PrevItem { .. } => self.controller_proxy.prev_item()?,
ControllerRequest::SetPlaybackRate { playback_rate, .. } => {
self.controller_proxy.set_playback_rate(playback_rate)?
}
ControllerRequest::SetRepeatMode { repeat_mode, .. } => {
self.controller_proxy.set_repeat_mode(repeat_mode)?
}
ControllerRequest::SetShuffleMode { shuffle_on, .. } => {
self.controller_proxy.set_shuffle_mode(shuffle_on)?
}
ControllerRequest::ConnectToExtension {
extension, channel, ..
} => self
.controller_proxy
.connect_to_extension(&extension, channel)?,
};
Ok(())
}
}
/// Stores the most recent events sent by the backing
/// `fuchsia.mediasession.Controller` implementation which represent the state of
/// the media session.
#[derive(Default)]
struct SessionState {
metadata: Option<ControllerEvent>,
playback_capabilities: Option<ControllerEvent>,
playback_status: Option<ControllerEvent>,
}
impl SessionState {
fn new() -> Self {
Default::default()
}
/// Updates the stored state with the new `event`.
fn update(&mut self, event: ControllerEvent) {
let to_update = match &event {
ControllerEvent::OnPlaybackStatusChanged { .. } => &mut self.playback_status,
ControllerEvent::OnMetadataChanged { .. } => &mut self.metadata,
ControllerEvent::OnPlaybackCapabilitiesChanged { .. } => {
&mut self.playback_capabilities
}
};
*to_update = Some(event);
}
}
/// Broadcasts events from the backing `Controller` implementation to all proxied
/// clients and, if it is qualifying activity, to the Media Session service so it
/// can track active sessions.
struct EventBroadcaster {
id: u64,
service_event_sink: Sender<ServiceEvent>,
source: ControllerEventStream,
}
impl EventBroadcaster {
fn new(
id: u64, service_event_sink: Sender<ServiceEvent>, source: ControllerEventStream,
) -> Self {
Self {
id,
service_event_sink,
source,
}
}
fn start(
self, epitaph: impl Future<Output = ()> + Send + 'static,
) -> Sender<ControllerControlHandle> {
let (listener_sink, listener_stream) = channel(CHANNEL_BUFFER_SIZE);
fasync::spawn(
async move {
await!(self.serve(listener_stream));
await!(epitaph);
},
);
listener_sink
}
/// Continuously broadcasts events from the `source` to listeners
/// received over the `listener_stream`.
///
/// New listeners are pushed the most recent state from before their
/// connection.
///
/// Event listeners are dropped as their client ends disconnect.
async fn serve(mut self, mut listener_stream: Receiver<ControllerControlHandle>) {
let mut session_state = SessionState::new();
let mut listeners = Vec::new();
loop {
select! {
event = self.source.try_next() => {
let event = trylogbreak!(event);
match event {
Some(mut event) => {
if Self::event_is_an_active_playback_status(&event) {
trylogbreak!(await!(self.service_event_sink.send(ServiceEvent::SessionActivity(self.id))));
}
Self::broadcast_event(&mut event, &mut listeners);
session_state.update(event);
},
None => break,
};
}
listener = listener_stream.next() => {
match listener.map(|listener| {
Self::deliver_state(&mut session_state, &listener).map(|_| listener)
}) {
Some(Ok(listener)) => listeners.push(listener),
Some(Err(e)) => eprintln!("Failed to push state to new event listener: {}.", e),
_ => (),
}
}
complete => {
break
},
}
}
for listener in listeners.into_iter() {
listener.shutdown();
}
}
/// Broadcasts an event to all event listeners by control handle.
///
/// Only connected event listeners are retained.
///
/// This will not work for events which have handles as they can only be sent
/// once.
fn broadcast_event(event: &mut ControllerEvent, listeners: &mut Vec<ControllerControlHandle>) {
listeners.retain(|listener| Self::send_event(listener, event).is_ok());
}
/// Sends an event to a listener by a control handle.
fn send_event(listener: &ControllerControlHandle, event: &mut ControllerEvent) -> Result<()> {
match event {
ControllerEvent::OnPlaybackStatusChanged { playback_status } => {
listener.send_on_playback_status_changed(playback_status)
}
ControllerEvent::OnMetadataChanged { media_metadata } => {
listener.send_on_metadata_changed(media_metadata)
}
ControllerEvent::OnPlaybackCapabilitiesChanged {
playback_capabilities,
} => listener.send_on_playback_capabilities_changed(playback_capabilities),
}
.map_err(Into::into)
}
/// Delivers `state` to `listener`.
fn deliver_state(state: &mut SessionState, listener: &ControllerControlHandle) -> Result<()> {
[
&mut state.metadata,
&mut state.playback_capabilities,
&mut state.playback_status,
]
.iter_mut()
.filter_map(|update: &mut &mut Option<ControllerEvent>| (*update).as_mut())
.map(|update: &mut ControllerEvent| Self::send_event(&listener, update))
.collect()
}
fn event_is_an_active_playback_status(event: &ControllerEvent) -> bool {
match *event {
ControllerEvent::OnPlaybackStatusChanged {
ref playback_status,
} if playback_status.playback_state == PlaybackState::Playing => true,
_ => false,
}
}
}