// blob: d60dc5e228ac7149b4fb7fb01bd63a2085eb570a [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod ap;
pub mod client;
pub mod mesh;
use failure::{bail, format_err};
use fidl_fuchsia_wlan_mlme::{self as fidl_mlme, MlmeEvent, MlmeEventStream, MlmeProxy};
use fidl_fuchsia_wlan_stats::IfaceStats;
use fuchsia_async as fasync;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::select;
use log::warn;
use pin_utils::pin_mut;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use wlan_sme::{
timer::{TimeEntry, TimedEvent},
MlmeRequest, MlmeStream, Station,
};
use crate::fidl_util::is_peer_closed;
use crate::stats_scheduler::StatsRequest;
use crate::Never;
/// Runs the message loop between MLME and an SME `station`.
///
/// Each iteration of the loop waits on four sources and handles whichever is ready:
/// - an event from `event_stream`: a `StatsQueryResp` is handed to the stats future
///   through an internal bounded channel, any other event goes to
///   `station.on_mlme_event`;
/// - a request from `mlme_stream`, which is forwarded to MLME through `proxy`;
/// - an expired timer derived from `time_stream`, which goes to `station.on_timeout`;
/// - the stats-serving future built from `stats_requests`.
///
/// The returned future successfully terminates when MLME closes the channel: that is,
/// when `event_stream` ends, or when `is_peer_closed` accepts the error from reading an
/// event or from forwarding a request.
///
/// # Errors
///
/// Returns an error on any other failure to read an event or forward a request, when
/// the SME request stream or the timer stream ends, when a stats response cannot be
/// handed to the stats future, or when the stats future itself fails.
///
/// # Panics
///
/// Panics if the `station` mutex is poisoned (`lock().unwrap()`).
async fn serve_mlme_sme<STA, SRS, TS>(
proxy: MlmeProxy,
mut event_stream: MlmeEventStream,
station: Arc<Mutex<STA>>,
mut mlme_stream: MlmeStream,
stats_requests: SRS,
time_stream: TS,
) -> Result<(), failure::Error>
where
STA: Station,
SRS: Stream<Item = StatsRequest> + Unpin,
TS: Stream<Item = TimeEntry<<STA as wlan_sme::Station>::Event>> + Unpin,
{
// Bounded channel (buffer size 1) carrying stats responses from the event arm below
// to `serve_stats`.
let (mut stats_sender, stats_receiver) = mpsc::channel(1);
let stats_fut = serve_stats(proxy.clone(), stats_requests, stats_receiver);
// `select!` polls `stats_fut` by name on every iteration, so it is pinned and fused
// once, outside the loop.
pin_mut!(stats_fut);
let mut stats_fut = stats_fut.fuse();
let mut timeout_stream = make_timer_stream(time_stream).fuse();
loop {
select! {
// Fuse rationale: any `None`s in the MLME stream should result in
// bailing immediately, so we don't need to track if we've seen a
// `None` or not and can `fuse` directly in the `select` call.
mlme_event = event_stream.next().fuse() => match mlme_event {
// Handle the stats response separately since it is SME-independent
Some(Ok(MlmeEvent::StatsQueryResp{ resp })) => handle_stats_resp(&mut stats_sender, resp)?,
Some(Ok(other)) => station.lock().unwrap().on_mlme_event(other),
// The guarded arm must stay ahead of the catch-all `Some(Err(e))` arm below.
Some(Err(ref e)) if is_peer_closed(e) => return Ok(()),
None => return Ok(()),
Some(Err(e)) => bail!("Error reading an event from MLME channel: {}", e),
},
mlme_req = mlme_stream.next().fuse() => match mlme_req {
Some(req) => match forward_mlme_request(req, &proxy) {
Ok(()) => {},
Err(ref e) if is_peer_closed(e) => return Ok(()),
Err(e) => bail!("Error forwarding a request from SME to MLME: {}", e),
},
None => bail!("Stream of requests from SME to MLME has ended unexpectedly"),
},
timeout = timeout_stream.next() => match timeout {
Some(timed_event) => station.lock().unwrap().on_timeout(timed_event),
None => bail!("SME timer stream has ended unexpectedly"),
},
// `serve_stats` resolves to `Result<Never, _>`; `?` propagates its error.
// NOTE(review): `into_any` presumably converts the `Never` value into this arm's
// type — confirm against `crate::Never`.
stats = stats_fut => stats?.into_any(),
}
}
}
/// Turns a stream of `(deadline, event)` entries into a stream of the events
/// themselves, each yielded once a timer created for its deadline completes.
///
/// The timers are run through `buffer_unordered`, so events come out in the order
/// their timers complete, not the order in which the entries arrived.
fn make_timer_stream<E>(
    time_stream: impl Stream<Item = TimeEntry<E>>,
) -> impl Stream<Item = TimedEvent<E>> {
    // One timer future per incoming entry; it resolves to the entry's event.
    let pending_timers = time_stream.map(|entry| {
        let (deadline, timed_event) = entry;
        fasync::Timer::new(deadline).map(|_| timed_event)
    });
    // The number of timers that may be pending at once is not limited.
    pending_timers.buffer_unordered(usize::max_value())
}
/// Sends a single SME-originated request to MLME.
///
/// Every `MlmeRequest` variant is dispatched to its matching `MlmeProxy` method with
/// the variant's payload passed by mutable reference. The proxy call's result is
/// returned unchanged.
fn forward_mlme_request(req: MlmeRequest, proxy: &MlmeProxy) -> Result<(), fidl::Error> {
    // The match is deliberately exhaustive (no `_` arm) so that adding a variant to
    // `MlmeRequest` fails to compile here until it is handled.
    match req {
        MlmeRequest::Scan(mut scan) => proxy.start_scan(&mut scan),
        MlmeRequest::Join(mut join) => proxy.join_req(&mut join),
        MlmeRequest::Authenticate(mut auth) => proxy.authenticate_req(&mut auth),
        MlmeRequest::AuthResponse(mut auth_resp) => proxy.authenticate_resp(&mut auth_resp),
        MlmeRequest::Associate(mut assoc) => proxy.associate_req(&mut assoc),
        MlmeRequest::AssocResponse(mut assoc_resp) => proxy.associate_resp(&mut assoc_resp),
        MlmeRequest::Deauthenticate(mut deauth) => proxy.deauthenticate_req(&mut deauth),
        MlmeRequest::Eapol(mut eapol) => proxy.eapol_req(&mut eapol),
        MlmeRequest::SetKeys(mut keys) => proxy.set_keys_req(&mut keys),
        MlmeRequest::SetCtrlPort(mut ctrl_port) => proxy.set_controlled_port(&mut ctrl_port),
        MlmeRequest::Start(mut start) => proxy.start_req(&mut start),
        MlmeRequest::Stop(mut stop) => proxy.stop_req(&mut stop),
        MlmeRequest::SendMpOpenAction(mut open) => proxy.send_mp_open_action(&mut open),
        MlmeRequest::SendMpConfirmAction(mut confirm) => {
            proxy.send_mp_confirm_action(&mut confirm)
        }
        MlmeRequest::MeshPeeringEstablished(mut peering) => {
            proxy.mesh_peering_established(&mut peering)
        }
    }
}
fn handle_stats_resp(
stats_sender: &mut mpsc::Sender<IfaceStats>,
resp: fidl_mlme::StatsQueryResponse,
) -> Result<(), failure::Error> {
stats_sender.try_send(resp.stats).or_else(|e| {
if e.is_full() {
// We only expect one response from MLME per each request, so the bounded
// queue of size 1 should always suffice.
warn!("Received an extra GetStatsResp from MLME, discarding");
Ok(())
} else {
Err(format_err!("Failed to send a message to stats future"))
}
})
}
/// Answers stats requests one at a time.
///
/// For each request taken from `stats_requests`, a stats query is sent to MLME through
/// `proxy`, the next item from `responses` is awaited, and the request is replied to
/// with that item.
///
/// The `Ok` type is `Never`: the future only finishes by returning an error, which
/// happens when the query cannot be sent or when either stream ends.
async fn serve_stats<S>(
    proxy: MlmeProxy,
    mut stats_requests: S,
    mut responses: mpsc::Receiver<IfaceStats>,
) -> Result<Never, failure::Error>
where
    S: Stream<Item = StatsRequest> + Unpin,
{
    loop {
        let pending_request = match await!(stats_requests.next()) {
            Some(request) => request,
            None => return Err(format_err!("Stream of stats requests has ended unexpectedly")),
        };

        proxy
            .stats_query_req()
            .map_err(|e| format_err!("Failed to send a StatsReq to MLME: {}", e))?;

        // Wait for the response that the main loop relays from the MLME event stream.
        let stats = match await!(responses.next()) {
            Some(stats) => stats,
            None => bail!("Stream of stats responses has ended unexpectedly"),
        };
        pending_request.reply(stats);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use {
        fuchsia_zircon::{self as zx, prelude::DurationNum},
        futures::channel::mpsc::{self, UnboundedSender},
        futures::Poll,
        pin_utils::pin_mut,
    };

    type Event = u32;

    /// Schedules four timers out of deadline order and checks that
    /// `make_timer_stream` yields their events ordered by deadline.
    #[test]
    fn test_timer() {
        let mut exec = fasync::Executor::new().expect("failed to create an executor");
        let fut = async {
            let (timer, time_stream) = mpsc::unbounded::<TimeEntry<Event>>();
            let mut timeout_stream = make_timer_stream(time_stream);

            let now = zx::Time::get(zx::ClockId::Monotonic);
            schedule(&timer, now + 40.millis(), 0);
            schedule(&timer, now + 10.millis(), 1);
            schedule(&timer, now + 20.millis(), 2);
            schedule(&timer, now + 30.millis(), 3);

            // Gather exactly as many events as were scheduled.
            let mut fired = Vec::new();
            while fired.len() < 4 {
                let timed_event =
                    await!(timeout_stream.next()).expect("timer terminates prematurely");
                fired.push(timed_event.event);
            }
            fired
        };
        pin_mut!(fut);

        // The future must stall until each of the four timers has been woken in turn.
        for _ in 0..4 {
            assert_eq!(Poll::Pending, exec.run_until_stalled(&mut fut));
            assert!(exec.wake_next_timer().is_some());
        }

        if let Poll::Ready(events) = exec.run_until_stalled(&mut fut) {
            assert_eq!(events, vec![1, 2, 3, 0]);
        } else {
            panic!("expect future to complete");
        }
    }

    /// Queues `event` on `timer` to fire at `deadline`.
    fn schedule(timer: &UnboundedSender<TimeEntry<Event>>, deadline: zx::Time, event: Event) {
        timer
            .unbounded_send((deadline, TimedEvent { id: 0, event }))
            .expect("expect send successful");
    }
}