blob: 27de4efc6bc5fb2683d3bf959aba3e462053e25f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod element_manager_server;
mod event_handler;
#[cfg(test)]
mod testing_utils;
use {
anyhow::{Context as _, Error},
element_management::{Element, ElementManager, ElementManagerError},
event_handler::{ElementEventHandler, EventHandler},
fidl_fuchsia_session::{
ElementControllerMarker, ElementControllerRequestStream, ElementSpec, ProposeElementError,
},
futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
StreamExt,
},
rand::{distributions::Alphanumeric, thread_rng, Rng},
std::rc::Rc,
};
pub use element_manager_server::ElementManagerServer;
/// Name of the child collection that elements are added to.
///
/// This must match a collection name declared in this session's CML file.
const ELEMENT_COLLECTION_NAME: &str = "elements";
/// A struct which manages the running elements.
///
/// The `ElementRepository` provides a single location for managing elements while
/// allowing multiple connections to be served at one time.
///
/// The repository can spawn many servers (see `make_server`) which serve the
/// `fuchsia.session.ElementManager` protocol. Each of these servers can then handle
/// multiple connections.
pub struct ElementRepository<T: ElementManager> {
/// The element manager handed to every server created by this repository.
element_manager: Rc<T>,
/// Receiving end of the event channel; read by the repository's event loop.
receiver: UnboundedReceiver<ElementEvent>,
/// Sending end of the event channel; a clone is given to each server created by
/// `make_server`, and `shutdown` uses it to post the `Shutdown` event.
sender: UnboundedSender<ElementEvent>,
}
impl<T: ElementManager> ElementRepository<T> {
    /// Creates a new instance of the element repository.
    ///
    /// The repository takes an `Rc<T>` so that the element manager can be shared by the
    /// servers it creates without any locking.
    ///
    /// NOTE(review): the original comment justified this sharing by the `ElementManager`
    /// being stateless; that property is not visible in this file and should be confirmed
    /// against the trait's implementations.
    pub fn new(element_manager: Rc<T>) -> ElementRepository<T> {
        let (sender, receiver) = mpsc::unbounded();
        ElementRepository { element_manager, sender, receiver }
    }

    /// Starts the event loop which handles the events sent by the servers.
    ///
    /// Events are dispatched to a freshly created `ElementEventHandler`. The returned future
    /// completes once a `Shutdown` event has been handled or the event stream ends.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`.
    pub async fn run(&mut self) -> Result<(), Error> {
        self.run_with_handler(&mut ElementEventHandler::new()).await;
        Ok(())
    }

    /// Runs the repository with a given handler.
    ///
    /// The handler is responsible for responding to incoming events and processing them.
    /// A single handler is used for one repository, allowing it to be safely mutated.
    async fn run_with_handler(&mut self, handler: &mut impl EventHandler) {
        while let Some(event) = self.receiver.next().await {
            match event {
                ElementEvent::ElementAdded { element, element_controller_stream } => {
                    handler.add_element(element, element_controller_stream);
                }
                ElementEvent::Shutdown => {
                    handler.shutdown();
                    // Leave the loop immediately: events queued after `Shutdown` are not
                    // processed.
                    break;
                }
            }
        }
    }

    /// Creates a new `ElementManagerServer` suitable for handling incoming connections.
    ///
    /// A repository can handle many servers. Every server receives a clone of the shared
    /// element manager and of the repository's event sender, so all servers send their events
    /// to the repository's event handling system.
    pub fn make_server(&self) -> ElementManagerServer<T> {
        ElementManagerServer::new(self.element_manager.clone(), self.sender.clone())
    }

    /// Stops the event loop handling incoming requests.
    ///
    /// A `Shutdown` event is queued on the repository's event channel. Once the event loop
    /// reaches it, the handler is notified and no more events are processed.
    ///
    /// # Errors
    ///
    /// Returns an error if the `Shutdown` event cannot be sent on the event channel.
    pub fn shutdown(&self) -> Result<(), Error> {
        // `context` already yields `Result<(), Error>`, so it is returned directly.
        self.sender
            .unbounded_send(ElementEvent::Shutdown)
            .context("Unable to send Shutdown message")
    }
}
/// Launches the element described by `spec` through `ElementManager::launch_element`.
///
/// A random child name of 16 lowercase ASCII alphanumeric characters is generated for the
/// element, which is launched into the `ELEMENT_COLLECTION_NAME` collection. The launch outcome
/// and the optional `element_controller` server end are then passed to
/// `map_launch_element_result`, whose result is returned unchanged.
///
/// This function lives in this file rather than in the `ElementManagerServer` so that elements
/// can be launched directly from the repository.
pub(crate) async fn propose_element<T: ElementManager>(
    element_manager: &Rc<T>,
    spec: ElementSpec,
    element_controller: Option<fidl::endpoints::ServerEnd<ElementControllerMarker>>,
) -> Result<(Element, Option<ElementControllerRequestStream>), ProposeElementError> {
    // Number of characters in the generated child name.
    const CHILD_NAME_LENGTH: usize = 16;

    // Lowercase each sampled character while building the name.
    let child_name: String = thread_rng()
        .sample_iter(&Alphanumeric)
        .take(CHILD_NAME_LENGTH)
        .map(|c: char| c.to_ascii_lowercase())
        .collect();

    let launch_outcome =
        element_manager.launch_element(spec, &child_name, ELEMENT_COLLECTION_NAME).await;
    map_launch_element_result(launch_outcome, element_controller)
}
/// Converts the outcome of launching an element into the result reported for a proposal.
///
/// When the launch succeeded, the optional `element_controller` server end is turned into a
/// request stream and returned together with the element. If that conversion fails the
/// proposal is reported as `ProposeElementError::Rejected`.
///
/// Launch failures are mapped as follows:
///
/// * `ElementManagerError::UrlMissing` becomes `ProposeElementError::NotFound`.
/// * `ElementManagerError::NotCreated`, `NotBound` and `NotLaunched` become
///   `ProposeElementError::Rejected`.
fn map_launch_element_result(
    result: Result<Element, ElementManagerError>,
    element_controller: Option<fidl::endpoints::ServerEnd<ElementControllerMarker>>,
) -> Result<(Element, Option<ElementControllerRequestStream>), ProposeElementError> {
    // A failed launch returns early, before the controller server end is touched.
    let element = result.map_err(|launch_error| match launch_error {
        ElementManagerError::UrlMissing { .. } => ProposeElementError::NotFound,
        ElementManagerError::NotCreated { .. }
        | ElementManagerError::NotBound { .. }
        | ElementManagerError::NotLaunched { .. } => ProposeElementError::Rejected,
    })?;

    // `None` stays `None`; `Some(server_end)` is converted into its request stream.
    let controller_stream = element_controller
        .map(|server_end| server_end.into_stream())
        .transpose()
        .map_err(|_| ProposeElementError::Rejected)?;

    Ok((element, controller_stream))
}
/// The events that the single repository instance listens for.
pub(crate) enum ElementEvent {
/// Indicates that an element has been added to the session.
ElementAdded {
/// The element that was added.
element: Element,
/// Request stream of the element's controller, or `None` when there is no controller.
element_controller_stream: Option<ElementControllerRequestStream>,
},
/// Tells the repository it should shut down and perform any cleanup.
Shutdown,
}
/// Formats an `ElementEvent` as the bare name of its variant; field values are not printed.
impl std::fmt::Debug for ElementEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            ElementEvent::ElementAdded { .. } => "ElementAdded",
            ElementEvent::Shutdown => "Shutdown",
        };
        f.write_str(variant_name)
    }
}
#[cfg(test)]
mod tests {
    use {
        super::testing_utils::{init_logger, make_mock_element, CallCountEventHandler},
        super::*,
        fidl::endpoints::create_proxy,
        fidl_fuchsia_component as fcomponent,
        fidl_fuchsia_session::ElementControllerMarker,
        fuchsia_async as fasync,
    };

    /// `shutdown` must place a `Shutdown` event on the repository's event channel.
    #[fasync::run_singlethreaded(test)]
    async fn shutdown_sends_shutdown_message() {
        init_logger();
        let mut repo = ElementRepository::new_for_test();
        assert!(repo.shutdown().is_ok());

        let result = repo.receiver.next().await;
        match result.expect("receiver yielded no event") {
            ElementEvent::Shutdown => (),
            // Report which event arrived instead of failing on a constant assertion.
            other => panic!("expected Shutdown event, got {:?}", other),
        }
    }

    /// A `Shutdown` event must reach the handler exactly once and terminate the event loop.
    #[fasync::run_singlethreaded(test)]
    async fn shutdown_event_forwards_to_handler_and_ends_loop() {
        init_logger();
        let mut repo = ElementRepository::new_for_test();
        let mut handler = CallCountEventHandler::default();

        let sender = repo.sender.clone();
        fasync::spawn_local(async move {
            sender.unbounded_send(ElementEvent::Shutdown).expect("failed to send event");
        });

        repo.run_with_handler(&mut handler).await;
        assert_eq!(handler.shutdown_call_count, 1);
    }

    /// An `ElementAdded` event must be forwarded to the handler's `add_element`.
    #[fasync::run_singlethreaded(test)]
    async fn element_added_event_forwards_to_handler() {
        init_logger();
        let mut repo = ElementRepository::new_for_test();
        let mut handler = CallCountEventHandler::default();
        let (element, _channel) = make_mock_element();

        let sender = repo.sender.clone();
        fasync::spawn_local(async move {
            sender
                .unbounded_send(ElementEvent::ElementAdded {
                    element,
                    element_controller_stream: None,
                })
                .expect("failed to send added event");
            // need to shut down the handler
            sender.unbounded_send(ElementEvent::Shutdown).expect("failed to send event");
        });

        repo.run_with_handler(&mut handler).await;
        assert_eq!(handler.add_call_count, 1);
    }

    /// `UrlMissing` launch errors are reported as `NotFound`.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_url_missing() {
        init_logger();
        let error = ElementManagerError::url_missing("", "");
        let result = map_launch_element_result(Err(error), None);
        match result {
            Err(ProposeElementError::NotFound) => (),
            _ => panic!("wrong error returned"),
        }
    }

    /// `NotCreated` launch errors are reported as `Rejected`.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_not_created() {
        init_logger();
        let error = ElementManagerError::not_created("", "", "", fcomponent::Error::Internal);
        let result = map_launch_element_result(Err(error), None);
        match result {
            Err(ProposeElementError::Rejected) => (),
            _ => panic!("wrong error returned"),
        }
    }

    /// `NotBound` launch errors are reported as `Rejected`.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_not_bound() {
        init_logger();
        let error = ElementManagerError::not_bound("", "", "", fcomponent::Error::Internal);
        let result = map_launch_element_result(Err(error), None);
        match result {
            Err(ProposeElementError::Rejected) => (),
            _ => panic!("wrong error returned"),
        }
    }

    /// `NotLaunched` launch errors are reported as `Rejected`.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_not_launched() {
        init_logger();
        let error = ElementManagerError::not_launched("", "");
        let result = map_launch_element_result(Err(error), None);
        match result {
            Err(ProposeElementError::Rejected) => (),
            _ => panic!("wrong error returned"),
        }
    }

    /// A successful launch without a controller yields the element and no stream.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_just_element() {
        init_logger();
        let (element, _channel) = make_mock_element();
        let result = map_launch_element_result(Ok(element), None);
        match result {
            Ok((_, None)) => (),
            _ => panic!("should have gotten (element, None)"),
        }
    }

    /// A successful launch with a controller yields the element and a request stream.
    #[fasync::run_singlethreaded(test)]
    async fn map_launch_element_result_element_and_controlller() {
        init_logger();
        let (_controller_proxy, server_end) =
            create_proxy::<ElementControllerMarker>().expect("failed to create endpoints");
        let (element, _channel) = make_mock_element();
        let result = map_launch_element_result(Ok(element), Some(server_end));
        match result {
            Ok((_, Some(_))) => (),
            _ => panic!("should have gotten (element, Some)"),
        }
    }
}