blob: a019e87f6b6bb317c8afab8c0b2a2866cc01520f [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fidl_fuchsia_wlan_stats::IfaceStats;
use fuchsia_zircon as zx;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use parking_lot::Mutex;
use std::sync::Arc;
use crate::future_util::GroupAvailableExt;
use crate::Never;
// TODO(gbonik): get rid of the Mutex when FIDL APIs make it possible
// Mutex is a workaround for the fact that Responder.send() takes a &mut.
pub type StatsRef = Arc<Mutex<IfaceStats>>;
type Responder = oneshot::Sender<StatsRef>;
pub struct StatsScheduler {
queue: mpsc::UnboundedSender<Responder>,
}
pub fn create_scheduler() -> (StatsScheduler, impl Stream<Item = StatsRequest>) {
let (sender, receiver) = mpsc::unbounded();
let scheduler = StatsScheduler { queue: sender };
let req_stream = receiver
.map(Ok)
.group_available()
.map_ok(StatsRequest)
.map(|x| x.unwrap_or_else(Never::into_any));
(scheduler, req_stream)
}
impl StatsScheduler {
pub fn get_stats(&self) -> impl Future<Output = Result<StatsRef, zx::Status>> {
let (sender, receiver) = oneshot::channel();
// Ignore the error: if the other side is closed, `sender` will be immediately
// dropped, and `receiver` will be notified
self.queue.unbounded_send(sender).unwrap_or_else(|_| ());
receiver.map_err(|_| zx::Status::CANCELED)
}
}
pub struct StatsRequest(Vec<Responder>);
impl StatsRequest {
pub fn reply(self, stats: IfaceStats) {
let stats = Arc::new(Mutex::new(stats));
for responder in self.0 {
responder.send(stats.clone()).unwrap_or_else(|_| ());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl_fuchsia_wlan_stats::{Counter, DispatcherStats, IfaceStats, PacketCounter};
use fuchsia_async as fasync;
use futures::task::Poll;
#[test]
fn schedule() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let (sched, req_stream) = create_scheduler();
let mut fut1 = sched.get_stats();
let mut fut2 = sched.get_stats();
let mut counter = 0;
let mut server = req_stream.for_each(move |req| {
counter += 100;
future::ready(req.reply(fake_iface_stats(counter)))
});
exec.run_until_stalled(&mut server);
let res1 = exec.run_until_stalled(&mut fut1);
match res1 {
Poll::Ready(Ok(r)) => assert_eq!(fake_iface_stats(100), *r.lock()),
Poll::Ready(Err(e)) => panic!("request future 1 returned an error: {:?}", e),
Poll::Pending => panic!("request future 1 returned 'Pending'"),
}
let res2 = exec.run_until_stalled(&mut fut2);
match res2 {
Poll::Ready(Ok(r)) => assert_eq!(fake_iface_stats(100), *r.lock()),
Poll::Ready(Err(e)) => panic!("request future 2 returned an error: {:?}", e),
Poll::Pending => panic!("request future 2 returned 'Pending'"),
}
let mut fut3 = sched.get_stats();
exec.run_until_stalled(&mut server);
let res3 = exec.run_until_stalled(&mut fut3);
match res3 {
Poll::Ready(Ok(r)) => assert_eq!(fake_iface_stats(200), *r.lock()),
Poll::Ready(Err(e)) => panic!("request future 3 returned an error: {:?}", e),
Poll::Pending => panic!("request future 3 returned 'Pending'"),
}
}
#[test]
fn canceled_if_server_dropped_after_request() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let (sched, req_stream) = create_scheduler();
let mut fut = sched.get_stats();
::std::mem::drop(req_stream);
let res = exec.run_until_stalled(&mut fut);
if let Poll::Ready(Err(zx::Status::CANCELED)) = res {
// OK
} else {
panic!("canceled error not found");
}
}
#[test]
fn canceled_if_server_dropped_before_request() {
let mut exec = fasync::Executor::new().expect("Failed to create an executor");
let (sched, req_stream) = create_scheduler();
::std::mem::drop(req_stream);
let mut fut = sched.get_stats();
let res = exec.run_until_stalled(&mut fut);
if let Poll::Ready(Err(zx::Status::CANCELED)) = res {
// OK
} else {
panic!("no cancelled error");
}
}
fn fake_iface_stats(count: u64) -> IfaceStats {
IfaceStats {
dispatcher_stats: DispatcherStats {
any_packet: fake_packet_counter(count),
mgmt_frame: fake_packet_counter(count),
ctrl_frame: fake_packet_counter(count),
data_frame: fake_packet_counter(count),
},
mlme_stats: None,
}
}
fn fake_packet_counter(count: u64) -> PacketCounter {
PacketCounter {
in_: fake_counter(count),
out: fake_counter(count),
drop: fake_counter(count),
in_bytes: fake_counter(count),
out_bytes: fake_counter(count),
drop_bytes: fake_counter(count),
}
}
fn fake_counter(count: u64) -> Counter {
Counter { count: count, name: "foo".to_string() }
}
}