blob: 808f8384a2d2ddddd9ade7bbc9e963f442f56d39 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
failure::{bail, Error, format_err, ResultExt},
fidl_fuchsia_wlan_device as fidl_wlan_dev,
fidl_fuchsia_wlan_mlme::{self as fidl_mlme, DeviceQueryConfirm, MlmeEventStream},
fuchsia_async::Timer,
fuchsia_wlan_dev as wlan_dev,
fuchsia_zircon::prelude::*,
futures::{
channel::mpsc,
future::{Future, FutureObj},
select,
stream::{Stream, StreamExt, TryStreamExt},
},
log::{error, info, warn},
pin_utils::pin_mut,
std::{
collections::HashSet,
marker::Unpin,
sync::Arc,
},
wlan_sme,
};
use crate::{
cobalt_reporter::CobaltSender,
device_watch::{self, NewIfaceDevice},
future_util::ConcurrentTasks,
mlme_query_proxy::MlmeQueryProxy,
Never,
station,
stats_scheduler::{self, StatsScheduler},
watchable_map::WatchableMap,
};
pub struct PhyDevice {
pub proxy: fidl_wlan_dev::PhyProxy,
pub device: wlan_dev::Device,
}
pub type ClientSmeServer = mpsc::UnboundedSender<super::station::client::Endpoint>;
pub type ApSmeServer = mpsc::UnboundedSender<super::station::ap::Endpoint>;
pub type MeshSmeServer = mpsc::UnboundedSender<super::station::mesh::Endpoint>;
pub enum SmeServer {
Client(ClientSmeServer),
Ap(ApSmeServer),
Mesh(MeshSmeServer),
}
pub struct IfaceDevice {
pub sme_server: SmeServer,
pub stats_sched: StatsScheduler,
pub device: wlan_dev::Device,
pub mlme_query: MlmeQueryProxy,
}
pub type PhyMap = WatchableMap<u16, PhyDevice>;
pub type IfaceMap = WatchableMap<u16, IfaceDevice>;
pub async fn serve_phys(phys: Arc<PhyMap>) -> Result<Never, Error> {
let mut new_phys = device_watch::watch_phy_devices()?;
let mut active_phys = ConcurrentTasks::new();
loop {
let mut new_phy = new_phys.next();
select! {
new_phy => match new_phy {
None => bail!("new phy stream unexpectedly finished"),
Some(Err(e)) => bail!("new phy stream returned an error: {}", e),
Some(Ok(new_phy)) => {
let fut = serve_phy(&phys, new_phy);
active_phys.add(fut);
}
},
active_phys => active_phys.into_any(),
}
}
}
async fn serve_phy(phys: &PhyMap, new_phy: device_watch::NewPhyDevice) {
info!("new phy #{}: {}", new_phy.id, new_phy.device.path().to_string_lossy());
let id = new_phy.id;
let event_stream = new_phy.proxy.take_event_stream();
phys.insert(id, PhyDevice {
proxy: new_phy.proxy,
device: new_phy.device,
});
let r = await!(event_stream.map_ok(|_| ()).try_collect::<()>());
phys.remove(&id);
if let Err(e) = r {
error!("error reading from the FIDL channel of phy #{}: {}", id, e);
}
info!("phy removed: #{}", id);
}
pub async fn serve_ifaces(ifaces: Arc<IfaceMap>, cobalt_sender: CobaltSender) -> Result<Never, Error> {
let mut new_ifaces = device_watch::watch_iface_devices()?;
let mut active_ifaces = ConcurrentTasks::new();
loop {
let mut new_iface = new_ifaces.next();
select! {
new_iface => match new_iface {
None => bail!("new iface stream unexpectedly finished"),
Some(Err(e)) => bail!("new iface stream returned an error: {}", e),
Some(Ok(new_iface)) => {
let fut = query_and_serve_iface(new_iface, &ifaces, cobalt_sender.clone());
active_ifaces.add(fut);
}
},
active_ifaces => active_ifaces.into_any(),
}
}
}
async fn query_and_serve_iface(new_iface: NewIfaceDevice, ifaces: &IfaceMap, cobalt_sender: CobaltSender) {
let NewIfaceDevice { id, device, proxy } = new_iface;
let mut event_stream = proxy.take_event_stream();
let query_resp = match await!(query_iface(proxy.clone(), &mut event_stream)) {
Ok(x) => x,
Err(e) => {
error!("Failed to query new iface '{}': {}", device.path().display(), e);
return;
}
};
let (stats_sched, stats_reqs) = stats_scheduler::create_scheduler();
let mlme_query = MlmeQueryProxy::new(proxy.clone());
let role = query_resp.role;
let (sme, sme_fut) = match create_sme(proxy, event_stream, query_resp, stats_reqs, cobalt_sender) {
Ok(x) => x,
Err(e) => {
error!("Failed to create SME for new iface '{}': {}",
device.path().display(), e);
return;
}
};
info!("new iface #{} with role '{:?}': {}", id, role, device.path().to_string_lossy());
ifaces.insert(id, IfaceDevice {
sme_server: sme,
stats_sched,
device,
mlme_query,
});
let r = await!(sme_fut);
if let Err(e) = r {
error!("Error serving station for iface #{}: {}", id, e);
}
ifaces.remove(&id);
info!("iface removed: {}", id);
}
async fn query_iface(proxy: fidl_mlme::MlmeProxy, event_stream: &mut MlmeEventStream)
-> Result<DeviceQueryConfirm, Error>
{
let query_req = &mut fidl_mlme::DeviceQueryRequest{
foo: 0,
};
proxy.device_query_req(query_req)
.context("failed to send request to device")?;
let query_conf = wait_for_query_conf(event_stream);
pin_mut!(query_conf);
let mut timeout = Timer::new(5.seconds().after_now());
select! {
query_conf => query_conf,
timeout => bail!("query request timed out"),
}
}
async fn wait_for_query_conf(event_stream: &mut MlmeEventStream)
-> Result<DeviceQueryConfirm, Error>
{
while let Some(event) = await!(event_stream.next()) {
match event {
Ok(fidl_mlme::MlmeEvent::DeviceQueryConf { resp }) => return Ok(resp),
Ok(other) => {
warn!("Unexpected message from MLME while waiting for \
device query response: {:?}", other);
},
Err(e) => bail!("error reading from FIDL channel: {}", e),
}
}
return Err(format_err!("device closed the channel before returning query response"));
}
fn create_sme<S>(proxy: fidl_mlme::MlmeProxy,
event_stream: fidl_mlme::MlmeEventStream,
query_resp: DeviceQueryConfirm,
stats_requests: S,
cobalt_sender: CobaltSender)
-> Result<(SmeServer, impl Future<Output = Result<(), Error>>), Error>
where S: Stream<Item = stats_scheduler::StatsRequest> + Send + Unpin + 'static
{
let device_info = convert_device_info(&query_resp);
match query_resp.role {
fidl_mlme::MacRole::Client => {
let (sender, receiver) = mpsc::unbounded();
let fut = station::client::serve(
proxy, device_info, event_stream, receiver, stats_requests, cobalt_sender);
Ok((SmeServer::Client(sender), FutureObj::new(Box::new(fut))))
},
fidl_mlme::MacRole::Ap => {
let (sender, receiver) = mpsc::unbounded();
let fut = station::ap::serve(
proxy, device_info, event_stream, receiver, stats_requests);
Ok((SmeServer::Ap(sender), FutureObj::new(Box::new(fut))))
},
fidl_mlme::MacRole::Mesh => {
let (sender, receiver) = mpsc::unbounded();
let fut = station::mesh::serve(
proxy, event_stream, receiver, stats_requests);
Ok((SmeServer::Mesh(sender), FutureObj::new(Box::new(fut))))
}
}
}
fn convert_device_info(query_resp: &DeviceQueryConfirm) -> wlan_sme::DeviceInfo {
let mut supported_channels = HashSet::new();
for band in &query_resp.bands {
supported_channels.extend(&band.channels);
}
wlan_sme::DeviceInfo {
supported_channels,
addr: query_resp.mac_addr,
}
}