| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::{Result, SessionId}; |
| use anyhow::Context as _; |
| use assert_matches::assert_matches; |
| use diagnostics_reader::{ArchiveReader, ComponentSelector, Inspect}; |
| use fidl::endpoints::{create_endpoints, create_proxy, create_request_stream}; |
| use fidl_fuchsia_diagnostics::*; |
| use fidl_fuchsia_logger::LogSinkMarker; |
| use fidl_fuchsia_media::*; |
| use fidl_fuchsia_media_sessions2::*; |
| use fuchsia_async as fasync; |
| use fuchsia_component::{client, server::*}; |
| use fuchsia_component_test::{ |
| Capability, ChildOptions, LocalComponentHandles, RealmBuilder, RealmInstance, Ref, Route, |
| }; |
| use fuchsia_inspect as inspect; |
| use futures::{ |
| channel::mpsc, |
| sink::SinkExt, |
| stream::{StreamExt, TryStreamExt}, |
| }; |
| use std::collections::HashMap; |
| |
| const MEDIASESSION_URL: &str = "#meta/mediasession.cm"; |
| const MEDIASESSION_NAME: &str = "mediasession"; |
| |
| struct TestService { |
| #[allow(unused)] |
| realm: RealmInstance, |
| publisher: PublisherProxy, |
| discovery: DiscoveryProxy, |
| archive: ArchiveAccessorProxy, |
| observer_discovery: ObserverDiscoveryProxy, |
| new_usage_watchers: mpsc::Receiver<(AudioRenderUsage, UsageWatcherProxy)>, |
| usage_watchers: HashMap<AudioRenderUsage, UsageWatcherProxy>, |
| } |
| |
| impl TestService { |
| async fn new() -> Result<Self> { |
| let builder = RealmBuilder::new().await.unwrap(); |
| let mediasession = builder |
| .add_child(MEDIASESSION_NAME, MEDIASESSION_URL, ChildOptions::new()) |
| .await |
| .unwrap(); |
| let (new_usage_watchers_sink, new_usage_watchers) = mpsc::channel(10); |
| let usage_reporter = builder |
| .add_local_child( |
| "usage_reporter", |
| move |handles: LocalComponentHandles| { |
| let new_usage_watchers_sink = new_usage_watchers_sink.clone(); |
| Box::pin(Self::usage_reporter_mock(handles, new_usage_watchers_sink)) |
| }, |
| ChildOptions::new(), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<LogSinkMarker>()) |
| .from(Ref::parent()) |
| .to(&mediasession), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<PublisherMarker>()) |
| .from(&mediasession) |
| .to(Ref::parent()), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<DiscoveryMarker>()) |
| .from(&mediasession) |
| .to(Ref::parent()), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<ObserverDiscoveryMarker>()) |
| .from(&mediasession) |
| .to(Ref::parent()), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<ActiveSessionMarker>()) |
| .from(&mediasession) |
| .to(Ref::parent()), |
| ) |
| .await |
| .unwrap(); |
| builder |
| .add_route( |
| Route::new() |
| .capability(Capability::protocol::<UsageReporterMarker>()) |
| .from(&usage_reporter) |
| .to(&mediasession), |
| ) |
| .await?; |
| |
| let realm = builder.build().await.unwrap(); |
| |
| let publisher = realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<PublisherMarker>() |
| .context("Connecting to Publisher")?; |
| let discovery = realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<DiscoveryMarker>() |
| .context("Connecting to Discovery")?; |
| let observer_discovery = realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<ObserverDiscoveryMarker>() |
| .context("Connecting to ObserverDiscovery")?; |
| let archive = client::connect_to_protocol::<ArchiveAccessorMarker>() |
| .context("Connecting to ArchiveAccessor")?; |
| |
| Ok(Self { |
| realm, |
| publisher, |
| discovery, |
| archive, |
| observer_discovery, |
| new_usage_watchers, |
| usage_watchers: HashMap::new(), |
| }) |
| } |
| |
| async fn usage_reporter_mock( |
| handles: LocalComponentHandles, |
| new_usage_watchers_sink: mpsc::Sender<(AudioRenderUsage, UsageWatcherProxy)>, |
| ) -> Result<()> { |
| let mut fs = ServiceFs::new(); |
| let mut tasks = vec![]; |
| |
| fs.dir("svc").add_fidl_service::<_, UsageReporterRequestStream>( |
| move |mut request_stream| { |
| let mut new_usage_watchers_sink = new_usage_watchers_sink.clone(); |
| tasks.push(fasync::Task::local(async move { |
| while let Some(Ok(UsageReporterRequest::Watch { |
| usage, usage_watcher, .. |
| })) = request_stream.next().await |
| { |
| match (usage, usage_watcher.into_proxy()) { |
| (Usage::RenderUsage(usage), Ok(usage_watcher)) => { |
| new_usage_watchers_sink |
| .send((usage, usage_watcher)) |
| .await |
| .expect("Forwarding new UsageWatcher from service under test"); |
| } |
| (_, Ok(_)) => { |
| println!("Service under test tried to watch a capture usage") |
| } |
| (_, Err(e)) => println!("Service under test sent bad request: {:?}", e), |
| } |
| } |
| })); |
| }, |
| ); |
| |
| fs.serve_connection(handles.outgoing_dir)?; |
| fs.collect::<()>().await; |
| |
| Ok(()) |
| } |
| |
| fn new_watcher(&self, watch_options: WatchOptions) -> Result<TestWatcher> { |
| let (watcher_client, watcher_server) = create_endpoints(); |
| self.discovery.watch_sessions(&watch_options, watcher_client)?; |
| Ok(TestWatcher { |
| watcher: watcher_server.into_stream().context("Turning watcher into stream")?, |
| }) |
| } |
| |
| fn new_observer_watcher(&self, watch_options: WatchOptions) -> Result<TestWatcher> { |
| let (watcher_client, watcher_server) = create_endpoints(); |
| self.observer_discovery.watch_sessions(&watch_options, watcher_client)?; |
| Ok(TestWatcher { |
| watcher: watcher_server |
| .into_stream() |
| .context("Turning observer watcher into stream")?, |
| }) |
| } |
| |
| async fn dequeue_watcher(&mut self) { |
| if let Some((usage, watcher)) = self.new_usage_watchers.next().await { |
| self.usage_watchers.insert(usage, watcher); |
| } else { |
| panic!("Watcher channel closed.") |
| } |
| } |
| |
| async fn start_interruption(&mut self, usage: AudioRenderUsage) { |
| if let Some(watcher) = self.usage_watchers.get(&usage) { |
| watcher |
| .on_state_changed( |
| &Usage::RenderUsage(usage), |
| &UsageState::Muted(UsageStateMuted::default()), |
| ) |
| .await |
| .expect("Sending interruption start to service under test"); |
| } else { |
| panic!("Can't start interruption; no watcher is registered for usage {:?}", usage) |
| } |
| } |
| |
| async fn stop_interruption(&mut self, usage: AudioRenderUsage) { |
| if let Some(watcher) = self.usage_watchers.get(&usage) { |
| watcher |
| .on_state_changed( |
| &Usage::RenderUsage(usage), |
| &UsageState::Unadjusted(UsageStateUnadjusted::default()), |
| ) |
| .await |
| .expect("Sending interruption stop to service under test"); |
| } else { |
| panic!("Can't stop interruption; no watcher is registered for usage {:?}", usage) |
| } |
| } |
| |
| async fn inspect_tree(&mut self) -> inspect::hierarchy::DiagnosticsHierarchy { |
| ArchiveReader::new() |
| .with_archive(self.archive.clone()) |
| .add_selector(ComponentSelector::new(vec![format!( |
| "realm_builder\\:{}/{}", |
| self.realm.root.child_name(), |
| MEDIASESSION_NAME, |
| )])) |
| .snapshot::<Inspect>() |
| .await |
| .expect("Got batch") |
| .into_iter() |
| .next() |
| .and_then(|result| result.payload) |
| .expect("Got payload") |
| } |
| } |
| |
| struct TestWatcher { |
| watcher: SessionsWatcherRequestStream, |
| } |
| |
| impl TestWatcher { |
| async fn wait_for_n_updates(&mut self, n: usize) -> Result<Vec<(SessionId, SessionInfoDelta)>> { |
| let mut updates: Vec<(SessionId, SessionInfoDelta)> = vec![]; |
| for i in 0..n { |
| let (id, delta, responder) = self |
| .watcher |
| .try_next() |
| .await? |
| .and_then(|r| r.into_session_updated()) |
| .with_context(|| format!("Unwrapping watcher request {:?}", i))?; |
| responder.send().with_context(|| format!("Sending ack for watcher request {:?}", i))?; |
| updates.push((id, delta)); |
| } |
| Ok(updates) |
| } |
| |
| async fn wait_for_removal(&mut self) -> Result<SessionId> { |
| let (id, responder) = self |
| .watcher |
| .try_next() |
| .await? |
| .and_then(|r| r.into_session_removed()) |
| .context("Unwrapping watcher request for awaited removal")?; |
| responder.send().context("Sending ack for removal")?; |
| Ok(id) |
| } |
| } |
| |
| struct TestPlayer { |
| requests: PlayerRequestStream, |
| id: SessionId, |
| } |
| |
| impl TestPlayer { |
| async fn new(service: &TestService) -> Result<Self> { |
| let (player_client, requests) = |
| create_request_stream().context("Creating player request stream")?; |
| let id = service |
| .publisher |
| .publish( |
| player_client, |
| &PlayerRegistration { domain: Some(test_domain()), ..Default::default() }, |
| ) |
| .await |
| .context("Registering new player")?; |
| Ok(Self { requests, id }) |
| } |
| |
| async fn emit_delta(&mut self, delta: PlayerInfoDelta) -> Result<()> { |
| match self.requests.try_next().await? { |
| Some(PlayerRequest::WatchInfoChange { responder }) => responder.send(&delta)?, |
| _ => { |
| return Err(anyhow::anyhow!("Expected status change request.")); |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| async fn wait_for_request(&mut self, predicate: impl Fn(PlayerRequest) -> bool) -> Result<()> { |
| while let Some(request) = self.requests.try_next().await? { |
| if predicate(request) { |
| return Ok(()); |
| } |
| } |
| Err(anyhow::anyhow!("Did not receive request that matched predicate.")) |
| } |
| } |
| |
| fn test_domain() -> String { |
| String::from("domain://TEST") |
| } |
| |
| fn delta_with_state(state: PlayerState) -> PlayerInfoDelta { |
| PlayerInfoDelta { |
| player_status: Some(PlayerStatus { |
| player_state: Some(state), |
| repeat_mode: Some(RepeatMode::Off), |
| shuffle_on: Some(false), |
| content_type: Some(ContentType::Audio), |
| ..Default::default() |
| }), |
| ..Default::default() |
| } |
| } |
| |
| fn local_delta_with_state(state: PlayerState) -> PlayerInfoDelta { |
| let mut delta = delta_with_state(state); |
| delta.local = Some(true); |
| delta |
| } |
| |
| fn remote_delta_with_state(state: PlayerState) -> PlayerInfoDelta { |
| let mut delta = delta_with_state(state); |
| delta.local = Some(false); |
| delta |
| } |
| |
| fn delta_with_interruption( |
| state: PlayerState, |
| interruption_behavior: InterruptionBehavior, |
| ) -> PlayerInfoDelta { |
| let mut delta = delta_with_state(state); |
| delta.interruption_behavior = Some(interruption_behavior); |
| delta |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn can_publish_players() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let mut sessions = watcher.wait_for_n_updates(1).await?; |
| |
| let (_id, delta) = sessions.remove(0); |
| assert_eq!(delta.domain, Some(test_domain())); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn can_receive_deltas() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| player2.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _ = watcher.wait_for_n_updates(2).await?; |
| |
| player2 |
| .emit_delta(PlayerInfoDelta { |
| player_capabilities: Some(PlayerCapabilities { |
| flags: Some(PlayerCapabilityFlags::PLAY), |
| ..Default::default() |
| }), |
| ..Default::default() |
| }) |
| .await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_id, delta) = updates.remove(0); |
| assert_eq!( |
| delta.player_capabilities, |
| Some(PlayerCapabilities { flags: Some(PlayerCapabilityFlags::PLAY), ..Default::default() }) |
| ); |
| |
| player1 |
| .emit_delta(PlayerInfoDelta { |
| player_capabilities: Some(PlayerCapabilities { |
| flags: Some(PlayerCapabilityFlags::PAUSE), |
| ..Default::default() |
| }), |
| ..Default::default() |
| }) |
| .await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_id, delta) = updates.remove(0); |
| assert_eq!( |
| delta.player_capabilities, |
| Some(PlayerCapabilities { |
| flags: Some(PlayerCapabilityFlags::PAUSE), |
| ..Default::default() |
| }) |
| ); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn active_status() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Idle)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_, delta) = updates.remove(0); |
| assert_eq!( |
| delta.is_locally_active, |
| Some(true), |
| "Expected unknown locality playing state to be locally active." |
| ); |
| |
| player.emit_delta(delta_with_state(PlayerState::Paused)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_, delta) = updates.remove(0); |
| assert_eq!( |
| delta.is_locally_active, |
| Some(false), |
| "Expected unknown locality paused state not to be locally active." |
| ); |
| |
| player.emit_delta(local_delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_, delta) = updates.remove(0); |
| assert_eq!( |
| delta.is_locally_active, |
| Some(true), |
| "Expected local playing state to be locally active." |
| ); |
| |
| player.emit_delta(remote_delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_, delta) = updates.remove(0); |
| assert_eq!( |
| delta.is_locally_active, |
| Some(false), |
| "Expected remote playing state not to be locally active." |
| ); |
| |
| player.emit_delta(local_delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (_, delta) = updates.remove(0); |
| assert_eq!( |
| delta.is_locally_active, |
| Some(true), |
| "Expected local playing state to be locally active." |
| ); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_controls_are_proxied() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (id, _) = updates.remove(0); |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request = player.requests.try_next().await?; |
| |
| let (session_client, session_server) = create_endpoints(); |
| let session: SessionControlProxy = session_client.into_proxy()?; |
| session.play()?; |
| service.discovery.connect_to_session(id, session_server)?; |
| |
| player |
| .wait_for_request(|request| match request { |
| PlayerRequest::Play { .. } => true, |
| _ => false, |
| }) |
| .await?; |
| |
| let (_volume_client, volume_server) = create_endpoints(); |
| session.bind_volume_control(volume_server)?; |
| player |
| .wait_for_request(|request| match request { |
| PlayerRequest::BindVolumeControl { .. } => true, |
| _ => false, |
| }) |
| .await |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_disconnection_propagates() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let mut updates = watcher.wait_for_n_updates(1).await?; |
| let (id, _) = updates.remove(0); |
| |
| let (session_client, session_server) = create_endpoints(); |
| let session: SessionControlProxy = session_client.into_proxy()?; |
| service.discovery.connect_to_session(id, session_server)?; |
| |
| drop(player); |
| watcher.wait_for_removal().await?; |
| let mut session_events = session.take_event_stream(); |
| while let Some(_) = session_events.next().await {} |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn watch_filter_active() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| let _player3 = TestPlayer::new(&service).await?; |
| let mut active_watcher = |
| service.new_watcher(WatchOptions { only_active: Some(true), ..Default::default() })?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let updates = active_watcher.wait_for_n_updates(1).await?; |
| assert_eq!(updates.len(), 1); |
| assert_eq!(updates[0].1.is_locally_active, Some(true), "Update: {:?}", updates[0]); |
| let player1_id = updates[0].0; |
| |
| player2.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let updates = active_watcher.wait_for_n_updates(1).await?; |
| assert_eq!(updates.len(), 1); |
| assert_eq!(updates[0].1.is_locally_active, Some(true), "Update: {:?}", updates[1]); |
| |
| player1.emit_delta(delta_with_state(PlayerState::Paused)).await?; |
| assert_eq!(active_watcher.wait_for_removal().await?, player1_id); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn disconnected_player_results_in_removal_event() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _updates = watcher.wait_for_n_updates(1).await?; |
| |
| let expected_id = player1.id; |
| drop(player1); |
| let removed_id = watcher.wait_for_removal().await?; |
| assert_eq!(removed_id, expected_id); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_status() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| |
| let expected_player_status = || PlayerStatus { |
| duration: Some(11), |
| is_live: Some(true), |
| player_state: Some(PlayerState::Playing), |
| timeline_function: Some(TimelineFunction { |
| subject_time: 0, |
| reference_time: 10, |
| subject_delta: 1, |
| reference_delta: 1, |
| }), |
| repeat_mode: Some(RepeatMode::Group), |
| shuffle_on: Some(true), |
| content_type: Some(ContentType::Movie), |
| error: Some(Error::Other), |
| ..Default::default() |
| }; |
| |
| player |
| .emit_delta(PlayerInfoDelta { |
| player_status: Some(expected_player_status()), |
| ..Default::default() |
| }) |
| .await?; |
| |
| let (session, session_request) = create_proxy()?; |
| service.discovery.connect_to_session(player.id, session_request)?; |
| let status = session.watch_status().await.expect("Watching player status"); |
| let actual_player_status = status.player_status.expect("Unwrapping player status"); |
| |
| assert_eq!(actual_player_status, expected_player_status()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_capabilities() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| |
| let expected_player_capabilities = || PlayerCapabilities { |
| flags: Some(PlayerCapabilityFlags::PAUSE | PlayerCapabilityFlags::SKIP_FORWARD), |
| ..Default::default() |
| }; |
| |
| player |
| .emit_delta(PlayerInfoDelta { |
| player_capabilities: Some(expected_player_capabilities()), |
| ..Default::default() |
| }) |
| .await?; |
| |
| let (session, session_request) = create_proxy()?; |
| service.discovery.connect_to_session(player.id, session_request)?; |
| let status = session.watch_status().await.expect("Watching player capabilities"); |
| let actual_player_capabilities = |
| status.player_capabilities.expect("Unwrapping player capabilities"); |
| |
| assert_eq!(actual_player_capabilities, expected_player_capabilities()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn media_images() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| |
| let expected_media_images = || { |
| vec![ |
| MediaImage { |
| image_type: Some(MediaImageType::SourceIcon), |
| sizes: Some(vec![ImageSizeVariant { |
| url: String::from("http://url1"), |
| width: 10, |
| height: 10, |
| }]), |
| ..Default::default() |
| }, |
| MediaImage { |
| image_type: Some(MediaImageType::Artwork), |
| sizes: Some(vec![ImageSizeVariant { |
| url: String::from("http://url1"), |
| width: 10, |
| height: 10, |
| }]), |
| ..Default::default() |
| }, |
| ] |
| }; |
| |
| player |
| .emit_delta(PlayerInfoDelta { |
| media_images: Some(expected_media_images()), |
| ..Default::default() |
| }) |
| .await?; |
| |
| let (session, session_request) = create_proxy()?; |
| service.discovery.connect_to_session(player.id, session_request)?; |
| let status = session.watch_status().await.expect("Watching media images"); |
| let actual_media_images = status.media_images.expect("Unwrapping media images"); |
| |
| assert_eq!(actual_media_images, expected_media_images()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn players_get_ids() -> Result<()> { |
| let service = TestService::new().await?; |
| |
| let player1 = TestPlayer::new(&service).await?; |
| let player2 = TestPlayer::new(&service).await?; |
| |
| assert_ne!(player1.id, player2.id); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn session_controllers_can_watch_session_status() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| |
| let (session1, session1_request) = create_proxy()?; |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _updates = watcher.wait_for_n_updates(1).await?; |
| |
| service.discovery.connect_to_session(player1.id, session1_request)?; |
| let status1 = session1.watch_status().await.context("Watching session status (1st time)")?; |
| assert_matches!( |
| status1.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Playing), .. }) |
| ); |
| |
| player2.emit_delta(delta_with_state(PlayerState::Buffering)).await?; |
| player1.emit_delta(delta_with_state(PlayerState::Paused)).await?; |
| let _updates = watcher.wait_for_n_updates(2).await?; |
| let status1 = session1.watch_status().await.context("Watching session status (2nd time)")?; |
| assert_matches!( |
| status1.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Paused), .. }) |
| ); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn session_observers_can_watch_session_status() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut watcher = service.new_observer_watcher(Default::default())?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _updates = watcher.wait_for_n_updates(1).await?; |
| |
| let (session1, session1_request) = create_proxy()?; |
| service.observer_discovery.connect_to_session(player1.id, session1_request)?; |
| let status1 = session1.watch_status().await.context("Watching session status (1st time)")?; |
| assert_matches!( |
| status1.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Playing), .. }) |
| ); |
| |
| player2.emit_delta(delta_with_state(PlayerState::Buffering)).await?; |
| player1.emit_delta(delta_with_state(PlayerState::Paused)).await?; |
| let _updates = watcher.wait_for_n_updates(2).await?; |
| let status1 = session1.watch_status().await.context("Watching session status (2nd time)")?; |
| assert_matches!( |
| status1.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Paused), .. }) |
| ); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_disconnection_disconects_observers() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut watcher = service.new_observer_watcher(Default::default())?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _updates = watcher.wait_for_n_updates(1).await?; |
| |
| let (session, session_request) = create_proxy()?; |
| service.observer_discovery.connect_to_session(player.id, session_request)?; |
| assert!(session.watch_status().await.is_ok()); |
| |
| drop(player); |
| while let Ok(_) = session.watch_status().await {} |
| |
| // Passes by terminating, indicating the observer is disconnected. |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn observers_caught_up_with_state_of_session() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut watcher = service.new_observer_watcher(Default::default())?; |
| |
| let mut player = TestPlayer::new(&service).await?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _updates = watcher.wait_for_n_updates(1).await?; |
| |
| let (session1, session1_request) = create_proxy()?; |
| service.observer_discovery.connect_to_session(player.id, session1_request)?; |
| let status1 = session1.watch_status().await.context("Watching session status (1st time)")?; |
| assert_matches!( |
| status1.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Playing), .. }) |
| ); |
| |
| let (session2, session2_request) = create_proxy()?; |
| service.observer_discovery.connect_to_session(player.id, session2_request)?; |
| let status2 = session2.watch_status().await.context("Watching session status (2nd time)")?; |
| assert_matches!( |
| status2.player_status, |
| Some(PlayerStatus { player_state: Some(PlayerState::Playing), .. }) |
| ); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_is_interrupted() -> Result<()> { |
| let mut service = TestService::new().await?; |
| let mut player = TestPlayer::new(&service).await?; |
| |
| player |
| .emit_delta(delta_with_interruption(PlayerState::Playing, InterruptionBehavior::Pause)) |
| .await?; |
| service.dequeue_watcher().await; |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request = player.requests.try_next().await?; |
| |
| service.start_interruption(AudioRenderUsage::Media).await; |
| player |
| .wait_for_request(|request| matches!(request, PlayerRequest::Pause { .. })) |
| .await |
| .expect("Waiting for player to receive pause"); |
| |
| service.stop_interruption(AudioRenderUsage::Media).await; |
| player |
| .wait_for_request(|request| matches!(request, PlayerRequest::Play { .. })) |
| .await |
| .expect("Waiting for player to receive `Play` command"); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn unenrolled_player_is_not_paused_when_interrupted() -> Result<()> { |
| let mut service = TestService::new().await?; |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| player2 |
| .emit_delta(delta_with_interruption(PlayerState::Playing, InterruptionBehavior::Pause)) |
| .await?; |
| service.dequeue_watcher().await; |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request1 = player1.requests.try_next().await?; |
| let _watch_request2 = player2.requests.try_next().await?; |
| |
| service.start_interruption(AudioRenderUsage::Media).await; |
| player2 |
| .wait_for_request(|request| matches!(request, PlayerRequest::Pause { .. })) |
| .await |
| .expect("Waiting for player to receive pause"); |
| |
| drop(service); |
| let next = player1.requests.try_next().await?; |
| assert!(next.is_none()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_paused_before_interruption_is_not_resumed_by_its_end() -> Result<()> { |
| let mut service = TestService::new().await?; |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| |
| player1 |
| .emit_delta(delta_with_interruption(PlayerState::Playing, InterruptionBehavior::Pause)) |
| .await?; |
| player2 |
| .emit_delta(delta_with_interruption(PlayerState::Paused, InterruptionBehavior::Pause)) |
| .await?; |
| service.dequeue_watcher().await; |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request1 = player1.requests.try_next().await?; |
| let _watch_request2 = player2.requests.try_next().await?; |
| |
| service.start_interruption(AudioRenderUsage::Media).await; |
| player1 |
| .wait_for_request(|request| matches!(request, PlayerRequest::Pause { .. })) |
| .await |
| .expect("Waiting for player to receive pause"); |
| |
| service.stop_interruption(AudioRenderUsage::Media).await; |
| player1 |
| .wait_for_request(|request| matches!(request, PlayerRequest::Play { .. })) |
| .await |
| .expect("Waiting for player to receive play"); |
| |
| drop(service); |
| let next = player2.requests.try_next().await?; |
| assert!(next.is_none()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn player_paused_during_interruption_is_not_resumed_by_its_end() -> Result<()> { |
| let mut service = TestService::new().await?; |
| let mut player = TestPlayer::new(&service).await?; |
| let (session, session_server) = create_proxy()?; |
| service.discovery.connect_to_session(player.id, session_server)?; |
| |
| player |
| .emit_delta(delta_with_interruption(PlayerState::Playing, InterruptionBehavior::Pause)) |
| .await?; |
| service.dequeue_watcher().await; |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request = player.requests.try_next().await?; |
| |
| service.start_interruption(AudioRenderUsage::Media).await; |
| player |
| .wait_for_request(|request| matches!(request, PlayerRequest::Pause { .. })) |
| .await |
| .expect("Waiting for player to receive pause"); |
| |
| session.pause()?; |
| player |
| .wait_for_request(|request| matches!(request, PlayerRequest::Pause { .. })) |
| .await |
| .expect("Waiting for player to receive pause"); |
| |
| service.stop_interruption(AudioRenderUsage::Media).await; |
| |
| drop(service); |
| let next = player.requests.try_next().await?; |
| assert!(next.is_none()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn active_session_initializes_clients_without_player() -> Result<()> { |
| let service = TestService::new().await?; |
| let active_session_discovery = service |
| .realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<ActiveSessionMarker>() |
| .context("Connecting to Active Session service")?; |
| |
| let session = active_session_discovery |
| .watch_active_session() |
| .await |
| .context("Watching the active session")?; |
| assert_matches!(session, None); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn active_session_initializes_clients_with_idle_player() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| let active_session_discovery = service |
| .realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<ActiveSessionMarker>() |
| .context("Connecting to Active Session service")?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Idle)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| let session = active_session_discovery |
| .watch_active_session() |
| .await |
| .context("Watching the active session")?; |
| assert_matches!(session, None); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn active_session_initializes_clients_with_active_player() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut player = TestPlayer::new(&service).await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| let active_session_discovery = service |
| .realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<ActiveSessionMarker>() |
| .context("Connecting to Active Session service")?; |
| |
| player.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| // We take the watch request from the player's queue and don't answer it, so that |
| // the stream of requests coming in that we match on down below doesn't contain it. |
| let _watch_request = player.requests.try_next().await?; |
| |
| let session = active_session_discovery |
| .watch_active_session() |
| .await |
| .context("Watching the active session")?; |
| let session = session.expect("Unwrapping active session channel"); |
| let session = session.into_proxy().expect("Creating session proxy"); |
| session.play().context("Sending play command to session")?; |
| |
| player |
| .wait_for_request(|request| matches!(request, PlayerRequest::Play { .. })) |
| .await |
| .expect("Waiting for player to receive play command"); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn active_session_falls_back_when_session_removed() -> Result<()> { |
| let service = TestService::new().await?; |
| let mut watcher = service.new_watcher(Default::default())?; |
| let active_session_discovery = service |
| .realm |
| .root |
| .connect_to_protocol_at_exposed_dir::<ActiveSessionMarker>() |
| .context("Connecting to Active Session service")?; |
| |
| let mut player1 = TestPlayer::new(&service).await?; |
| let mut player2 = TestPlayer::new(&service).await?; |
| |
| let session = |
| active_session_discovery.watch_active_session().await.context("Syncing active session")?; |
| assert!(session.is_none()); |
| |
| player1.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| player2.emit_delta(delta_with_state(PlayerState::Playing)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| player1.emit_delta(delta_with_state(PlayerState::Paused)).await?; |
| let _ = watcher.wait_for_n_updates(1).await?; |
| |
| let session = active_session_discovery |
| .watch_active_session() |
| .await |
| .context("Watching active session 1st time")?; |
| let _session = session.expect("Unwrapping active session channel"); |
| |
| drop(player2); |
| let _ = watcher.wait_for_removal().await?; |
| |
| let session = active_session_discovery |
| .watch_active_session() |
| .await |
| .context("Watching the active session 2nd time")?; |
| let session = session.expect("Unwrapping active session channel 2nd time"); |
| let session = session.into_proxy().expect("Creating session proxy 2nd time"); |
| |
| let info_delta = session.watch_status().await.expect("Watching session status"); |
| assert_eq!( |
| info_delta.player_status.and_then(|status| status.player_state), |
| Some(PlayerState::Paused) |
| ); |
| |
| drop(player1); |
| let _ = watcher.wait_for_removal().await?; |
| assert!(session.watch_status().await.is_err()); |
| |
| Ok(()) |
| } |
| |
| #[fuchsia::test(logging_tags = ["mediasession_tests"])] |
| async fn inspect_tree_correct() -> Result<()> { |
| let mut service = TestService::new().await?; |
| let player1 = TestPlayer::new(&service).await?; |
| let player2 = TestPlayer::new(&service).await?; |
| let ids = vec![format!("{}", player1.id), format!("{}", player2.id)]; |
| |
| let hierarchy = service.inspect_tree().await; |
| assert_eq!(hierarchy.children.len(), 1); |
| let players = &hierarchy.children[0]; |
| assert_eq!(players.name, "players"); |
| assert_eq!(players.children.len(), 2); |
| assert!(ids.contains(&players.children[0].name)); |
| assert!(ids.contains(&players.children[1].name)); |
| |
| Ok(()) |
| } |