blob: 9aeb5736012d4971485884a392e350bf486733cb [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::*;
use anyhow::Context as _;
use fidl::endpoints::{ControlHandle as _, RequestStream as _};
use fidl_fuchsia_factory_lowpan::{
FactoryDriverMarker, FactoryDriverProxy, FactoryLookupRequest, FactoryLookupRequestStream,
FactoryRegisterRequest, FactoryRegisterRequestStream,
};
use fidl_fuchsia_lowpan_driver::{DriverMarker, DriverProxy, Protocols, RegisterRequest};
use fuchsia_sync::Mutex;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt};
use lowpan_driver_common::lowpan_fidl::MAX_LOWPAN_DEVICES;
use lowpan_driver_common::{AsyncCondition, ZxStatus};
use regex::Regex;
use std::collections::HashMap;
lazy_static::lazy_static! {
static ref DEVICE_NAME_REGEX: Regex = Regex::new("^[a-z_][-_.+0-9a-z]{1,31}$")
.expect("Device name regex failed to compile");
}
pub struct LowpanService<S> {
pub devices: Arc<Mutex<HashMap<String, DriverProxy>>>,
pub devices_factory: Arc<Mutex<HashMap<String, FactoryDriverProxy>>>,
pub added_removed_cond: Arc<AsyncCondition>,
pub spawner: S,
}
impl<S: Spawn> LowpanService<S> {
pub fn with_spawner(spawner: S) -> LowpanService<S> {
LowpanService {
devices: Default::default(),
devices_factory: Default::default(),
added_removed_cond: Default::default(),
spawner,
}
}
}
impl<S> LowpanService<S> {
pub fn lookup(&self, name: &str) -> Result<DriverProxy, ZxStatus> {
let devices = self.devices.lock();
if let Some(device) = devices.get(name) {
Ok(device.clone())
} else {
Err(ZxStatus::NOT_FOUND)
}
}
pub fn get_devices(&self) -> Vec<String> {
let devices = self.devices.lock();
devices.keys().into_iter().map(|x| x.to_string()).collect()
}
}
impl<S: Spawn> LowpanService<S> {
pub fn register(
&self,
name: &str,
driver: fidl::endpoints::ClientEnd<DriverMarker>,
) -> Result<(), ZxStatus> {
let driver = driver.into_proxy().map_err(|_| ZxStatus::INVALID_ARGS)?;
if !DEVICE_NAME_REGEX.is_match(name) {
error!("Attempted to register LoWPAN device with invalid name {:?}", name);
return Err(ZxStatus::INVALID_ARGS);
}
let name = name.to_string();
{
// Lock the device list.
let mut devices = self.devices.lock();
// Check to make sure there already aren't too many devices.
if devices.len() >= MAX_LOWPAN_DEVICES as usize {
return Err(ZxStatus::NO_RESOURCES);
}
// Check for existing devices with the same name.
if devices.contains_key(&name) {
return Err(ZxStatus::ALREADY_EXISTS);
}
// Insert the new device into the list.
devices.insert(name.clone(), driver.clone());
}
// Indicate that a new device was added.
self.added_removed_cond.trigger();
let devices = self.devices.clone();
let devices_factory = self.devices_factory.clone();
let added_removed_cond = self.added_removed_cond.clone();
// The following code provides a way to automatically
// remove a device when the connection to the LoWPAN Driver
// is lost.
let cleanup_task = driver
.take_event_stream()
.for_each(|_| futures::future::ready(()))
.inspect(move |_: &()| {
info!("Removing device {:?}", &name);
devices.lock().remove(&name);
devices_factory.lock().remove(&name);
// Indicate that the device was removed.
added_removed_cond.trigger();
});
self.spawner.spawn(cleanup_task).expect("Unable to spawn cleanup task");
Ok(())
}
}
macro_rules! impl_serve_to_driver {
($request_stream:ty, $request:ident, $protocol_member:ident) => {
#[async_trait::async_trait()]
impl<S: Sync> ServeTo<$request_stream> for LowpanService<S> {
async fn serve_to(&self, request_stream: $request_stream) -> anyhow::Result<()> {
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
$request::Connect { name, server_end, .. } => {
if let Err(err) = match self.lookup(&name) {
Ok(dev) => dev.get_protocols(Protocols {
$protocol_member: Some(server_end),
..Default::default()
}),
Err(err) => server_end.close_with_epitaph(err.into()),
} {
warn!("{:?}", err);
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| error!("{:?}", e))
.await
}
}
};
}
impl_serve_to_driver!(DeviceConnectorRequestStream, DeviceConnectorRequest, device);
impl_serve_to_driver!(DeviceExtraConnectorRequestStream, DeviceExtraConnectorRequest, device_extra);
impl_serve_to_driver!(DeviceRouteConnectorRequestStream, DeviceRouteConnectorRequest, device_route);
impl_serve_to_driver!(
DeviceRouteExtraConnectorRequestStream,
DeviceRouteExtraConnectorRequest,
device_route_extra
);
impl_serve_to_driver!(CountersConnectorRequestStream, CountersConnectorRequest, counters);
impl_serve_to_driver!(DeviceTestConnectorRequestStream, DeviceTestConnectorRequest, device_test);
impl_serve_to_driver!(DatasetConnectorRequestStream, DatasetConnectorRequest, thread_dataset);
impl_serve_to_driver!(
LegacyJoiningConnectorRequestStream,
LegacyJoiningConnectorRequest,
thread_legacy_joining
);
impl_serve_to_driver!(MeshcopConnectorRequestStream, MeshcopConnectorRequest, meshcop);
impl_serve_to_driver!(EnergyScanConnectorRequestStream, EnergyScanConnectorRequest, energy_scan);
impl_serve_to_driver!(
ExperimentalDeviceConnectorRequestStream,
ExperimentalDeviceConnectorRequest,
experimental_device
);
impl_serve_to_driver!(
ExperimentalDeviceExtraConnectorRequestStream,
ExperimentalDeviceExtraConnectorRequest,
experimental_device_extra
);
impl_serve_to_driver!(
TelemetryProviderConnectorRequestStream,
TelemetryProviderConnectorRequest,
telemetry_provider
);
impl_serve_to_driver!(FeatureConnectorRequestStream, FeatureConnectorRequest, thread_feature);
impl_serve_to_driver!(
CapabilitiesConnectorRequestStream,
CapabilitiesConnectorRequest,
capabilities
);
#[async_trait::async_trait()]
impl<S: Sync> ServeTo<DeviceWatcherRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: DeviceWatcherRequestStream) -> anyhow::Result<()> {
use futures::lock::Mutex;
let last_device_list: Mutex<Option<Vec<String>>> = Mutex::new(None);
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
DeviceWatcherRequest::WatchDevices { responder } => {
let mut locked_device_list =
last_device_list.try_lock().ok_or(format_err!(
"No more than 1 outstanding call to watch_devices is allowed"
))?;
if locked_device_list.is_none() {
// This is the first call to WatchDevices,
// so we return the whole list.
*locked_device_list = Some(self.get_devices());
let (added, removed) = (locked_device_list.clone().unwrap(), []);
responder.send(&added, &removed).context("error sending response")?;
} else {
// This is a follow-up call.
let current_devices = loop {
let wait = self.added_removed_cond.wait();
let current_devices = self.get_devices();
// Note that this should work even though the returned
// list of interfaces isn't sorted. As long as the
// keys aren't intentionally shuffled when nothing
// has changed then this check should work just fine.
if current_devices != *locked_device_list.as_ref().unwrap() {
break current_devices;
}
// We wait here for something to change.
wait.await;
};
// Devices have been added or removed, let's sort them out.
// Calculate devices added.
// This mechanism is O(n^2), but in reality n is going to
// almost always be 1---so it makes sense to prioritize
// convenience. It may even be slower to try to optimize
// this.
let added = current_devices
.iter()
.filter_map(|name| {
if !locked_device_list.as_ref().unwrap().contains(name) {
Some(name.clone())
} else {
None
}
})
.collect::<Vec<_>>();
// Calculate devices removed.
// This mechanism is O(n^2), but in reality n is going to
// almost always be 1---so it makes sense to prioritize
// convenience. It may even be slower to try to optimize
// this.
let removed = locked_device_list
.as_ref()
.unwrap()
.iter()
.filter_map(|name| {
if !current_devices.contains(name) {
Some(name.clone())
} else {
None
}
})
.collect::<Vec<_>>();
// Save our current list of devices so that we
// can use it the next time this method is called.
*locked_device_list = Some(current_devices);
responder.send(&added, &removed).context("error sending response")?;
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| error!("{:?}", e))
.await
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<RegisterRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: RegisterRequestStream) -> anyhow::Result<()> {
let control_handle = request_stream.control_handle();
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
RegisterRequest::RegisterDevice { name, driver, .. } => {
info!("Received register request for {:?}", name);
if let Err(err) = self.register(&name, driver) {
control_handle.shutdown_with_epitaph(err);
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| error!("{:?}", e))
.await
}
}
mod factory {
use super::*;
impl<S: Spawn + Sync> LowpanService<S> {
pub fn lookup_factory(&self, name: &str) -> Result<FactoryDriverProxy, ZxStatus> {
let devices = self.devices_factory.lock();
if let Some(device) = devices.get(name) {
Ok(device.clone())
} else {
Err(ZxStatus::NOT_FOUND)
}
}
pub fn register_factory(
&self,
name: &str,
driver: fidl::endpoints::ClientEnd<FactoryDriverMarker>,
) -> Result<(), ZxStatus> {
let driver = driver.into_proxy().map_err(|_| ZxStatus::INVALID_ARGS)?;
if !DEVICE_NAME_REGEX.is_match(name) {
error!("Attempted to register LoWPAN device with invalid name {:?}", name);
return Err(ZxStatus::INVALID_ARGS);
}
let name = name.to_string();
// Lock the device list.
let mut devices = self.devices_factory.lock();
// Check to make sure there already aren't too many devices.
if devices.len() >= MAX_LOWPAN_DEVICES as usize {
return Err(ZxStatus::NO_RESOURCES);
}
// Check for existing devices with the same name.
if devices.contains_key(&name) {
return Err(ZxStatus::ALREADY_EXISTS);
}
// Insert the new device into the list.
devices.insert(name.clone(), driver.clone());
Ok(())
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<FactoryLookupRequestStream> for LowpanService<S> {
async fn serve_to(&self, request_stream: FactoryLookupRequestStream) -> anyhow::Result<()> {
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
FactoryLookupRequest::Lookup { name, device_factory, .. } => {
info!("Received lookup factory request for {:?}", name);
if let Err(err) = match self.lookup_factory(&name) {
Ok(dev) => dev.get_factory_device(device_factory),
Err(err) => device_factory.close_with_epitaph(err.into()),
} {
error!("{:?}", err);
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| error!("{:?}", e))
.await
}
}
#[async_trait::async_trait()]
impl<S: Spawn + Sync> ServeTo<FactoryRegisterRequestStream> for LowpanService<S> {
async fn serve_to(
&self,
request_stream: FactoryRegisterRequestStream,
) -> anyhow::Result<()> {
let control_handle = request_stream.control_handle();
request_stream
.err_into::<Error>()
.try_for_each_concurrent(MAX_CONCURRENT, |command| async {
match command {
FactoryRegisterRequest::Register { name, driver, .. } => {
info!("Received register factory request for {:?}", name);
if let Err(err) = self.register_factory(&name, driver) {
control_handle.shutdown_with_epitaph(err);
}
}
}
Result::<(), anyhow::Error>::Ok(())
})
.inspect_err(|e| error!("{:?}", e))
.await
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints::create_endpoints;
use fuchsia_async as fasync;
#[fasync::run_until_stalled(test)]
async fn test_interface_name_check() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let (client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("lowpan0", client_ep), Ok(()));
let (client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("low pan 0", client_ep), Err(ZxStatus::INVALID_ARGS));
let (client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("0lowpan", client_ep), Err(ZxStatus::INVALID_ARGS));
let (client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("l", client_ep), Err(ZxStatus::INVALID_ARGS));
}
#[fasync::run_until_stalled(test)]
async fn test_factory_interface() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let (client_ep, _) = create_endpoints::<FactoryDriverMarker>();
assert_eq!(service.register_factory("lowpan0", client_ep), Ok(()));
}
#[fasync::run_until_stalled(test)]
async fn test_interface_added_notifications() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let waiter = service.added_removed_cond.wait();
let (driver_client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("lowpan0", driver_client_ep), Ok(()));
waiter.await;
}
#[fasync::run_singlethreaded(test)]
async fn test_interface_removed_notifications() {
let service = LowpanService::with_spawner(FuchsiaGlobalExecutor);
let mut waiter = service.added_removed_cond.wait();
{
let (driver_client_ep, _) = create_endpoints::<DriverMarker>();
assert_eq!(service.register("lowpan0", driver_client_ep), Ok(()));
waiter.await;
waiter = service.added_removed_cond.wait();
}
waiter.await;
}
}