blob: 331b4d90d5ec82643d99254431ff925e802dfe4e [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::*;
use anyhow::Context as _;
use fidl_fuchsia_factory_lowpan::{
FactoryDriverMarker, FactoryDriverProxy, FactoryLookupRequest, FactoryLookupRequestStream,
FactoryRegisterRequest, FactoryRegisterRequestStream,
};
use fidl_fuchsia_lowpan_device::{
DeviceChanges, DriverMarker, DriverProxy, LookupRequest, LookupRequestStream, RegisterRequest,
RegisterRequestStream, ServiceError, MAX_LOWPAN_DEVICES,
};
use fuchsia_syslog::macros::*;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt};
use lowpan_driver_common::AsyncCondition;
use parking_lot::Mutex;
use regex::Regex;
use std::collections::HashMap;
use std::sync::Arc;
lazy_static::lazy_static! {
static ref DEVICE_NAME_REGEX: Regex = Regex::new("^[a-z_][-_.+0-9a-z]{1,31}$")
.expect("Device name regex failed to compile");
}
pub struct LowpanService<S> {
pub devices: Arc<Mutex<HashMap<String, DriverProxy>>>,
pub devices_factory: Arc<Mutex<HashMap<String, FactoryDriverProxy>>>,
pub added_removed_cond: Arc<AsyncCondition>,
pub spawner: S,
}
impl<S: Spawn> LowpanService<S> {
pub fn with_spawner(spawner: S) -> LowpanService<S> {
LowpanService {
devices: Default::default(),
devices_factory: Default::default(),
added_removed_cond: Default::default(),
spawner,
}
}
}
impl<S> LowpanService<S> {
pub fn lookup(&self, name: &str) -> Result<DriverProxy, ServiceError> {
let devices = self.devices.lock();
if let Some(device) = devices.get(name) {
Ok(device.clone())
} else {
Err(ServiceError::DeviceNotFound)
}
}
pub fn get_devices(&self) -> Vec<String> {
let devices = self.devices.lock();
devices.keys().into_iter().map(|x| x.to_string()).collect()
}
}
impl<S: Spawn> LowpanService<S> {
pub fn register(
&self,
name: &str,
driver: fidl::endpoints::ClientEnd<DriverMarker>,
) -> Result<(), ServiceError> {
let driver = driver.into_proxy().map_err(|_| ServiceError::InvalidArgument)?;
if !DEVICE_NAME_REGEX.is_match(name) {
fx_log_err!("Attempted to register LoWPAN device with invalid name {:?}", name);
return Err(ServiceError::InvalidInterfaceName);
}
let name = name.to_string();
{
// Lock the device list.
let mut devices = self.devices.lock();
// Check to make sure there already aren't too many devices.
if devices.len() >= MAX_LOWPAN_DEVICES as usize {
return Err(ServiceError::TooManyDevices);
}
// Check for existing devices with the same name.
if devices.contains_key(&name) {
return Err(ServiceError::DeviceAlreadyExists);
}
// Insert the new device into the list.
devices.insert(name.clone(), driver.clone());
}
// Indicate that a new device was added.
self.added_removed_cond.trigger();
let devices = self.devices.clone();
let devices_factory = self.devices_factory.clone();
let added_removed_cond = self.added_removed_cond.clone();
// The following code provides a way to automatically
// remove a device when the connection to the LoWPAN Driver
// is lost.
let cleanup_task = driver
.take_event_stream()
.for_each(|_| futures::future::ready(()))
.inspect(move |_: &()| {
fx_log_info!("Removing device {:?}", &name);
devices.lock().remove(&name);
devices_factory.lock().remove(&name);
// Indicate that the device was removed.
added_removed_cond.trigger();
});
self.spawner.spawn(cleanup_task).expect("Unable to spawn cleanup task");
Ok(())
}
}
#[async_trait::async_trait()]
impl<S: Sync> ServeTo<LookupRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: LookupRequestStream) -> anyhow::Result<()> {
use futures::lock::Mutex;
let last_device_list: Mutex<Option<Vec<String>>> = Mutex::new(None);
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
LookupRequest::LookupDevice { name, protocols, responder } => {
fx_log_info!("Received lookup request for {:?}", name);
let mut ret = self.lookup(&name).and_then(|dev| {
dev.get_protocols(protocols).map_err(|_| ServiceError::DeviceNotFound)
});
responder.send(&mut ret)?;
fx_log_info!("Responded to lookup request {:?}", name);
}
LookupRequest::GetDevices { responder } => {
fx_log_info!("Received get devices request");
responder
.send(&mut self.get_devices().iter().map(|s| &**s))
.context("error sending response")?;
fx_log_info!("Responded to get devices request");
}
LookupRequest::WatchDevices { responder } => {
let mut locked_device_list =
last_device_list.try_lock().ok_or(format_err!(
"No more than 1 outstanding call to watch_devices is allowed"
))?;
if locked_device_list.is_none() {
// This is the first call to WatchDevices,
// so we return the whole list.
*locked_device_list = Some(self.get_devices());
let mut device_changes = DeviceChanges {
added: locked_device_list.clone().unwrap(),
removed: vec![],
};
responder
.send(&mut device_changes)
.context("error sending response")?;
} else {
// This is a follow-up call.
let current_devices = loop {
let wait = self.added_removed_cond.wait();
let current_devices = self.get_devices();
// Note that this should work even though the returned
// list of interfaces isn't sorted. As long as the
// keys aren't intentionally shuffled when nothing
// has changed then this check should work just fine.
if current_devices != *locked_device_list.as_ref().unwrap() {
break current_devices;
}
// We wait here for something to change.
wait.await;
};
// Devices have been added or removed, let's sort them out.
let mut device_changes = DeviceChanges {
// Calculate devices added.
// This mechanism is O(n^2), but in reality n is going to
// almost always be 1---so it makes sense to prioritize
// convenience. It may even be slower to try to optimize
// this.
added: current_devices
.iter()
.filter_map(|name| {
if !locked_device_list.as_ref().unwrap().contains(name) {
Some(name.clone())
} else {
None
}
})
.collect(),
// Calculate devices removed.
// This mechanism is O(n^2), but in reality n is going to
// almost always be 1---so it makes sense to prioritize
// convenience. It may even be slower to try to optimize
// this.
removed: locked_device_list
.as_ref()
.unwrap()
.iter()
.filter_map(|name| {
if !current_devices.contains(name) {
Some(name.clone())
} else {
None
}
})
.collect(),
};
// Save our current list of devices so that we
// can use it the next time this method is called.
*locked_device_list = Some(current_devices);
responder
.send(&mut device_changes)
.context("error sending response")?;
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| fx_log_err!("{:?}", e))
.await
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<RegisterRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: RegisterRequestStream) -> anyhow::Result<()> {
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
RegisterRequest::RegisterDevice { name, driver, responder } => {
fx_log_info!("Received register request for {:?}", name);
let mut response = self.register(&name, driver);
responder
.send(&mut response)
.context("error sending response to register request")?;
fx_log_info!("Responded to register request {:?}", name);
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| fx_log_err!("{:?}", e))
.await
}
}
mod factory {
use super::*;
impl<S: Spawn + Sync> LowpanService<S> {
pub fn lookup_factory(&self, name: &str) -> Result<FactoryDriverProxy, ServiceError> {
let devices = self.devices_factory.lock();
if let Some(device) = devices.get(name) {
Ok(device.clone())
} else {
Err(ServiceError::DeviceNotFound)
}
}
pub fn register_factory(
&self,
name: &str,
driver: fidl::endpoints::ClientEnd<FactoryDriverMarker>,
) -> Result<(), ServiceError> {
let driver = driver.into_proxy().map_err(|_| ServiceError::InvalidArgument)?;
if !DEVICE_NAME_REGEX.is_match(name) {
fx_log_err!("Attempted to register LoWPAN device with invalid name {:?}", name);
return Err(ServiceError::InvalidInterfaceName);
}
let name = name.to_string();
// Lock the device list.
let mut devices = self.devices_factory.lock();
// Check to make sure there already aren't too many devices.
if devices.len() >= MAX_LOWPAN_DEVICES as usize {
return Err(ServiceError::TooManyDevices);
}
// Check for existing devices with the same name.
if devices.contains_key(&name) {
return Err(ServiceError::DeviceAlreadyExists);
}
// Insert the new device into the list.
devices.insert(name.clone(), driver.clone());
Ok(())
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<FactoryLookupRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: FactoryLookupRequestStream) -> anyhow::Result<()> {
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
FactoryLookupRequest::Lookup { name, device_factory, responder } => {
fx_log_info!("Received lookup factory request for {:?}", name);
let mut ret = self.lookup_factory(&name).and_then(|dev| {
dev.get_factory_device(device_factory)
.map_err(|_| ServiceError::DeviceNotFound)
});
responder.send(&mut ret)?;
fx_log_info!("Responded to lookup factory request {:?}", name);
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| fx_log_err!("{:?}", e))
.await
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<FactoryRegisterRequestStream> for LowpanService<S> {
async fn serve_to(
&self,
request_stream: FactoryRegisterRequestStream,
) -> anyhow::Result<()> {
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
FactoryRegisterRequest::Register { name, driver, responder } => {
fx_log_info!("Received register factory request for {:?}", name);
let mut response = self.register_factory(&name, driver);
responder
.send(&mut response)
.context("error sending response to register factory request")?;
fx_log_info!("Responded to register factory request {:?}", name);
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| fx_log_err!("{:?}", e))
.await
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints::create_endpoints;
use fuchsia_async as fasync;
#[fasync::run_until_stalled(test)]
async fn test_interface_name_check() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let (client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(service.register("lowpan0", client_ep), Ok(()));
let (client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(
service.register("low pan 0", client_ep),
Err(ServiceError::InvalidInterfaceName)
);
let (client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(service.register("0lowpan", client_ep), Err(ServiceError::InvalidInterfaceName));
let (client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(service.register("l", client_ep), Err(ServiceError::InvalidInterfaceName));
}
#[fasync::run_until_stalled(test)]
async fn test_factory_interface() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let (client_ep, _) = create_endpoints::<FactoryDriverMarker>().unwrap();
assert_eq!(service.register_factory("lowpan0", client_ep), Ok(()));
}
#[fasync::run_until_stalled(test)]
async fn test_interface_added_notifications() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let waiter = service.added_removed_cond.wait();
let (driver_client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(service.register("lowpan0", driver_client_ep), Ok(()));
waiter.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_interface_removed_notifications() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let mut waiter = service.added_removed_cond.wait();
{
let (driver_client_ep, _) = create_endpoints::<DriverMarker>().unwrap();
assert_eq!(service.register("lowpan0", driver_client_ep), Ok(()));
waiter.await;
waiter = service.added_removed_cond.wait();
}
waiter.await;
}
}