| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::executor::{EHandle, PacketReceiver, ReceiverRegistration}, |
| fuchsia_zircon::{self as zx, AsHandleRef}, |
| futures::task::{AtomicWaker, LocalWaker, Poll}, |
| std::sync::{ |
| atomic::{AtomicUsize, Ordering}, |
| Arc, |
| }, |
| }; |
| |
// Bit flags stored in `RWPacketReceiver::signals`.

/// Flag bit: the handle is currently believed to be readable.
const READABLE: usize = 1 << 0;
/// Flag bit: the handle is currently believed to be writable.
const WRITABLE: usize = 1 << 1;
/// Flag bit: an `OBJECT_PEER_CLOSED` signal has been observed.
const CLOSED: usize = 1 << 2;
| |
| struct RWPacketReceiver { |
| signals: AtomicUsize, |
| read_task: AtomicWaker, |
| write_task: AtomicWaker, |
| } |
| |
| impl PacketReceiver for RWPacketReceiver { |
| fn receive_packet(&self, packet: zx::Packet) { |
| let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() { |
| p.observed() |
| } else { |
| return; |
| }; |
| |
| let new = |
| 0 | (if observed.contains(zx::Signals::OBJECT_READABLE) { |
| READABLE |
| } else { |
| 0 |
| }) | (if observed.contains(zx::Signals::OBJECT_WRITABLE) { |
| WRITABLE |
| } else { |
| 0 |
| }) | (if observed.contains(zx::Signals::OBJECT_PEER_CLOSED) { |
| CLOSED |
| } else { |
| 0 |
| }); |
| |
| let old = self.signals.fetch_or(new, Ordering::SeqCst); |
| |
| let became_readable = ((new & READABLE) != 0) && ((old & READABLE) == 0); |
| let became_writable = ((new & WRITABLE) != 0) && ((old & WRITABLE) == 0); |
| let became_closed = ((new & CLOSED) != 0) && ((old & CLOSED) == 0); |
| |
| if became_readable || became_closed { |
| self.read_task.wake(); |
| } |
| if became_writable || became_closed { |
| self.write_task.wake(); |
| } |
| } |
| } |
| |
| /// A `Handle` that receives notifications when it is readable/writable. |
| pub struct RWHandle<T> { |
| handle: T, |
| receiver: ReceiverRegistration<RWPacketReceiver>, |
| } |
| |
| impl<T> RWHandle<T> |
| where |
| T: AsHandleRef, |
| { |
| /// Creates a new `RWHandle` object which will receive notifications when |
| /// the underlying handle becomes readable, writable, or closes. |
| pub fn new(handle: T) -> Result<Self, zx::Status> { |
| let ehandle = EHandle::local(); |
| |
| let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver { |
| // Optimistically assume that the handle is readable and writable. |
| // Reads and writes will be attempted before queueing a packet. |
| // This makes handles slightly faster to read/write the first time |
| // they're accessed after being created, provided they start off as |
| // readable or writable. In return, there will be an extra wasted |
| // syscall per read/write if the handle is not readable or writable. |
| signals: AtomicUsize::new(READABLE | WRITABLE), |
| read_task: AtomicWaker::new(), |
| write_task: AtomicWaker::new(), |
| })); |
| |
| let rwhandle = RWHandle { handle, receiver }; |
| |
| // Make sure we get notifications when the handle closes. |
| rwhandle.schedule_packet(zx::Signals::OBJECT_PEER_CLOSED)?; |
| |
| Ok(rwhandle) |
| } |
| |
| /// Returns a reference to the underlying handle. |
| pub fn get_ref(&self) -> &T { |
| &self.handle |
| } |
| |
| /// Returns a mutable reference to the underlying handle. |
| pub fn get_mut(&mut self) -> &mut T { |
| &mut self.handle |
| } |
| |
| /// Consumes this type, returning the inner handle. |
| pub fn into_inner(self) -> T { |
| self.handle |
| } |
| |
| /// Tests to see if the channel received a OBJECT_PEER_CLOSED signal |
| pub fn is_closed(&self) -> bool { |
| (self.receiver().signals.load(Ordering::Relaxed) & CLOSED) != 0 |
| } |
| |
| /// Tests to see if this resource is ready to be read from. |
| /// If it is not, it arranges for the current task to receive a notification |
| /// when a "readable" signal arrives. |
| pub fn poll_read(&self, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> { |
| if (self.receiver().signals.load(Ordering::SeqCst) & (READABLE | CLOSED)) != 0 { |
| Poll::Ready(Ok(())) |
| } else { |
| self.need_read(lw)?; |
| Poll::Pending |
| } |
| } |
| |
| /// Tests to see if this resource is ready to be read from. |
| /// If it is not, it arranges for the current task to receive a notification |
| /// when a "writable" signal arrives. |
| pub fn poll_write(&self, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> { |
| if (self.receiver().signals.load(Ordering::SeqCst) & (WRITABLE | CLOSED)) != 0 { |
| Poll::Ready(Ok(())) |
| } else { |
| self.need_write(lw)?; |
| Poll::Pending |
| } |
| } |
| |
| fn receiver(&self) -> &RWPacketReceiver { |
| self.receiver.receiver() |
| } |
| |
| /// Arranges for the current task to receive a notification when a |
| /// "readable" signal arrives. |
| pub fn need_read(&self, lw: &LocalWaker) -> Result<(), zx::Status> { |
| self.receiver().read_task.register(lw); |
| let old = self |
| .receiver() |
| .signals |
| .fetch_and(!READABLE, Ordering::SeqCst); |
| // We only need to schedule a new packet if one isn't already scheduled. |
| // If READABLE was already false, a packet was already scheduled. |
| if (old & READABLE) != 0 { |
| self.schedule_packet(zx::Signals::OBJECT_READABLE)?; |
| } |
| if (old & CLOSED) != 0 { |
| // We just missed a channel close-- go around again. |
| lw.wake(); |
| } |
| Ok(()) |
| } |
| |
| /// Arranges for the current task to receive a notification when a |
| /// "writable" signal arrives. |
| pub fn need_write(&self, lw: &LocalWaker) -> Result<(), zx::Status> { |
| self.receiver().write_task.register(lw); |
| let old = self |
| .receiver() |
| .signals |
| .fetch_and(!WRITABLE, Ordering::SeqCst); |
| // We only need to schedule a new packet if one isn't already scheduled. |
| // If WRITABLE was already false, a packet was already scheduled. |
| if (old & WRITABLE) != 0 { |
| self.schedule_packet(zx::Signals::OBJECT_WRITABLE)?; |
| } |
| if (old & CLOSED) != 0 { |
| // We just missed a channel close-- go around again. |
| lw.wake(); |
| } |
| Ok(()) |
| } |
| |
| fn schedule_packet(&self, signals: zx::Signals) -> Result<(), zx::Status> { |
| self.handle.wait_async_handle( |
| self.receiver.port(), |
| self.receiver.key(), |
| signals, |
| zx::WaitAsyncOpts::Once, |
| ) |
| } |
| } |