blob: cd2886203ffe0cd1520c615bc2074de18eb4c751 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use failure::{Error, ResultExt};
use fidl::endpoints::{create_endpoints, ClientEnd};
use fidl_fuchsia_mediaplayer::TimelineFunction;
use fidl_fuchsia_mediasession::{
ActiveSession, ControllerControlHandle, ControllerEvent, ControllerMarker, ControllerRequest,
ControllerRequestStream, ControllerRegistryEvent, ControllerRegistryEventStream,
ControllerRegistryMarker, ControllerRegistryProxy, PlaybackState, PlaybackStatus, PublisherMarker,
PublisherProxy, RepeatMode,
};
use fuchsia_app as app;
use fuchsia_async as fasync;
use futures::stream::{FusedStream, TryStreamExt};
use std::collections::HashMap;
const MEDIASESSION_URL: &str = "fuchsia-pkg://fuchsia.com/mediasession#meta/mediasession.cmx";
fn default_playback_status() -> PlaybackStatus {
PlaybackStatus {
duration: 100,
playback_state: PlaybackState::Playing,
playback_function: TimelineFunction {
subject_time: 0,
reference_time: 0,
subject_delta: 1,
reference_delta: 1,
},
repeat_mode: RepeatMode::Off,
shuffle_on: false,
has_next_item: true,
has_prev_item: false,
error: None,
}
}
struct TestSession {
request_stream: ControllerRequestStream,
control_handle: ControllerControlHandle,
client_end: ClientEnd<ControllerMarker>,
}
impl TestSession {
fn new() -> Result<Self, Error> {
let (client_end, server_end) =
create_endpoints::<ControllerMarker>().expect("Fidl endpoints.");
let (request_stream, control_handle) = server_end
.into_stream_and_control_handle()
.context("Unpacking Controller server end.")?;
Ok(Self {
request_stream,
control_handle,
client_end,
})
}
}
struct TestService {
// This needs to stay alive to keep the service running.
#[allow(unused)]
app: app::client::App,
publisher: PublisherProxy,
controller_registry: ControllerRegistryProxy,
active_session_changes: ControllerRegistryEventStream,
}
impl TestService {
fn new() -> Result<Self, Error> {
let launcher = app::client::Launcher::new().context("Creating launcher.")?;
let mediasession = launcher
.launch(String::from(MEDIASESSION_URL), None)
.context("Launching mediasession.")?;
let publisher = mediasession
.connect_to_service(PublisherMarker)
.context("Connecting to Publisher.")?;
let controller_registry = mediasession
.connect_to_service(ControllerRegistryMarker)
.context("Connecting to ControllerRegistry.")?;
let active_session_changes = controller_registry.take_event_stream();
Ok(Self {
app: mediasession,
publisher,
controller_registry,
active_session_changes,
})
}
async fn expect_active_session(&mut self, expected: Option<u64>) {
assert!(!self.active_session_changes.is_terminated());
let ControllerRegistryEvent::OnActiveSession {
active_session: actual,
} = await!(self.active_session_changes.try_next())
.expect("Reported active session.")
.expect("Active session stream");
assert_eq!(
actual,
expected.map(|session_id| Box::new(ActiveSession { session_id }))
);
}
}
#[fasync::run_singlethreaded]
#[test]
async fn service_reports_no_active_session() {
let mut test_service = TestService::new().expect("Test service.");
await!(test_service.expect_active_session(None));
}
#[fasync::run_singlethreaded]
#[test]
async fn service_routes_controls() {
let test_service = TestService::new().expect("Test service.");
// Creates a new session and publishes it. Returns the proxy through Media
// Session service and the request stream on the backend.
let new_session = || {
async {
let test_session = TestSession::new().expect("Test session.");
let session_id = await!(test_service.publisher.publish(test_session.client_end))
.expect("Session id.");
let (client_end, server_end) =
create_endpoints::<ControllerMarker>().expect("Controller endpoints.");
test_service
.controller_registry
.connect_to_controller_by_id(session_id, server_end)
.expect("To connect to session.");
let proxy = client_end.into_proxy().expect("Controller a proxy.");
(proxy, test_session.request_stream)
}
};
let (proxy_a, mut request_stream_a) = await!(new_session());
let (proxy_b, mut request_stream_b) = await!(new_session());
proxy_a.play().expect("To call Play() on Session a.");
proxy_b.pause().expect("To call Pause() on Session b.");
let a_event = await!(request_stream_a.try_next()).expect("Next request from session a.");
let b_event = await!(request_stream_b.try_next()).expect("Next request from session b.");
assert!(match a_event {
Some(ControllerRequest::Play { .. }) => true,
_ => false,
},);
assert!(match b_event {
Some(ControllerRequest::Pause { .. }) => true,
_ => false,
},);
// Ensure the behaviour continues.
proxy_b.play().expect("To call Play() on Session b.");
let b_event = await!(request_stream_b.try_next()).expect("Next request from session b.");
assert!(match b_event {
Some(ControllerRequest::Play { .. }) => true,
_ => false,
},);
}
#[fasync::run_singlethreaded]
#[test]
async fn service_reports_published_active_session() {
let mut test_service = TestService::new().expect("Test service.");
await!(test_service.expect_active_session(None));
let test_session = TestSession::new().expect("Test session.");
let our_session_id =
await!(test_service.publisher.publish(test_session.client_end)).expect("Session id.");
test_session
.control_handle
.send_on_playback_status_changed(&mut default_playback_status())
.expect("To update playback status.");
await!(test_service.expect_active_session(Some(our_session_id)));
}
#[fasync::run_singlethreaded]
#[test]
async fn service_reports_changed_active_session() {
let mut test_service = TestService::new().expect("Test service.");
await!(test_service.expect_active_session(None));
// Publish sessions.
let session_count: usize = 100;
let mut keep_alive = Vec::new();
for i in 0..session_count {
let test_session = TestSession::new().expect(&format!("Test session {}.", i));
let session_id = await!(test_service.publisher.publish(test_session.client_end))
.expect(&format!("Session {}", i));
test_session
.control_handle
.send_on_playback_status_changed(&mut default_playback_status())
.expect("To update playback status.");
await!(test_service.expect_active_session(Some(session_id)));
keep_alive.push(test_session.control_handle);
}
}
#[fasync::run_singlethreaded]
#[test]
async fn service_broadcasts_events() {
let mut test_service = TestService::new().expect("Test service.");
await!(test_service.expect_active_session(None));
let test_session = TestSession::new().expect("Test session.");
let session_id = await!(test_service.publisher.publish(test_session.client_end))
.expect(&format!("To publish session."));
// Send a single event.
test_session
.control_handle
.send_on_playback_status_changed(&mut default_playback_status())
.expect("To update playback status.");
// Ensure we wait for the service to accept the session.
await!(test_service.expect_active_session(Some(session_id)));
// Connect many clients and ensure they all receive the event.
let client_count: usize = 100;
for _ in 0..client_count {
let (client_end, server_end) =
create_endpoints::<ControllerMarker>().expect("Controller endpoints.");
test_service
.controller_registry
.connect_to_controller_by_id(session_id, server_end)
.expect("To connect to session.");
let mut event_stream = client_end
.into_proxy()
.expect("Controller proxy")
.take_event_stream();
let event = await!(event_stream.try_next()).expect("Next Controller event.");
assert_eq!(
event.and_then(|event| match event {
ControllerEvent::OnPlaybackStatusChanged { playback_status } => {
Some(playback_status)
}
_ => None,
}),
Some(default_playback_status())
);
}
}
#[fasync::run_singlethreaded]
#[test]
async fn service_correctly_tracks_session_ids_states_and_lifetimes() {
let test_service = TestService::new().expect("Test service.");
// Publish 100 sessions and have each of them post a playback status.
let count = 100;
let mut test_sessions = Vec::new();
let numbered_playback_status = |i| PlaybackStatus {
duration: i,
..default_playback_status()
};
for i in 0..count {
let test_session = TestSession::new().expect(&format!("Test session {}.", i));
let session_id = await!(test_service.publisher.publish(test_session.client_end))
.expect(&format!("To publish test session {}.", i));
test_session
.control_handle
.send_on_playback_status_changed(&mut numbered_playback_status(i as i64))
.expect(&format!("To broadcast playback status {}.", i));
test_sessions.push((session_id, test_session.control_handle));
}
enum Expectation {
SessionIsDropped,
SessionReportsPlaybackStatus(PlaybackStatus),
}
// Set up expectations.
let mut expectations = HashMap::new();
let mut control_handles_to_keep_sessions_alive = Vec::new();
for (i, (session_id, control_handle)) in test_sessions.into_iter().enumerate() {
let should_drop = i % 3 == 0;
expectations.insert(
session_id,
if should_drop {
control_handle.shutdown();
Expectation::SessionIsDropped
} else {
control_handles_to_keep_sessions_alive.push(control_handle);
Expectation::SessionReportsPlaybackStatus(numbered_playback_status(i as i64))
},
);
}
// Check all expectations.
for (session_id, expectation) in expectations.into_iter() {
let (client_end, server_end) =
create_endpoints::<ControllerMarker>().expect("Fidl endpoints.");
test_service
.controller_registry
.connect_to_controller_by_id(session_id, server_end)
.expect(&format!(
"To make connection request to session {}",
session_id
));
let mut event_stream = client_end
.into_proxy()
.expect(&format!("Controller proxy for session {}.", session_id))
.take_event_stream();
let maybe_event = await!(event_stream.try_next()).expect("Next session event.");
match expectation {
Expectation::SessionIsDropped => {
// If we shutdown the session, this or the next event should be
// None depending on whether our shutdown reached the service
// before this request.
if maybe_event.is_some() {
let next_event = await!(event_stream.try_next()).expect("Next session event.");
assert!(next_event.is_none(), "{:?}", next_event)
}
}
Expectation::SessionReportsPlaybackStatus(expected) => match maybe_event {
Some(ControllerEvent::OnPlaybackStatusChanged {
playback_status: actual,
}) => assert_eq!(actual, expected),
other => panic!("Expected a playback status event; got: {:?}", other),
},
}
}
}