| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // This module contains the bulk of the logic for connecting user applications to a |
| // vsock driver. |
| // |
| // Handling user requests is complicated as there are multiple communication channels |
| // involved. For example a request to 'connect' will result in sending a message |
| // to the driver over the single DeviceProxy. If this returns with success then |
| // eventually a message will come over the single Callbacks stream indicating |
| // whether the remote accepted or rejected. |
| // |
| // Fundamentally then there needs to be mutual exclusion in accessing DeviceProxy, |
| // and de-multiplexing of incoming messages on the Callbacks stream. There are |
| // a two high level options for doing this. |
| // 1. Force a single threaded event driver model. This would mean that additional |
| // asynchronous executions are never spawned, and any use of await! or otherwise |
| // blocking with additional futures requires collection futures in future sets |
| // or having custom polling logic etc. Whilst this is probably the most resource |
| // efficient it restricts the service to be single threaded forever by its design, |
| // is harder to reason about as cannot be written very idiomatically with futures |
| // and is even more complicated to avoid blocking other requests whilst waiting |
| // on responses from the driver. |
| // 2. Allow multiple asynchronous executions and use some form of message passing |
| // and locking to handle DeviceProxy access and sharing access to the Callbacks |
| // stream. Potentially more resource intensive with 'unneccessary' locking etc, |
| // but allows for the potential to have actual parallel execution and is much |
| // simpler to write the logic. |
| // The chosen option is (2) and the access to DeviceProxy is handled with an Arc<Mutex<State>>, |
| // and de-multiplexing of the Callbacks is done by registering an event whilst holding |
| // the mutex, and having a single asynchronous thread that is dedicated to converting |
| // incoming Callbacks to signaling registered events. |
| |
| use { |
| crate::{addr, port}, |
| crossbeam, |
| failure::{err_msg, Fail}, |
| fidl::endpoints, |
| fidl_fuchsia_hardware_vsock::{ |
| CallbacksMarker, CallbacksRequest, CallbacksRequestStream, DeviceProxy, |
| }, |
| fidl_fuchsia_vsock::{ |
| AcceptorProxy, ConnectionRequest, ConnectionRequestStream, ConnectionTransport, |
| ConnectorRequest, ConnectorRequestStream, |
| }, |
| fuchsia_async as fasync, |
| fuchsia_syslog::{fx_log_info, fx_log_warn}, |
| fuchsia_zircon as zx, |
| futures::{ |
| channel::{mpsc, oneshot}, |
| future, select, Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, |
| }, |
| parking_lot::Mutex, |
| std::{ |
| collections::HashMap, |
| ops::Deref, |
| pin::Pin, |
| sync::Arc, |
| task::{LocalWaker, Poll}, |
| }, |
| }; |
| |
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] |
| enum EventType { |
| Shutdown, |
| VmoComplete, |
| Response, |
| } |
| |
| #[derive(Debug, Clone, PartialEq, Eq, Hash)] |
| struct Event { |
| action: EventType, |
| addr: addr::Vsock, |
| } |
| |
| #[derive(Debug, Clone, Eq, PartialEq, Hash)] |
| enum Deregister { |
| Event(Event), |
| Listen(u32), |
| Port(u32), |
| } |
| |
| #[derive(Fail, Debug)] |
| enum Error { |
| #[fail(display = "Driver returned failure status {}", _0)] |
| Driver(#[fail(cause)] zx::Status), |
| #[fail(display = "All ephemeral ports are allocated")] |
| OutOfPorts, |
| #[fail(display = "Addr has already been bound")] |
| AlreadyBound, |
| #[fail(display = "Connection refused by remote")] |
| ConnectionRefused, |
| #[fail(display = "Error whilst communication with client")] |
| ClientCommunication(#[fail(cause)] failure::Error), |
| #[fail(display = "Error whilst communication with client")] |
| DriverCommunication(#[fail(cause)] failure::Error), |
| #[fail(display = "Driver reset the connection")] |
| ConnectionReset, |
| } |
| |
| impl From<oneshot::Canceled> for Error { |
| fn from(_: oneshot::Canceled) -> Error { |
| Error::ConnectionReset |
| } |
| } |
| |
| impl Error { |
| pub fn into_status(&self) -> zx::Status { |
| match self { |
| Error::Driver(status) => *status, |
| Error::OutOfPorts => zx::Status::NO_RESOURCES, |
| Error::AlreadyBound => zx::Status::ALREADY_BOUND, |
| Error::ConnectionRefused => zx::Status::UNAVAILABLE, |
| Error::ClientCommunication(err) | Error::DriverCommunication(err) => *err |
| .downcast_ref::<zx::Status>() |
| .unwrap_or(&zx::Status::INTERNAL), |
| Error::ConnectionReset => zx::Status::PEER_CLOSED, |
| } |
| } |
| pub fn is_comm_failure(&self) -> bool { |
| match self { |
| Error::ClientCommunication(_) | Error::DriverCommunication(_) => true, |
| _ => false, |
| } |
| } |
| } |
| |
| fn map_driver_result(result: Result<i32, fidl::Error>) -> Result<(), Error> { |
| result |
| .map_err(|x| Error::DriverCommunication(x.into())) |
| .and_then(|x| zx::Status::ok(x).map_err(Error::Driver)) |
| } |
| |
| fn send_result<T>( |
| result: Result<T, Error>, send: impl FnOnce(i32, Option<T>) -> Result<(), fidl::Error>, |
| ) -> Result<(), Error> { |
| match result { |
| Ok(v) => send(zx::Status::OK.into_raw(), Some(v)) |
| .map_err(|e| Error::ClientCommunication(e.into())), |
| Err(e) => { |
| send(e.into_status().into_raw(), None) |
| .map_err(|e| Error::ClientCommunication(e.into()))?; |
| Err(e) |
| } |
| } |
| } |
| |
| pub enum Never {} |
| |
| struct State { |
| device: DeviceProxy, |
| events: HashMap<Event, oneshot::Sender<()>>, |
| used_ports: port::Tracker, |
| listens: HashMap<u32, mpsc::UnboundedSender<addr::Vsock>>, |
| } |
| |
| pub struct LockedState { |
| inner: Mutex<State>, |
| deregister_tx: crossbeam::channel::Sender<Deregister>, |
| deregister_rx: crossbeam::channel::Receiver<Deregister>, |
| } |
| |
| #[derive(Clone)] |
| pub struct Vsock { |
| inner: Arc<LockedState>, |
| } |
| |
| impl Vsock { |
| /// Creates a new vsock service connected to the given `DeviceProxy` |
| /// |
| /// The creation is asynchronous due to need to invoke methods on the given `DeviceProxy`. On |
| /// success a pair of `Self, impl Future<Result<_, Error>>` is returned. The `impl Future` is |
| /// a future that is listening for and processing messages from the `device`. This future needs |
| /// to be evaluated for other methods on the returned `Self` to complete successfully. Unless |
| /// a fatal error occurs the future will never yield a result and will execute infinitely. |
| pub async fn new( |
| device: DeviceProxy, |
| ) -> Result<(Self, impl Future<Output = Result<Never, failure::Error>>), failure::Error> { |
| let (callbacks_client, callbacks_server) = |
| endpoints::create_endpoints::<CallbacksMarker>()?; |
| let server_stream = callbacks_server.into_stream()?; |
| |
| await!(device |
| .start(callbacks_client) |
| .map(|x| map_driver_result(x)) |
| .err_into::<failure::Error>())?; |
| |
| let service = State { |
| device, |
| events: HashMap::new(), |
| used_ports: port::Tracker::new(), |
| listens: HashMap::new(), |
| }; |
| let (tx, rx) = crossbeam::channel::unbounded(); |
| let service = LockedState { |
| inner: Mutex::new(service), |
| deregister_tx: tx, |
| deregister_rx: rx, |
| }; |
| let service = Vsock { |
| inner: Arc::new(service), |
| }; |
| let callback_loop = service.clone().run_callbacks(server_stream); |
| Ok((service, callback_loop)) |
| } |
| async fn run_callbacks( |
| self, mut callbacks: CallbacksRequestStream, |
| ) -> Result<Never, failure::Error> { |
| while let Some(Ok(cb)) = await!(callbacks.next()) { |
| self.lock().do_callback(cb); |
| } |
| // The only way to get here is if our callbacks stream ended, since our notifications |
| // cannot disconnect as we are holding a reference to them in |service|. |
| Err(err_msg("Driver disconnected")) |
| } |
| |
| // Spawns a new asynchronous thread for listening for incoming connections on a port. |
| fn start_listener( |
| &self, acceptor: fidl::endpoints::ClientEnd<fidl_fuchsia_vsock::AcceptorMarker>, |
| local_port: u32, |
| ) -> Result<(), Error> { |
| let acceptor = acceptor |
| .into_proxy() |
| .map_err(|x| Error::ClientCommunication(x.into()))?; |
| let stream = self.listen_port(local_port)?; |
| fasync::spawn( |
| self.clone() |
| .run_connection_listener(stream, acceptor) |
| .unwrap_or_else(|err| fx_log_warn!("Error {} running connection listener", err)), |
| ); |
| Ok(()) |
| } |
| |
| // Handles a single incoming client request. |
| async fn handle_request(&self, request: ConnectorRequest) -> Result<(), Error> { |
| match request { |
| ConnectorRequest::Connect { |
| remote_cid, |
| remote_port, |
| con, |
| responder, |
| } => send_result( |
| await!(self.make_connection(remote_cid, remote_port, con)), |
| |r, v| responder.send(r, v.unwrap_or(0)), |
| ), |
| ConnectorRequest::Listen { |
| local_port, |
| acceptor, |
| responder, |
| } => send_result(self.start_listener(acceptor, local_port), |r, _| { |
| responder.send(r) |
| }), |
| } |
| } |
| |
| /// Evaluates messages on a `ConnectorRequestStream` until completion or error |
| /// |
| /// Takes ownership of a RequestStream that is most likely created from a ServicesServer |
| /// and processes any incoming requests on it. |
| pub async fn run_client_connection( |
| self, request: ConnectorRequestStream, |
| ) -> Result<(), failure::Error> { |
| let self_ref = &self; |
| await!(request |
| .map_err(|err| Error::ClientCommunication(err.into())) |
| // TODO: The parallel limit of 4 is currently invented with no basis and should |
| // made something more sensible. |
| .try_for_each_concurrent(4, async move |request| await!( |
| self_ref.handle_request(request) |
| ) |
| .or_else(|e| if e.is_comm_failure() { Err(e) } else { Ok(()) })) |
| .err_into()) |
| } |
| fn alloc_ephemeral_port(self) -> Option<AllocatedPort> { |
| let p = self.lock().used_ports.allocate(); |
| p.map(|p| AllocatedPort { |
| port: p, |
| service: self, |
| }) |
| } |
| // Creates a `ListenStream` that will retrieve raw incoming connection requests. |
| // These requests come from the device via the run_callbacks future. |
| fn listen_port(&self, port: u32) -> Result<ListenStream, Error> { |
| if port::is_ephemeral(port) { |
| fx_log_info!("Rejecting request to listen on ephemeral port {}", port); |
| return Err(Error::ConnectionRefused); |
| } |
| match self.lock().listens.entry(port) { |
| std::collections::hash_map::Entry::Vacant(entry) => { |
| let (sender, receiver) = mpsc::unbounded(); |
| let listen = ListenStream { |
| local_port: port, |
| service: self.clone(), |
| stream: receiver, |
| }; |
| entry.insert(sender); |
| Ok(listen) |
| } |
| _ => { |
| fx_log_info!("Attempt to listen on already bound port {}", port); |
| Err(Error::AlreadyBound) |
| } |
| } |
| } |
| |
| // Helper for inserting an event into the events hashmap |
| fn register_event(&self, event: Event) -> Result<OneshotEvent, Error> { |
| match self.lock().events.entry(event) { |
| std::collections::hash_map::Entry::Vacant(entry) => { |
| let (sender, receiver) = oneshot::channel(); |
| let event = OneshotEvent { |
| event: Some(entry.key().clone()), |
| service: self.clone(), |
| oneshot: receiver, |
| }; |
| entry.insert(sender); |
| Ok(event) |
| } |
| _ => Err(Error::AlreadyBound), |
| } |
| } |
| |
| // These helpers are wrappers around sending a message to the device, and creating events that |
| // will be signaled by the run_callbacks future when it receives a message from the device. |
| fn send_request( |
| &self, addr: &addr::Vsock, data: zx::Socket, |
| ) -> Result<impl Future<Output = Result<(OneshotEvent, OneshotEvent), Error>>, Error> { |
| let shutdown_callback = self.register_event(Event { |
| action: EventType::Shutdown, |
| addr: addr.clone(), |
| })?; |
| let response_callback = self.register_event(Event { |
| action: EventType::Response, |
| addr: addr.clone(), |
| })?; |
| |
| let send_request_fut = self.lock().device.send_request(&mut addr.clone(), data); |
| |
| Ok(async move { |
| map_driver_result(await!(send_request_fut))?; |
| Ok((shutdown_callback, response_callback)) |
| }) |
| } |
| fn send_response( |
| &self, addr: &addr::Vsock, data: zx::Socket, |
| ) -> Result<impl Future<Output = Result<OneshotEvent, Error>>, Error> { |
| let shutdown_callback = self.register_event(Event { |
| action: EventType::Shutdown, |
| addr: addr.clone(), |
| })?; |
| |
| let send_request_fut = self.lock().device.send_response(&mut addr.clone(), data); |
| |
| Ok(async move { |
| map_driver_result(await!(send_request_fut))?; |
| Ok(shutdown_callback) |
| }) |
| } |
| fn send_vmo( |
| &self, addr: &addr::Vsock, vmo: zx::Vmo, off: u64, len: u64, |
| ) -> Result<impl Future<Output = Result<OneshotEvent, Error>>, Error> { |
| let vmo_callback = self.register_event(Event { |
| action: EventType::VmoComplete, |
| addr: addr.clone(), |
| })?; |
| |
| let send_request_fut = self |
| .lock() |
| .device |
| .send_vmo(&mut addr.clone(), vmo, off, len); |
| |
| Ok(async move { |
| map_driver_result(await!(send_request_fut))?; |
| Ok(vmo_callback) |
| }) |
| } |
| |
| // Runs a connected socket until completion. Processes any VMO sends and shutdown events. |
| async fn run_connection<ShutdownFut>( |
| self, addr: addr::Vsock, shutdown_event: ShutdownFut, |
| mut requests: ConnectionRequestStream, _port: Option<AllocatedPort>, |
| ) -> Result<(), Error> |
| where |
| ShutdownFut: |
| Future<Output = Result<(), futures::channel::oneshot::Canceled>> + std::marker::Unpin, |
| { |
| // This extremely awkward function definition is to temporarily work around select! not being |
| // nestable. Once this is fixed then this should be re-inlined into the single call site below. |
| // Until then don't look closely at this. |
| async fn wait_vmo_complete<ShutdownFut>( |
| mut shutdown_event: &mut futures::future::Fuse<ShutdownFut>, cb: OneshotEvent, |
| ) -> Result<zx::Status, Result<(), Error>> |
| where |
| ShutdownFut: Future<Output = Result<(), futures::channel::oneshot::Canceled>> |
| + std::marker::Unpin, |
| { |
| select! { |
| shutdown_event = shutdown_event => {Err(shutdown_event.map_err(|e| e.into()))}, |
| cb = cb.fuse() => match cb { |
| Ok(_) => Ok(zx::Status::OK), |
| Err(_) => Ok(Error::ConnectionReset.into_status()), |
| }, |
| } |
| } |
| let mut shutdown_event = shutdown_event.fuse(); |
| loop { |
| select! { |
| shutdown_event = shutdown_event => { |
| return await!(future::ready(shutdown_event) |
| .err_into() |
| .and_then(|()| self.lock().send_rst(&addr)) |
| ); |
| }, |
| request = requests.next() => { |
| match request { |
| Some(Ok(ConnectionRequest::Shutdown{control_handle: _control_handle})) => { |
| return await!( |
| self.lock().send_shutdown(&addr) |
| // Wait to either receive the RST for the client or to be |
| // shut down for some other reason |
| .and_then(|()| shutdown_event.err_into()) |
| ); |
| }, |
| Some(Ok(ConnectionRequest::SendVmo{vmo, off, len, responder})) => { |
| // Acquire the potential future from send_vmo in a temporary so we |
| // can await! on it without holding the lock. |
| let result = self.send_vmo(&addr, vmo, off, len); |
| // Equivalent of and_then to expand the Ok future case. |
| let result = match result { |
| Ok(fut) => await!(fut), |
| Err(e) => Err(e), |
| }; |
| let status = match result { |
| Ok(cb) => { |
| match await!(wait_vmo_complete(&mut shutdown_event, cb)) { |
| Err(e) => return e, |
| Ok(o) => o, |
| } |
| }, |
| Err(e) => e.into_status(), |
| }; |
| |
| let _ = responder.send(status.into_raw()); |
| }, |
| // Generate a RST for a non graceful client disconnect. |
| Some(Err(e)) => { |
| await!(self.lock().send_rst(&addr))?; |
| return Err(Error::ClientCommunication(e.into())); |
| }, |
| None => { |
| return await!(self.lock().send_rst(&addr)); |
| }, |
| } |
| }, |
| } |
| } |
| } |
| |
| // Waits for incoming connections on the given `ListenStream`, checks with the |
| // user via the `acceptor` if it should be accepted, and if so spawns a new |
| // asynchronous thread to run the connection. |
| async fn run_connection_listener( |
| self, incoming: ListenStream, acceptor: AcceptorProxy, |
| ) -> Result<(), Error> { |
| await!(incoming |
| .then(|addr| acceptor |
| .accept(&mut *addr.clone()) |
| .map_ok(|maybe_con| (maybe_con, addr))) |
| .map_err(|e| Error::ClientCommunication(e.into())) |
| .try_for_each(|(maybe_con, addr)| async { |
| match maybe_con { |
| Some(con) => { |
| let data = con.data; |
| let con = con |
| .con |
| .into_stream() |
| .map_err(|x| Error::ClientCommunication(x.into()))?; |
| let shutdown_event = await!(self.send_response(&addr, data)?)?; |
| fasync::spawn( |
| self.clone() |
| .run_connection(addr, shutdown_event, con, None) |
| .map_err(|err| { |
| fx_log_warn!("Error {} whilst running connection", err) |
| }) |
| .map(|_| ()), |
| ); |
| Ok(()) |
| } |
| None => await!(self.lock().send_rst(&addr)), |
| } |
| })) |
| } |
| |
| // Attempts to connect to the given remote cid/port. If successful spawns a new |
| // asynchronous thread to run the connection until completion. |
| async fn make_connection( |
| &self, remote_cid: u32, remote_port: u32, con: ConnectionTransport, |
| ) -> Result<u32, Error> { |
| let data = con.data; |
| let con = con |
| .con |
| .into_stream() |
| .map_err(|x| Error::ClientCommunication(x.into()))?; |
| let port = self |
| .clone() |
| .alloc_ephemeral_port() |
| .ok_or(Error::OutOfPorts)?; |
| let port_value = *port; |
| let addr = addr::Vsock::new(port_value, remote_port, remote_cid); |
| let (shutdown_event, response_event) = await!(self.send_request(&addr, data)?)?; |
| let mut shutdown_event = shutdown_event.fuse(); |
| select! { |
| _shutdown_event = shutdown_event => { |
| // Getting a RST here just indicates a rejection and |
| // not any underlying issues. |
| return Err(Error::ConnectionRefused); |
| }, |
| response_event = response_event.fuse() => response_event?, |
| } |
| |
| fasync::spawn( |
| self.clone() |
| .run_connection(addr, shutdown_event, con, Some(port)) |
| .unwrap_or_else(|err| fx_log_warn!("Error {} whilst running connection", err)), |
| ); |
| Ok(port_value) |
| } |
| } |
| |
| impl Deref for Vsock { |
| type Target = LockedState; |
| |
| fn deref(&self) -> &LockedState { |
| &self.inner |
| } |
| } |
| |
| impl LockedState { |
| // Acquires the lock on `inner`, and processes any pending messages |
| fn lock(&self) -> parking_lot::MutexGuard<State> { |
| let mut guard = self.inner.lock(); |
| self.deregister_rx |
| .try_iter() |
| .for_each(|e| guard.deregister(e)); |
| guard |
| } |
| // Tries to acquire the lock on `inner`, and processes any pending messages |
| // if successful |
| fn try_lock(&self) -> Option<parking_lot::MutexGuard<State>> { |
| if let Some(mut guard) = self.inner.try_lock() { |
| self.deregister_rx |
| .try_iter() |
| .for_each(|e| guard.deregister(e)); |
| Some(guard) |
| } else { |
| None |
| } |
| } |
| // Deregisters the specified event, or queues it for later deregistration if |
| // lock acquisition fails. |
| fn deregister(&self, event: Deregister) { |
| if let Some(mut service) = self.try_lock() { |
| service.deregister(event); |
| } else { |
| // Should not fail as we expect to be using an unbounded channel |
| let _ = self.deregister_tx.try_send(event); |
| } |
| } |
| } |
| |
| impl State { |
| // Remove the `event` from the `events` `HashMap` |
| fn deregister(&mut self, event: Deregister) { |
| match event { |
| Deregister::Event(e) => { |
| self.events.remove(&e); |
| } |
| Deregister::Listen(p) => { |
| self.listens.remove(&p); |
| } |
| Deregister::Port(p) => { |
| self.used_ports.free(p); |
| } |
| } |
| } |
| |
| // Wrappers around device functions with nicer type signatures |
| fn send_rst(&mut self, addr: &addr::Vsock) -> impl Future<Output = Result<(), Error>> { |
| self.device |
| .send_rst(&mut addr.clone()) |
| .map(|x| map_driver_result(x)) |
| } |
| fn send_shutdown(&mut self, addr: &addr::Vsock) -> impl Future<Output = Result<(), Error>> { |
| self.device |
| .send_shutdown(&mut addr.clone()) |
| .map(|x| map_driver_result(x)) |
| } |
| |
| // Processes a single callback from the `device`. This is intended to be used by |
| // `Vsock::run_callbacks` |
| fn do_callback(&mut self, callback: CallbacksRequest) { |
| match callback { |
| CallbacksRequest::Response { |
| addr, |
| control_handle: _control_handle, |
| } => { |
| self.events |
| .remove(&Event { |
| action: EventType::Response, |
| addr: addr::Vsock::from(addr), |
| }) |
| .map(|channel| channel.send(())); |
| } |
| CallbacksRequest::Rst { |
| addr, |
| control_handle: _control_handle, |
| } => { |
| self.events.remove(&Event { |
| action: EventType::Shutdown, |
| addr: addr::Vsock::from(addr), |
| }); |
| } |
| CallbacksRequest::SendVmoComplete { |
| addr, |
| control_handle: _control_handle, |
| } => { |
| self.events |
| .remove(&Event { |
| action: EventType::VmoComplete, |
| addr: addr::Vsock::from(addr), |
| }) |
| .map(|channel| channel.send(())); |
| } |
| CallbacksRequest::Request { |
| addr, |
| control_handle: _control_handle, |
| } => { |
| let addr = addr::Vsock::from(addr); |
| match self.listens.get(&addr.local_port) { |
| Some(sender) => { |
| let _ = sender.unbounded_send(addr.clone()); |
| } |
| None => { |
| fx_log_warn!("Request on port {} with no listener", addr.local_port); |
| fasync::spawn(self.send_rst(&addr).map(|_| ())); |
| } |
| } |
| } |
| CallbacksRequest::Shutdown { |
| addr, |
| control_handle: _control_handle, |
| } => { |
| self.events |
| .remove(&Event { |
| action: EventType::Shutdown, |
| addr: addr::Vsock::from(addr), |
| }) |
| .map(|channel| channel.send(())); |
| } |
| CallbacksRequest::TransportReset { |
| new_cid: _new_cid, |
| responder, |
| } => { |
| self.events.clear(); |
| let _ = responder.send(); |
| } |
| } |
| } |
| } |
| |
| struct AllocatedPort { |
| service: Vsock, |
| port: u32, |
| } |
| |
| impl Deref for AllocatedPort { |
| type Target = u32; |
| |
| fn deref(&self) -> &u32 { |
| &self.port |
| } |
| } |
| |
| impl Drop for AllocatedPort { |
| fn drop(&mut self) { |
| self.service.deregister(Deregister::Port(self.port)); |
| } |
| } |
| |
| struct OneshotEvent { |
| event: Option<Event>, |
| service: Vsock, |
| oneshot: oneshot::Receiver<()>, |
| } |
| |
| impl Drop for OneshotEvent { |
| fn drop(&mut self) { |
| self.event |
| .take() |
| .map(|e| self.service.deregister(Deregister::Event(e))); |
| } |
| } |
| |
| impl Future for OneshotEvent { |
| type Output = <oneshot::Receiver<()> as Future>::Output; |
| |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| match self.oneshot.poll_unpin(lw) { |
| Poll::Ready(x) => { |
| // Take the event so that we don't try to deregister it later, |
| // as by having sent the message we just received the callbacks |
| // thread will already have removed it |
| self.event.take(); |
| Poll::Ready(x) |
| } |
| p => p, |
| } |
| } |
| } |
| |
| struct ListenStream { |
| local_port: u32, |
| service: Vsock, |
| stream: mpsc::UnboundedReceiver<addr::Vsock>, |
| } |
| |
| impl Drop for ListenStream { |
| fn drop(&mut self) { |
| self.service.deregister(Deregister::Listen(self.local_port)); |
| } |
| } |
| |
| impl Stream for ListenStream { |
| type Item = addr::Vsock; |
| |
| fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> { |
| self.stream.poll_next_unpin(lw) |
| } |
| } |