blob: b1a0e3a9292ff6d50fbdcdb1e32fd9828c30c870 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// TODO(turnage): Remove file after migrating clients to sessions2.
use crate::{
fidl_clones::*, log_error::log_error_discard_result, mpmc,
state::active_session_queue::ActiveSessionQueue, state::session_list::SessionList, Ref, Result,
};
use fidl::encoding::OutOfLine;
use fidl::endpoints::*;
use fidl_fuchsia_media::Metadata;
use fidl_fuchsia_media_sessions::*;
use fuchsia_async as fasync;
use fuchsia_zircon as zx;
use futures::{future::try_join, Future, FutureExt, StreamExt, TryStreamExt};
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
rc::Rc,
};
#[derive(Clone, Debug)]
pub struct SessionRegistration {
pub id: Rc<zx::Event>,
pub koid: zx::Koid,
pub is_local: bool,
}
/// `Session` is the in-process proxy to a media session.
#[derive(Clone)]
pub struct Session {
proxy: Rc<SessionProxy>,
state: Ref<SessionState>,
events: mpmc::Receiver<Clonable<SessionEvent>>,
cancel_signal: mpmc::Receiver<()>,
}
#[derive(Debug, Clone)]
pub enum SessionCollectionEvent {
Added,
Removed,
}
impl Session {
pub async fn serve(
client_end: ClientEnd<SessionMarker>,
registration: SessionRegistration,
active_session_queue: Ref<ActiveSessionQueue>,
session_list: Ref<SessionList>,
collection_event_sink: mpmc::Sender<(SessionRegistration, SessionCollectionEvent)>,
active_session_sink: mpmc::Sender<Option<SessionRegistration>>,
) -> Result<Self> {
let proxy = client_end.into_proxy()?;
let mut event_stream = proxy.take_event_stream();
let event_sender = mpmc::Sender::default();
let event_receiver = event_sender.new_receiver();
let cancel_signaller = mpmc::Sender::with_buffer_size(1);
let cancel_signal = cancel_signaller.new_receiver();
let state = Ref::default();
let session = Session {
proxy: Rc::new(proxy),
events: event_receiver,
state: state.clone(),
cancel_signal,
};
fasync::spawn_local(async move {
while let Ok(Some(event)) = event_stream.try_next().await {
if is_active_status(&event) && registration.is_local {
let ref mut queue = active_session_queue.lock().await;
let active_session_changed = queue.promote_session(registration.clone());
if active_session_changed {
active_session_sink.send(queue.active_session()).await;
}
}
state.lock().await.deref_mut().update(&event);
event_sender.send(Clonable(event)).await;
}
session_list.lock().await.deref_mut().remove(registration.koid);
active_session_queue.lock().await.deref_mut().remove_session(&registration);
collection_event_sink
.send((registration.clone(), SessionCollectionEvent::Removed))
.await;
cancel_signaller.send(()).await;
});
Ok(session)
}
pub async fn connect(&self, server_end: ServerEnd<SessionMarker>) -> Result<()> {
let (request_stream, control_handle) = server_end.into_stream_and_control_handle()?;
let events_to_catch_up_client = self.state.lock().await.deref().events();
for event in events_to_catch_up_client {
if Self::send_event(&control_handle, &event).is_err() {
// Client is disconnected.
return Ok(());
}
}
fasync::spawn_local(
try_join(self.request_forwarder(request_stream), self.event_forwarder(control_handle))
.map(log_error_discard_result),
);
Ok(())
}
fn request_forwarder(
&self,
mut request_stream: SessionRequestStream,
) -> impl Future<Output = Result<()>> {
let proxy = self.proxy.clone();
let mut cancel_signal = self.cancel_signal.clone();
async move {
let mut cancel_signal = cancel_signal.next().fuse();
loop {
futures::select! {
request = request_stream.select_next_some() => {
Self::serve_request(proxy.deref(), request?).await?;
},
_cancel = cancel_signal => {
break;
}
}
}
Ok(())
}
}
fn event_forwarder(
&self,
control_handle: SessionControlHandle,
) -> impl Future<Output = Result<()>> {
let mut event_stream = self.events.clone();
let mut cancel_signal = self.cancel_signal.clone();
async move {
let mut cancel_signal = cancel_signal.next().fuse();
loop {
futures::select! {
Clonable(event) = event_stream.select_next_some() => {
Self::send_event(&control_handle, &event)?;
}
_cancel = cancel_signal => {
break;
}
}
}
Ok(())
}
}
fn send_event(control_handle: &SessionControlHandle, event: &SessionEvent) -> Result<()> {
Ok(match event {
SessionEvent::OnPlaybackStatusChanged { playback_status } => control_handle
.send_on_playback_status_changed(clone_playback_status(playback_status)),
SessionEvent::OnMetadataChanged { media_metadata } => {
control_handle.send_on_metadata_changed(&mut media_metadata.clone())
}
SessionEvent::OnPlaybackCapabilitiesChanged { playback_capabilities } => control_handle
.send_on_playback_capabilities_changed(clone_playback_capabilities(
playback_capabilities,
)),
SessionEvent::OnMediaImagesChanged { media_images } => {
let mut images: Vec<MediaImage> =
media_images.iter().map(clone_media_image).collect();
control_handle.send_on_media_images_changed(&mut images.iter_mut())
}
}?)
}
async fn serve_request(proxy: &SessionProxy, request: SessionRequest) -> Result<()> {
match request {
SessionRequest::Play { .. } => proxy.play()?,
SessionRequest::Pause { .. } => proxy.pause()?,
SessionRequest::Stop { .. } => proxy.stop()?,
SessionRequest::SeekToPosition { position, .. } => proxy.seek_to_position(position)?,
SessionRequest::SkipForward { skip_amount, .. } => proxy.skip_forward(skip_amount)?,
SessionRequest::SkipReverse { skip_amount, .. } => proxy.skip_reverse(skip_amount)?,
SessionRequest::NextItem { .. } => proxy.next_item()?,
SessionRequest::PrevItem { .. } => proxy.prev_item()?,
SessionRequest::SetPlaybackRate { playback_rate, .. } => {
proxy.set_playback_rate(playback_rate)?
}
SessionRequest::SetRepeatMode { repeat_mode, .. } => {
proxy.set_repeat_mode(repeat_mode)?
}
SessionRequest::SetShuffleMode { shuffle_on, .. } => {
proxy.set_shuffle_mode(shuffle_on)?
}
SessionRequest::BindGainControl { gain_control_request, .. } => {
proxy.bind_gain_control(gain_control_request)?
}
SessionRequest::ConnectToExtension { extension, channel, .. } => {
proxy.connect_to_extension(&extension, channel)?
}
SessionRequest::GetMediaImageBitmap {
url,
mut minimum_size,
mut desired_size,
responder,
} => {
let mut bitmap = proxy.get_media_image_bitmap(
&url,
&mut minimum_size,
&mut desired_size,
).await?;
let response = bitmap.as_mut().map(|b| OutOfLine(b.deref_mut()));
responder.send(response)?
}
};
Ok(())
}
}
/// `SessionState` keeps the last advertised state from each of a session's event
/// streams so that new clients can be caught up when they connect.
#[derive(Debug, Default)]
struct SessionState {
playback_status: Option<PlaybackStatus>,
playback_capabilities: Option<PlaybackCapabilities>,
media_metadata: Option<Metadata>,
media_images: HashMap<MediaImageType, MediaImage>,
}
impl Clone for SessionState {
fn clone(&self) -> Self {
Self {
playback_status: self.playback_status.as_ref().map(clone_playback_status),
playback_capabilities: self
.playback_capabilities
.as_ref()
.map(clone_playback_capabilities),
media_metadata: self.media_metadata.clone(),
media_images: self
.media_images
.iter()
.map(|(image_type, image)| (*image_type, clone_media_image(image)))
.collect(),
}
}
}
impl SessionState {
/// Returns the state with each field represented as an event.
pub fn events(&self) -> Vec<SessionEvent> {
let mut events = Vec::new();
if let Some(event) = self
.playback_status
.as_ref()
.map(clone_playback_status)
.map(|playback_status| SessionEvent::OnPlaybackStatusChanged { playback_status })
{
events.push(event);
}
if let Some(event) =
self.playback_capabilities.as_ref().map(clone_playback_capabilities).map(
|playback_capabilities| SessionEvent::OnPlaybackCapabilitiesChanged {
playback_capabilities,
},
)
{
events.push(event);
}
if let Some(event) = self
.media_metadata
.clone()
.map(|media_metadata| SessionEvent::OnMetadataChanged { media_metadata })
{
events.push(event);
}
// We don't want to send an empty list of media images on first connection.
if !self.media_images.is_empty() {
events.push(SessionEvent::OnMediaImagesChanged {
media_images: self.media_images.values().map(clone_media_image).collect(),
});
}
events
}
fn update(&mut self, event: &SessionEvent) {
match event {
SessionEvent::OnPlaybackStatusChanged { ref playback_status } => {
self.playback_status.get_or_insert_with(|| clone_playback_status(playback_status));
}
SessionEvent::OnMetadataChanged { ref media_metadata } => {
self.media_metadata.get_or_insert_with(|| media_metadata.clone());
}
SessionEvent::OnPlaybackCapabilitiesChanged { ref playback_capabilities } => {
self.playback_capabilities
.get_or_insert_with(|| clone_playback_capabilities(playback_capabilities));
}
SessionEvent::OnMediaImagesChanged { media_images } => {
for image in media_images {
self.media_images
.entry(image.image_type)
.or_insert_with(|| clone_media_image(image));
}
}
}
}
}
fn is_active_status(event: &SessionEvent) -> bool {
match event {
SessionEvent::OnPlaybackStatusChanged {
playback_status: PlaybackStatus { playback_state: Some(PlaybackState::Playing), .. },
..
} => true,
_ => false,
}
}