[wlan][sme] Replace Tokens with oneshot::Sender in mesh.rs

Same idea as the client.rs change. Get rid of extra abstraction and the
UserEvent queue, instead use "oneshot" channels directly.

TEST=CQ

Change-Id: I7467f711974d6cd53f0a78ad61da906fc4106d40
diff --git a/bin/wlan/wlanstack/src/station/mesh.rs b/bin/wlan/wlanstack/src/station/mesh.rs
index 96cece2..18ae775 100644
--- a/bin/wlan/wlanstack/src/station/mesh.rs
+++ b/bin/wlan/wlanstack/src/station/mesh.rs
@@ -7,7 +7,7 @@
     fidl_fuchsia_wlan_mlme::{MlmeEventStream, MlmeProxy},
     fidl_fuchsia_wlan_sme as fidl_sme,
     futures::{
-        channel::{mpsc, oneshot},
+        channel::mpsc,
         Poll,
         prelude::*,
         select,
@@ -21,7 +21,7 @@
     },
     wlan_sme::{
         DeviceInfo,
-        mesh::{self as mesh_sme, UserEvent},
+        mesh as mesh_sme,
         timer::TimeEntry,
     },
     crate::{
@@ -30,15 +30,8 @@
     },
 };
 
-struct Tokens;
-
-impl mesh_sme::Tokens for Tokens {
-    type JoinToken = oneshot::Sender<mesh_sme::JoinMeshResult>;
-    type LeaveToken = oneshot::Sender<mesh_sme::LeaveMeshResult>;
-}
-
 pub type Endpoint = fidl::endpoints::ServerEnd<fidl_sme::MeshSmeMarker>;
-type Sme = mesh_sme::MeshSme<Tokens>;
+type Sme = mesh_sme::MeshSme;
 
 pub async fn serve<S>(proxy: MlmeProxy,
                       device_info: DeviceInfo,
@@ -48,12 +41,12 @@
                       -> Result<(), failure::Error>
     where S: Stream<Item = StatsRequest> + Send + Unpin
 {
-    let (sme, mlme_stream, user_stream) = Sme::new(device_info);
+    let (sme, mlme_stream) = Sme::new(device_info);
     let sme = Arc::new(Mutex::new(sme));
     let time_stream = stream::poll_fn::<TimeEntry<()>, _>(|_| Poll::Pending);
     let mlme_sme = super::serve_mlme_sme(
         proxy, event_stream, Arc::clone(&sme), mlme_stream, stats_requests, time_stream);
-    let sme_fidl = serve_fidl(sme, new_fidl_clients, user_stream);
+    let sme_fidl = serve_fidl(&sme, new_fidl_clients);
     pin_mut!(mlme_sme);
     pin_mut!(sme_fidl);
     select! {
@@ -63,22 +56,16 @@
     Ok(())
 }
 
-async fn serve_fidl(sme: Arc<Mutex<Sme>>,
-                    new_fidl_clients: mpsc::UnboundedReceiver<Endpoint>,
-                    user_stream: mesh_sme::UserStream<Tokens>)
+async fn serve_fidl(sme: &Mutex<Sme>,
+                    new_fidl_clients: mpsc::UnboundedReceiver<Endpoint>)
     -> Result<Never, failure::Error>
 {
     let mut fidl_clients = FuturesUnordered::new();
-    let mut user_stream = user_stream.fuse();
     let mut new_fidl_clients = new_fidl_clients.fuse();
     loop {
         select! {
-            user_event = user_stream.next() => match user_event {
-                Some(e) => handle_user_event(e),
-                None => bail!("Stream of events from SME unexpectedly ended"),
-            },
             new_fidl_client = new_fidl_clients.next() => match new_fidl_client {
-                Some(c) => fidl_clients.push(serve_fidl_endpoint(Arc::clone(&sme), c)),
+                Some(c) => fidl_clients.push(serve_fidl_endpoint(sme, c)),
                 None => bail!("New FIDL client stream unexpectedly ended"),
             },
             // Drive clients towards completion
@@ -87,14 +74,7 @@
     }
 }
 
-fn handle_user_event(e: UserEvent<Tokens>) {
-    match e {
-        UserEvent::JoinMeshFinished { token, result } => token.send(result).unwrap_or_else(|_| ()),
-        UserEvent::LeaveMeshFinished { token, result } => token.send(result).unwrap_or_else(|_| ()),
-    }
-}
-
-async fn serve_fidl_endpoint(sme: Arc<Mutex<Sme>>, endpoint: Endpoint) {
+async fn serve_fidl_endpoint(sme: &Mutex<Sme>, endpoint: Endpoint) {
     const MAX_CONCURRENT_REQUESTS: usize = 1000;
     let stream = match endpoint.into_stream() {
         Ok(s) => s,
@@ -104,14 +84,14 @@
         }
     };
     let r = await!(stream.try_for_each_concurrent(MAX_CONCURRENT_REQUESTS, move |request| {
-        handle_fidl_request(Arc::clone(&sme), request)
+        handle_fidl_request(&sme, request)
     }));
     if let Err(e) = r {
         error!("Error serving a FIDL client of Mesh SME: {}", e);
     }
 }
 
-async fn handle_fidl_request(sme: Arc<Mutex<Sme>>, request: fidl_sme::MeshSmeRequest)
+async fn handle_fidl_request(sme: &Mutex<Sme>, request: fidl_sme::MeshSmeRequest)
     -> Result<(), ::fidl::Error>
 {
     match request {
@@ -126,11 +106,10 @@
     }
 }
 
-async fn join_mesh(sme: Arc<Mutex<Sme>>, config: fidl_sme::MeshConfig)
+async fn join_mesh(sme: &Mutex<Sme>, config: fidl_sme::MeshConfig)
     -> fidl_sme::JoinMeshResultCode
 {
-    let (sender, receiver) = oneshot::channel();
-    sme.lock().unwrap().on_join_command(sender, mesh_sme::Config {
+    let receiver = sme.lock().unwrap().on_join_command(mesh_sme::Config {
         mesh_id: config.mesh_id,
         channel: config.channel,
     });
@@ -151,9 +130,8 @@
     }
 }
 
-async fn leave_mesh(sme: Arc<Mutex<Sme>>) -> fidl_sme::LeaveMeshResultCode {
-    let (sender, receiver) = oneshot::channel();
-    sme.lock().unwrap().on_leave_command(sender);
+async fn leave_mesh(sme: &Mutex<Sme>) -> fidl_sme::LeaveMeshResultCode {
+    let receiver = sme.lock().unwrap().on_leave_command();
     let r = await!(receiver).unwrap_or_else(|_| {
         error!("Responder for Leave Mesh command was dropped without sending a response");
         mesh_sme::LeaveMeshResult::InternalError
diff --git a/lib/rust/wlan-sme/src/mesh/mod.rs b/lib/rust/wlan-sme/src/mesh/mod.rs
index 0ac253b..a11f28f 100644
--- a/lib/rust/wlan-sme/src/mesh/mod.rs
+++ b/lib/rust/wlan-sme/src/mesh/mod.rs
@@ -5,7 +5,7 @@
 use {
     fidl_fuchsia_wlan_common::{self as fidl_common},
     fidl_fuchsia_wlan_mlme::{self as fidl_mlme, MlmeEvent},
-    futures::channel::mpsc,
+    futures::channel::{mpsc, oneshot},
     log::{error},
     std::mem,
     wlan_common::channel::{Channel, Cbw},
@@ -14,6 +14,7 @@
         DeviceInfo,
         MlmeRequest,
         phy_selection::get_device_band_info,
+        responder::Responder,
         sink::MlmeSink,
         timer::TimedEvent,
     },
@@ -22,79 +23,58 @@
 const DEFAULT_BEACON_PERIOD: u16 = 1000;
 const DEFAULT_DTIM_PERIOD: u8 = 1;
 
-
-// A token is an opaque value that identifies a particular request from a user.
-// To avoid parameterizing over many different token types, we introduce a helper
-// trait that enables us to group them into a single generic parameter.
-pub trait Tokens {
-    type JoinToken;
-    type LeaveToken;
-}
-
-mod internal {
-    pub type UserSink<T> = crate::sink::UnboundedSink<super::UserEvent<T>>;
-}
-use self::internal::*;
-
-pub type UserStream<T> = mpsc::UnboundedReceiver<UserEvent<T>>;
-
 // A list of pending join/leave requests to be maintained in the intermediate
 // 'Joining'/'Leaving' states where we are waiting for a reply from MLME and cannot
 // serve the requests immediately.
-struct PendingRequests<T: Tokens> {
-    leave: Vec<T::LeaveToken>,
-    join: Option<(T::JoinToken, Config)>,
+struct PendingRequests {
+    leave: Vec<Responder<LeaveMeshResult>>,
+    join: Option<(Responder<JoinMeshResult>, Config)>,
 }
 
-impl<T: Tokens> PendingRequests<T> {
+impl PendingRequests {
     pub fn new() -> Self {
         PendingRequests { leave: Vec::new(), join: None }
     }
 
-    pub fn enqueue_leave(&mut self, user_sink: &UserSink<T>, token: T::LeaveToken) {
-        self.replace_join_request(user_sink, None);
-        self.leave.push(token);
+    pub fn enqueue_leave(&mut self, responder: Responder<LeaveMeshResult>) {
+        self.replace_join_request(None);
+        self.leave.push(responder);
     }
 
-    pub fn enqueue_join(&mut self, user_sink: &UserSink<T>, token: T::JoinToken, config: Config) {
-        self.replace_join_request(user_sink, Some((token, config)));
+    pub fn enqueue_join(&mut self, responder: Responder<JoinMeshResult>, config: Config) {
+        self.replace_join_request(Some((responder, config)));
     }
 
     pub fn is_empty(&self) -> bool {
         self.leave.is_empty() && self.join.is_none()
     }
 
-    fn replace_join_request(
-        &mut self,
-        user_sink: &UserSink<T>,
-        req: Option<(T::JoinToken, Config)>)
-    {
-        if let Some((old_token, _)) = mem::replace(&mut self.join, req) {
-            report_join_finished(user_sink, old_token, JoinMeshResult::Canceled);
+    fn replace_join_request(&mut self, req: Option<(Responder<JoinMeshResult>, Config)>) {
+        if let Some((old_responder, _)) = mem::replace(&mut self.join, req) {
+            old_responder.respond(JoinMeshResult::Canceled);
         }
     }
 }
 
-enum State<T: Tokens> {
+enum State {
     Idle,
     Joining {
-        token: T::JoinToken,
+        responder: Responder<JoinMeshResult>,
         config: Config,
-        pending: PendingRequests<T>,
+        pending: PendingRequests,
     },
     Joined {
         config: Config,
     },
     Leaving {
         config: Config,
-        pending: PendingRequests<T>,
+        pending: PendingRequests,
     }
 }
 
-pub struct MeshSme<T: Tokens> {
+pub struct MeshSme {
     mlme_sink: MlmeSink,
-    user_sink: UserSink<T>,
-    state: Option<State<T>>,
+    state: Option<State>,
     device_info: DeviceInfo,
 }
 
@@ -121,82 +101,69 @@
     InternalError,
 }
 
-// A message from the Mesh node to a user or a group of listeners
-#[derive(Debug)]
-pub enum UserEvent<T: Tokens> {
-    JoinMeshFinished {
-        token: T::JoinToken,
-        result: JoinMeshResult,
-    },
-    LeaveMeshFinished {
-        token: T::LeaveToken,
-        result: LeaveMeshResult,
-    }
-}
-
-impl<T: Tokens> MeshSme<T> {
-    pub fn on_join_command(&mut self, token: T::JoinToken, config: Config) {
+impl MeshSme {
+    pub fn on_join_command(&mut self, config: Config) -> oneshot::Receiver<JoinMeshResult> {
+        let (responder, receiver) = Responder::new();
         if let Err(result) = validate_config(&config) {
-            report_join_finished(&self.user_sink, token, result);
-            return;
+            responder.respond(result);
+            return receiver;
         }
         self.state = Some(match self.state.take().unwrap() {
             State::Idle => {
                 self.mlme_sink.send(MlmeRequest::Start(create_start_request(&config)));
-                State::Joining { token, pending: PendingRequests::new(), config }
+                State::Joining { responder, pending: PendingRequests::new(), config }
             },
-            State::Joining { token: cur_token, config: cur_config, mut pending } => {
-                pending.enqueue_join(&self.user_sink, token, config);
-                State::Joining { token: cur_token, config: cur_config, pending }
+            State::Joining { responder: cur_responder, config: cur_config, mut pending } => {
+                pending.enqueue_join(responder, config);
+                State::Joining { responder: cur_responder, config: cur_config, pending }
             },
             State::Joined { config: cur_config } => {
                 self.mlme_sink.send(MlmeRequest::Stop(create_stop_request()));
                 let mut pending = PendingRequests::new();
-                pending.enqueue_join(&self.user_sink, token, config);
+                pending.enqueue_join(responder, config);
                 State::Leaving { config: cur_config, pending }
             },
             State::Leaving { config: cur_config, mut pending } => {
-                pending.enqueue_join(&self.user_sink, token, config);
+                pending.enqueue_join(responder, config);
                 State::Leaving { config: cur_config, pending }
             }
         });
+        receiver
     }
 
-    pub fn on_leave_command(&mut self, token: T::LeaveToken) {
+    pub fn on_leave_command(&mut self) -> oneshot::Receiver<LeaveMeshResult> {
+        let (responder, receiver) = Responder::new();
         self.state = Some(match self.state.take().unwrap() {
             State::Idle => {
-                report_leave_finished(&self.user_sink, token, LeaveMeshResult::Success);
+                responder.respond(LeaveMeshResult::Success);
                 State::Idle
             },
-            State::Joining { token: cur_token, config, mut pending } => {
-                pending.enqueue_leave(&self.user_sink, token);
-                State::Joining { token: cur_token, pending, config }
+            State::Joining { responder: cur_responder, config, mut pending } => {
+                pending.enqueue_leave(responder);
+                State::Joining { responder: cur_responder, pending, config }
             },
             State::Joined { config } => {
                 self.mlme_sink.send(MlmeRequest::Stop(create_stop_request()));
                 let mut pending = PendingRequests::new();
-                pending.enqueue_leave(&self.user_sink, token);
+                pending.enqueue_leave(responder);
                 State::Leaving { config, pending }
             },
             State::Leaving { config, mut pending } => {
-                pending.enqueue_leave(&self.user_sink, token);
+                pending.enqueue_leave(responder);
                 State::Leaving { config, pending }
             }
         });
+        receiver
     }
 }
 
-fn on_back_to_idle<T: Tokens>(
-    pending: PendingRequests<T>,
-    user_sink: &UserSink<T>,
-    mlme_sink: &MlmeSink
-) -> State<T> {
-    for token in pending.leave {
-        report_leave_finished(user_sink, token, LeaveMeshResult::Success);
+fn on_back_to_idle(pending: PendingRequests, mlme_sink: &MlmeSink) -> State {
+    for responder in pending.leave {
+        responder.respond(LeaveMeshResult::Success);
     }
-    if let Some((token, config)) = pending.join {
+    if let Some((responder, config)) = pending.join {
         mlme_sink.send(MlmeRequest::Start(create_start_request(&config)));
-        State::Joining { token, config, pending: PendingRequests::new() }
+        State::Joining { responder, config, pending: PendingRequests::new() }
     } else {
         State::Idle
     }
@@ -236,16 +203,16 @@
     fidl_mlme::StopRequest { ssid: vec![], }
 }
 
-impl<T: Tokens> super::Station for MeshSme<T> {
+impl super::Station for MeshSme {
     type Event = ();
 
     fn on_mlme_event(&mut self, event: MlmeEvent) {
         self.state = Some(match self.state.take().unwrap() {
             State::Idle => State::Idle,
-            State::Joining { token, pending, config } => match event {
+            State::Joining { responder, pending, config } => match event {
                 MlmeEvent::StartConf { resp } => match resp.result_code {
                     fidl_mlme::StartResultCodes::Success => {
-                        report_join_finished(&self.user_sink, token, JoinMeshResult::Success);
+                        responder.respond(JoinMeshResult::Success);
                         if pending.is_empty() {
                             State::Joined { config }
                         } else {
@@ -258,11 +225,11 @@
                     },
                     other => {
                         error!("failed to join mesh: {:?}", other);
-                        report_join_finished(&self.user_sink, token, JoinMeshResult::InternalError);
-                        on_back_to_idle(pending, &self.user_sink, &self.mlme_sink)
+                        responder.respond(JoinMeshResult::InternalError);
+                        on_back_to_idle(pending, &self.mlme_sink)
                     }
                 },
-                _ => State::Joining { token, pending, config },
+                _ => State::Joining { responder, pending, config },
             },
             State::Joined { config } => match event {
                 MlmeEvent::IncomingMpOpenAction { action } => {
@@ -303,16 +270,14 @@
             State::Leaving { config, pending } => match event {
                 MlmeEvent::StopConf { resp } => match resp.result_code {
                     fidl_mlme::StopResultCodes::Success =>
-                        on_back_to_idle(pending, &self.user_sink, &self.mlme_sink),
+                        on_back_to_idle(pending, &self.mlme_sink),
                     other => {
                         error!("failed to leave mesh: {:?}", other);
-                        for token in pending.leave {
-                            report_leave_finished(
-                                    &self.user_sink, token, LeaveMeshResult::InternalError);
+                        for responder in pending.leave {
+                            responder.respond(LeaveMeshResult::InternalError);
                         }
-                        if let Some((token, _)) = pending.join {
-                            report_join_finished(
-                                    &self.user_sink, token, JoinMeshResult::InternalError);
+                        if let Some((responder, _)) = pending.join {
+                            responder.respond(JoinMeshResult::InternalError);
                         }
                         State::Joined { config }
                     }
@@ -348,30 +313,15 @@
     })
 }
 
-fn report_join_finished<T: Tokens>(user_sink: &UserSink<T>,
-                                   token: T::JoinToken,
-                                   result: JoinMeshResult)
-{
-    user_sink.send(UserEvent::JoinMeshFinished { token, result });
-}
-
-fn report_leave_finished<T: Tokens>(user_sink: &UserSink<T>, token: T::LeaveToken,
-                                    result: LeaveMeshResult)
-{
-    user_sink.send(UserEvent::LeaveMeshFinished { token, result });
-}
-
-impl<T: Tokens> MeshSme<T> {
-    pub fn new(device_info: DeviceInfo) -> (Self, crate::MlmeStream, UserStream<T>) {
+impl MeshSme {
+    pub fn new(device_info: DeviceInfo) -> (Self, crate::MlmeStream) {
         let (mlme_sink, mlme_stream) = mpsc::unbounded();
-        let (user_sink, user_stream) = mpsc::unbounded();
         let sme = MeshSme {
             mlme_sink: MlmeSink::new(mlme_sink),
-            user_sink: UserSink::new(user_sink),
             state: Some(State::Idle),
             device_info,
         };
-        (sme, mlme_stream, user_stream)
+        (sme, mlme_stream)
     }
 }