blob: 75e6005f64993f8db6d0f26cfe5d0e42ffa03dc3 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::executor::{EHandle, PacketReceiver, ReceiverRegistration},
fuchsia_zircon::{self as zx, AsHandleRef},
futures::task::{AtomicWaker, Context, Poll},
std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
struct RWPacketReceiver {
signals: AtomicU32,
read_task: AtomicWaker,
write_task: AtomicWaker,
}
impl PacketReceiver for RWPacketReceiver {
fn receive_packet(&self, packet: zx::Packet) {
let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
p.observed()
} else {
return;
};
let old =
zx::Signals::from_bits_truncate(self.signals.fetch_or(new.bits(), Ordering::SeqCst));
let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
let became_closed = new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
if became_readable || became_closed {
self.read_task.wake();
}
if became_writable || became_closed {
self.write_task.wake();
}
}
}
/// A `Handle` that receives notifications when it is readable/writable.
pub struct RWHandle<T> {
handle: T,
receiver: ReceiverRegistration<RWPacketReceiver>,
}
impl<T> RWHandle<T>
where
T: AsHandleRef,
{
/// Creates a new `RWHandle` object which will receive notifications when
/// the underlying handle becomes readable, writable, or closes.
pub fn new(handle: T) -> Result<Self, zx::Status> {
let ehandle = EHandle::local();
let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver {
// Optimistically assume that the handle is readable and writable.
// Reads and writes will be attempted before queueing a packet.
// This makes handles slightly faster to read/write the first time
// they're accessed after being created, provided they start off as
// readable or writable. In return, there will be an extra wasted
// syscall per read/write if the handle is not readable or writable.
signals: AtomicU32::new(initial_signals.bits()),
read_task: AtomicWaker::new(),
write_task: AtomicWaker::new(),
}));
let rwhandle = RWHandle { handle, receiver };
// Make sure we get notifications when the handle closes.
rwhandle.schedule_packet(OBJECT_PEER_CLOSED)?;
Ok(rwhandle)
}
/// Returns a reference to the underlying handle.
pub fn get_ref(&self) -> &T {
&self.handle
}
/// Returns a mutable reference to the underlying handle.
pub fn get_mut(&mut self) -> &mut T {
&mut self.handle
}
/// Consumes this type, returning the inner handle.
pub fn into_inner(self) -> T {
self.handle
}
/// Tests to see if the channel received a OBJECT_PEER_CLOSED signal
pub fn is_closed(&self) -> bool {
let signals =
zx::Signals::from_bits_truncate(self.receiver().signals.load(Ordering::Relaxed));
signals.contains(OBJECT_PEER_CLOSED)
}
/// Tests if the resource currently has either the provided `signal`
/// or the OBJECT_PEER_CLOSED signal set.
///
/// Returns `true` if the CLOSED signal was set.
fn poll_signal_or_closed(
&self,
cx: &mut Context<'_>,
task: &AtomicWaker,
signal: zx::Signals,
) -> Poll<Result<bool, zx::Status>> {
let signals =
zx::Signals::from_bits_truncate(self.receiver().signals.load(Ordering::SeqCst));
let was_closed = signals.contains(OBJECT_PEER_CLOSED);
let was_signal = signals.contains(signal);
if was_closed || was_signal {
Poll::Ready(Ok(was_closed))
} else {
self.need_signal(cx, task, signal, was_closed)?;
Poll::Pending
}
}
/// Tests to see if this resource is ready to be read from.
/// If it is not, it arranges for the current task to receive a notification
/// when a "readable" signal arrives. Returns `true` if the CLOSED
/// signal was set, in which case it should be reset if a successive
/// read shows that the object was not closed.
pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
self.poll_signal_or_closed(cx, &self.receiver.read_task, OBJECT_READABLE)
}
/// Tests to see if this resource is ready to be written to.
/// If it is not, it arranges for the current task to receive a notification
/// when a "writable" signal arrives. Returns `true` if the CLOSED
/// signal was set, in which case it should be reset if a successive
/// write shows that the object was not closed.
pub fn poll_write(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
self.poll_signal_or_closed(cx, &self.receiver.write_task, OBJECT_WRITABLE)
}
fn receiver(&self) -> &RWPacketReceiver {
self.receiver.receiver()
}
/// Arranges for the current task to receive a notification when a
/// given signal arrives.
///
/// `clear_closed` indicates that we previously mistakenly thought
/// the channel was closed due to a false signal, and we should
/// now reset the CLOSED bit. This value should often be passed in directly
/// from the output of `poll_XXX`.
fn need_signal(
&self,
cx: &mut Context<'_>,
task: &AtomicWaker,
signal: zx::Signals,
clear_closed: bool,
) -> Result<(), zx::Status> {
crate::executor::need_signal(
cx,
task,
&self.receiver.signals,
signal,
clear_closed,
self.handle.as_handle_ref(),
self.receiver.port(),
self.receiver.key(),
)
}
/// Arranges for the current task to receive a notification when a
/// "readable" signal arrives.
///
/// `clear_closed` indicates that we previously mistakenly thought
/// the channel was closed due to a false signal, and we should
/// now reset the CLOSED bit. This value should often be passed in directly
/// from the output of `poll_read`.
pub fn need_read(&self, cx: &mut Context<'_>, clear_closed: bool) -> Result<(), zx::Status> {
self.need_signal(cx, &self.receiver.read_task, OBJECT_READABLE, clear_closed)
}
/// Arranges for the current task to receive a notification when a
/// "writable" signal arrives.
pub fn need_write(&self, cx: &mut Context<'_>, clear_closed: bool) -> Result<(), zx::Status> {
self.need_signal(cx, &self.receiver.write_task, OBJECT_WRITABLE, clear_closed)
}
fn schedule_packet(&self, signals: zx::Signals) -> Result<(), zx::Status> {
crate::executor::schedule_packet(
self.handle.as_handle_ref(),
self.receiver.port(),
self.receiver.key(),
signals,
)
}
}