|  | // Copyright 2020 The Fuchsia Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | use { | 
|  | crate::injectors::ProtocolInjector, | 
|  | anyhow::Error, | 
|  | async_trait::async_trait, | 
|  | fidl_fidl_examples_routing_echo as fecho, | 
|  | futures::{channel::*, lock::Mutex, sink::SinkExt, StreamExt}, | 
|  | std::sync::Arc, | 
|  | }; | 
|  |  | 
|  | #[must_use = "invoke resume() otherwise the client will be halted indefinitely!"] | 
|  | pub struct Echo { | 
|  | pub message: String, | 
|  | // This Sender is used to unblock the client that sent the echo. | 
|  | responder: Option<oneshot::Sender<()>>, | 
|  | } | 
|  |  | 
|  | impl Echo { | 
|  | pub fn resume(mut self) { | 
|  | if let Some(responder) = self.responder.take() { | 
|  | responder.send(()).unwrap(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Drop for Echo { | 
|  | fn drop(&mut self) { | 
|  | if let Some(responder) = self.responder.take() { | 
|  | responder.send(()).unwrap(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | #[derive(Clone)] | 
|  | pub struct EchoSender { | 
|  | tx: Arc<Mutex<mpsc::Sender<Echo>>>, | 
|  | } | 
|  |  | 
|  | impl EchoSender { | 
|  | fn new(tx: mpsc::Sender<Echo>) -> Self { | 
|  | Self { tx: Arc::new(Mutex::new(tx)) } | 
|  | } | 
|  |  | 
|  | /// Sends the event to a receiver. Returns a responder which can be blocked on. | 
|  | async fn send(&self, message: String) -> Result<oneshot::Receiver<()>, Error> { | 
|  | let (responder_tx, responder_rx) = oneshot::channel(); | 
|  | { | 
|  | let mut tx = self.tx.lock().await; | 
|  | tx.send(Echo { message, responder: Some(responder_tx) }).await?; | 
|  | } | 
|  | Ok(responder_rx) | 
|  | } | 
|  | } | 
|  |  | 
|  | pub struct EchoReceiver { | 
|  | rx: mpsc::Receiver<Echo>, | 
|  | } | 
|  |  | 
|  | impl EchoReceiver { | 
|  | fn new(rx: mpsc::Receiver<Echo>) -> Self { | 
|  | Self { rx } | 
|  | } | 
|  |  | 
|  | /// Receives the next invocation from the sender. | 
|  | pub async fn next(&mut self) -> Option<Echo> { | 
|  | self.rx.next().await | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Capability that serves the Echo FIDL protocol in one task and allows | 
|  | /// another task to wait on a echo arriving via a EchoReceiver. | 
|  | #[derive(Clone)] | 
|  | pub struct EchoCapability { | 
|  | tx: EchoSender, | 
|  | } | 
|  |  | 
|  | impl EchoCapability { | 
|  | pub fn new() -> (Arc<Self>, EchoReceiver) { | 
|  | let (tx, rx) = mpsc::channel(0); | 
|  | let sender = EchoSender::new(tx); | 
|  | let receiver = EchoReceiver::new(rx); | 
|  | (Arc::new(Self { tx: sender }), receiver) | 
|  | } | 
|  | } | 
|  |  | 
|  | #[async_trait] | 
|  | impl ProtocolInjector for EchoCapability { | 
|  | type Marker = fecho::EchoMarker; | 
|  |  | 
|  | async fn serve( | 
|  | self: Arc<Self>, | 
|  | mut request_stream: fecho::EchoRequestStream, | 
|  | ) -> Result<(), Error> { | 
|  | // Start listening to requests from the client. | 
|  | while let Some(Ok(fecho::EchoRequest::EchoString { value: Some(input), responder })) = | 
|  | request_stream.next().await | 
|  | { | 
|  | let echo = self.tx.send(input.clone()).await?; | 
|  | echo.await?; | 
|  | // Respond to the client with the echo string. | 
|  | responder.send(Some(&input))?; | 
|  | } | 
|  | Ok(()) | 
|  | } | 
|  | } |