[wlan][sme] Replace Tokens with oneshot::Sender in mesh.rs
Same idea as the client.rs change. Get rid of extra abstraction and the
UserEvent queue, instead use "oneshot" channels directly.
TEST=CQ
Change-Id: I7467f711974d6cd53f0a78ad61da906fc4106d40
diff --git a/bin/wlan/wlanstack/src/station/mesh.rs b/bin/wlan/wlanstack/src/station/mesh.rs
index 96cece2..18ae775 100644
--- a/bin/wlan/wlanstack/src/station/mesh.rs
+++ b/bin/wlan/wlanstack/src/station/mesh.rs
@@ -7,7 +7,7 @@
fidl_fuchsia_wlan_mlme::{MlmeEventStream, MlmeProxy},
fidl_fuchsia_wlan_sme as fidl_sme,
futures::{
- channel::{mpsc, oneshot},
+ channel::mpsc,
Poll,
prelude::*,
select,
@@ -21,7 +21,7 @@
},
wlan_sme::{
DeviceInfo,
- mesh::{self as mesh_sme, UserEvent},
+ mesh as mesh_sme,
timer::TimeEntry,
},
crate::{
@@ -30,15 +30,8 @@
},
};
-struct Tokens;
-
-impl mesh_sme::Tokens for Tokens {
- type JoinToken = oneshot::Sender<mesh_sme::JoinMeshResult>;
- type LeaveToken = oneshot::Sender<mesh_sme::LeaveMeshResult>;
-}
-
pub type Endpoint = fidl::endpoints::ServerEnd<fidl_sme::MeshSmeMarker>;
-type Sme = mesh_sme::MeshSme<Tokens>;
+type Sme = mesh_sme::MeshSme;
pub async fn serve<S>(proxy: MlmeProxy,
device_info: DeviceInfo,
@@ -48,12 +41,12 @@
-> Result<(), failure::Error>
where S: Stream<Item = StatsRequest> + Send + Unpin
{
- let (sme, mlme_stream, user_stream) = Sme::new(device_info);
+ let (sme, mlme_stream) = Sme::new(device_info);
let sme = Arc::new(Mutex::new(sme));
let time_stream = stream::poll_fn::<TimeEntry<()>, _>(|_| Poll::Pending);
let mlme_sme = super::serve_mlme_sme(
proxy, event_stream, Arc::clone(&sme), mlme_stream, stats_requests, time_stream);
- let sme_fidl = serve_fidl(sme, new_fidl_clients, user_stream);
+ let sme_fidl = serve_fidl(&sme, new_fidl_clients);
pin_mut!(mlme_sme);
pin_mut!(sme_fidl);
select! {
@@ -63,22 +56,16 @@
Ok(())
}
-async fn serve_fidl(sme: Arc<Mutex<Sme>>,
- new_fidl_clients: mpsc::UnboundedReceiver<Endpoint>,
- user_stream: mesh_sme::UserStream<Tokens>)
+async fn serve_fidl(sme: &Mutex<Sme>,
+ new_fidl_clients: mpsc::UnboundedReceiver<Endpoint>)
-> Result<Never, failure::Error>
{
let mut fidl_clients = FuturesUnordered::new();
- let mut user_stream = user_stream.fuse();
let mut new_fidl_clients = new_fidl_clients.fuse();
loop {
select! {
- user_event = user_stream.next() => match user_event {
- Some(e) => handle_user_event(e),
- None => bail!("Stream of events from SME unexpectedly ended"),
- },
new_fidl_client = new_fidl_clients.next() => match new_fidl_client {
- Some(c) => fidl_clients.push(serve_fidl_endpoint(Arc::clone(&sme), c)),
+ Some(c) => fidl_clients.push(serve_fidl_endpoint(sme, c)),
None => bail!("New FIDL client stream unexpectedly ended"),
},
// Drive clients towards completion
@@ -87,14 +74,7 @@
}
}
-fn handle_user_event(e: UserEvent<Tokens>) {
- match e {
- UserEvent::JoinMeshFinished { token, result } => token.send(result).unwrap_or_else(|_| ()),
- UserEvent::LeaveMeshFinished { token, result } => token.send(result).unwrap_or_else(|_| ()),
- }
-}
-
-async fn serve_fidl_endpoint(sme: Arc<Mutex<Sme>>, endpoint: Endpoint) {
+async fn serve_fidl_endpoint(sme: &Mutex<Sme>, endpoint: Endpoint) {
const MAX_CONCURRENT_REQUESTS: usize = 1000;
let stream = match endpoint.into_stream() {
Ok(s) => s,
@@ -104,14 +84,14 @@
}
};
let r = await!(stream.try_for_each_concurrent(MAX_CONCURRENT_REQUESTS, move |request| {
- handle_fidl_request(Arc::clone(&sme), request)
+ handle_fidl_request(&sme, request)
}));
if let Err(e) = r {
error!("Error serving a FIDL client of Mesh SME: {}", e);
}
}
-async fn handle_fidl_request(sme: Arc<Mutex<Sme>>, request: fidl_sme::MeshSmeRequest)
+async fn handle_fidl_request(sme: &Mutex<Sme>, request: fidl_sme::MeshSmeRequest)
-> Result<(), ::fidl::Error>
{
match request {
@@ -126,11 +106,10 @@
}
}
-async fn join_mesh(sme: Arc<Mutex<Sme>>, config: fidl_sme::MeshConfig)
+async fn join_mesh(sme: &Mutex<Sme>, config: fidl_sme::MeshConfig)
-> fidl_sme::JoinMeshResultCode
{
- let (sender, receiver) = oneshot::channel();
- sme.lock().unwrap().on_join_command(sender, mesh_sme::Config {
+ let receiver = sme.lock().unwrap().on_join_command(mesh_sme::Config {
mesh_id: config.mesh_id,
channel: config.channel,
});
@@ -151,9 +130,8 @@
}
}
-async fn leave_mesh(sme: Arc<Mutex<Sme>>) -> fidl_sme::LeaveMeshResultCode {
- let (sender, receiver) = oneshot::channel();
- sme.lock().unwrap().on_leave_command(sender);
+async fn leave_mesh(sme: &Mutex<Sme>) -> fidl_sme::LeaveMeshResultCode {
+ let receiver = sme.lock().unwrap().on_leave_command();
let r = await!(receiver).unwrap_or_else(|_| {
error!("Responder for Leave Mesh command was dropped without sending a response");
mesh_sme::LeaveMeshResult::InternalError
diff --git a/lib/rust/wlan-sme/src/mesh/mod.rs b/lib/rust/wlan-sme/src/mesh/mod.rs
index 0ac253b..a11f28f 100644
--- a/lib/rust/wlan-sme/src/mesh/mod.rs
+++ b/lib/rust/wlan-sme/src/mesh/mod.rs
@@ -5,7 +5,7 @@
use {
fidl_fuchsia_wlan_common::{self as fidl_common},
fidl_fuchsia_wlan_mlme::{self as fidl_mlme, MlmeEvent},
- futures::channel::mpsc,
+ futures::channel::{mpsc, oneshot},
log::{error},
std::mem,
wlan_common::channel::{Channel, Cbw},
@@ -14,6 +14,7 @@
DeviceInfo,
MlmeRequest,
phy_selection::get_device_band_info,
+ responder::Responder,
sink::MlmeSink,
timer::TimedEvent,
},
@@ -22,79 +23,58 @@
const DEFAULT_BEACON_PERIOD: u16 = 1000;
const DEFAULT_DTIM_PERIOD: u8 = 1;
-
-// A token is an opaque value that identifies a particular request from a user.
-// To avoid parameterizing over many different token types, we introduce a helper
-// trait that enables us to group them into a single generic parameter.
-pub trait Tokens {
- type JoinToken;
- type LeaveToken;
-}
-
-mod internal {
- pub type UserSink<T> = crate::sink::UnboundedSink<super::UserEvent<T>>;
-}
-use self::internal::*;
-
-pub type UserStream<T> = mpsc::UnboundedReceiver<UserEvent<T>>;
-
// A list of pending join/leave requests to be maintained in the intermediate
// 'Joining'/'Leaving' states where we are waiting for a reply from MLME and cannot
// serve the requests immediately.
-struct PendingRequests<T: Tokens> {
- leave: Vec<T::LeaveToken>,
- join: Option<(T::JoinToken, Config)>,
+struct PendingRequests {
+ leave: Vec<Responder<LeaveMeshResult>>,
+ join: Option<(Responder<JoinMeshResult>, Config)>,
}
-impl<T: Tokens> PendingRequests<T> {
+impl PendingRequests {
pub fn new() -> Self {
PendingRequests { leave: Vec::new(), join: None }
}
- pub fn enqueue_leave(&mut self, user_sink: &UserSink<T>, token: T::LeaveToken) {
- self.replace_join_request(user_sink, None);
- self.leave.push(token);
+ pub fn enqueue_leave(&mut self, responder: Responder<LeaveMeshResult>) {
+ self.replace_join_request(None);
+ self.leave.push(responder);
}
- pub fn enqueue_join(&mut self, user_sink: &UserSink<T>, token: T::JoinToken, config: Config) {
- self.replace_join_request(user_sink, Some((token, config)));
+ pub fn enqueue_join(&mut self, responder: Responder<JoinMeshResult>, config: Config) {
+ self.replace_join_request(Some((responder, config)));
}
pub fn is_empty(&self) -> bool {
self.leave.is_empty() && self.join.is_none()
}
- fn replace_join_request(
- &mut self,
- user_sink: &UserSink<T>,
- req: Option<(T::JoinToken, Config)>)
- {
- if let Some((old_token, _)) = mem::replace(&mut self.join, req) {
- report_join_finished(user_sink, old_token, JoinMeshResult::Canceled);
+ fn replace_join_request(&mut self, req: Option<(Responder<JoinMeshResult>, Config)>) {
+ if let Some((old_responder, _)) = mem::replace(&mut self.join, req) {
+ old_responder.respond(JoinMeshResult::Canceled);
}
}
}
-enum State<T: Tokens> {
+enum State {
Idle,
Joining {
- token: T::JoinToken,
+ responder: Responder<JoinMeshResult>,
config: Config,
- pending: PendingRequests<T>,
+ pending: PendingRequests,
},
Joined {
config: Config,
},
Leaving {
config: Config,
- pending: PendingRequests<T>,
+ pending: PendingRequests,
}
}
-pub struct MeshSme<T: Tokens> {
+pub struct MeshSme {
mlme_sink: MlmeSink,
- user_sink: UserSink<T>,
- state: Option<State<T>>,
+ state: Option<State>,
device_info: DeviceInfo,
}
@@ -121,82 +101,69 @@
InternalError,
}
-// A message from the Mesh node to a user or a group of listeners
-#[derive(Debug)]
-pub enum UserEvent<T: Tokens> {
- JoinMeshFinished {
- token: T::JoinToken,
- result: JoinMeshResult,
- },
- LeaveMeshFinished {
- token: T::LeaveToken,
- result: LeaveMeshResult,
- }
-}
-
-impl<T: Tokens> MeshSme<T> {
- pub fn on_join_command(&mut self, token: T::JoinToken, config: Config) {
+impl MeshSme {
+ pub fn on_join_command(&mut self, config: Config) -> oneshot::Receiver<JoinMeshResult> {
+ let (responder, receiver) = Responder::new();
if let Err(result) = validate_config(&config) {
- report_join_finished(&self.user_sink, token, result);
- return;
+ responder.respond(result);
+ return receiver;
}
self.state = Some(match self.state.take().unwrap() {
State::Idle => {
self.mlme_sink.send(MlmeRequest::Start(create_start_request(&config)));
- State::Joining { token, pending: PendingRequests::new(), config }
+ State::Joining { responder, pending: PendingRequests::new(), config }
},
- State::Joining { token: cur_token, config: cur_config, mut pending } => {
- pending.enqueue_join(&self.user_sink, token, config);
- State::Joining { token: cur_token, config: cur_config, pending }
+ State::Joining { responder: cur_responder, config: cur_config, mut pending } => {
+ pending.enqueue_join(responder, config);
+ State::Joining { responder: cur_responder, config: cur_config, pending }
},
State::Joined { config: cur_config } => {
self.mlme_sink.send(MlmeRequest::Stop(create_stop_request()));
let mut pending = PendingRequests::new();
- pending.enqueue_join(&self.user_sink, token, config);
+ pending.enqueue_join(responder, config);
State::Leaving { config: cur_config, pending }
},
State::Leaving { config: cur_config, mut pending } => {
- pending.enqueue_join(&self.user_sink, token, config);
+ pending.enqueue_join(responder, config);
State::Leaving { config: cur_config, pending }
}
});
+ receiver
}
- pub fn on_leave_command(&mut self, token: T::LeaveToken) {
+ pub fn on_leave_command(&mut self) -> oneshot::Receiver<LeaveMeshResult> {
+ let (responder, receiver) = Responder::new();
self.state = Some(match self.state.take().unwrap() {
State::Idle => {
- report_leave_finished(&self.user_sink, token, LeaveMeshResult::Success);
+ responder.respond(LeaveMeshResult::Success);
State::Idle
},
- State::Joining { token: cur_token, config, mut pending } => {
- pending.enqueue_leave(&self.user_sink, token);
- State::Joining { token: cur_token, pending, config }
+ State::Joining { responder: cur_responder, config, mut pending } => {
+ pending.enqueue_leave(responder);
+ State::Joining { responder: cur_responder, pending, config }
},
State::Joined { config } => {
self.mlme_sink.send(MlmeRequest::Stop(create_stop_request()));
let mut pending = PendingRequests::new();
- pending.enqueue_leave(&self.user_sink, token);
+ pending.enqueue_leave(responder);
State::Leaving { config, pending }
},
State::Leaving { config, mut pending } => {
- pending.enqueue_leave(&self.user_sink, token);
+ pending.enqueue_leave(responder);
State::Leaving { config, pending }
}
});
+ receiver
}
}
-fn on_back_to_idle<T: Tokens>(
- pending: PendingRequests<T>,
- user_sink: &UserSink<T>,
- mlme_sink: &MlmeSink
-) -> State<T> {
- for token in pending.leave {
- report_leave_finished(user_sink, token, LeaveMeshResult::Success);
+fn on_back_to_idle(pending: PendingRequests, mlme_sink: &MlmeSink) -> State {
+ for responder in pending.leave {
+ responder.respond(LeaveMeshResult::Success);
}
- if let Some((token, config)) = pending.join {
+ if let Some((responder, config)) = pending.join {
mlme_sink.send(MlmeRequest::Start(create_start_request(&config)));
- State::Joining { token, config, pending: PendingRequests::new() }
+ State::Joining { responder, config, pending: PendingRequests::new() }
} else {
State::Idle
}
@@ -236,16 +203,16 @@
fidl_mlme::StopRequest { ssid: vec![], }
}
-impl<T: Tokens> super::Station for MeshSme<T> {
+impl super::Station for MeshSme {
type Event = ();
fn on_mlme_event(&mut self, event: MlmeEvent) {
self.state = Some(match self.state.take().unwrap() {
State::Idle => State::Idle,
- State::Joining { token, pending, config } => match event {
+ State::Joining { responder, pending, config } => match event {
MlmeEvent::StartConf { resp } => match resp.result_code {
fidl_mlme::StartResultCodes::Success => {
- report_join_finished(&self.user_sink, token, JoinMeshResult::Success);
+ responder.respond(JoinMeshResult::Success);
if pending.is_empty() {
State::Joined { config }
} else {
@@ -258,11 +225,11 @@
},
other => {
error!("failed to join mesh: {:?}", other);
- report_join_finished(&self.user_sink, token, JoinMeshResult::InternalError);
- on_back_to_idle(pending, &self.user_sink, &self.mlme_sink)
+ responder.respond(JoinMeshResult::InternalError);
+ on_back_to_idle(pending, &self.mlme_sink)
}
},
- _ => State::Joining { token, pending, config },
+ _ => State::Joining { responder, pending, config },
},
State::Joined { config } => match event {
MlmeEvent::IncomingMpOpenAction { action } => {
@@ -303,16 +270,14 @@
State::Leaving { config, pending } => match event {
MlmeEvent::StopConf { resp } => match resp.result_code {
fidl_mlme::StopResultCodes::Success =>
- on_back_to_idle(pending, &self.user_sink, &self.mlme_sink),
+ on_back_to_idle(pending, &self.mlme_sink),
other => {
error!("failed to leave mesh: {:?}", other);
- for token in pending.leave {
- report_leave_finished(
- &self.user_sink, token, LeaveMeshResult::InternalError);
+ for responder in pending.leave {
+ responder.respond(LeaveMeshResult::InternalError);
}
- if let Some((token, _)) = pending.join {
- report_join_finished(
- &self.user_sink, token, JoinMeshResult::InternalError);
+ if let Some((responder, _)) = pending.join {
+ responder.respond(JoinMeshResult::InternalError);
}
State::Joined { config }
}
@@ -348,30 +313,15 @@
})
}
-fn report_join_finished<T: Tokens>(user_sink: &UserSink<T>,
- token: T::JoinToken,
- result: JoinMeshResult)
-{
- user_sink.send(UserEvent::JoinMeshFinished { token, result });
-}
-
-fn report_leave_finished<T: Tokens>(user_sink: &UserSink<T>, token: T::LeaveToken,
- result: LeaveMeshResult)
-{
- user_sink.send(UserEvent::LeaveMeshFinished { token, result });
-}
-
-impl<T: Tokens> MeshSme<T> {
- pub fn new(device_info: DeviceInfo) -> (Self, crate::MlmeStream, UserStream<T>) {
+impl MeshSme {
+ pub fn new(device_info: DeviceInfo) -> (Self, crate::MlmeStream) {
let (mlme_sink, mlme_stream) = mpsc::unbounded();
- let (user_sink, user_stream) = mpsc::unbounded();
let sme = MeshSme {
mlme_sink: MlmeSink::new(mlme_sink),
- user_sink: UserSink::new(user_sink),
state: Some(State::Idle),
device_info,
};
- (sme, mlme_stream, user_stream)
+ (sme, mlme_stream)
}
}