blob: 971ba7b9684cd732e8f5b04e0f563a36eaba6583 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
runtime::{EHandle, PacketReceiver, ReceiverRegistration},
OnSignalsRef,
},
fuchsia_zircon::{self as zx, AsHandleRef},
std::{
sync::{Arc, Mutex},
task::{ready, Context, Poll, Waker},
},
};
const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
/// State of an object when it is ready for reading.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ReadableState {
/// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
Readable,
/// Received `OBJECT_PEER_CLOSED`.
Closed,
/// Both `Readable` and `Closed` apply.
ReadableAndClosed,
}
/// State of an object when it is ready for writing.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WritableState {
/// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
Writable,
/// Received `OBJECT_PEER_CLOSED`.
Closed,
}
/// A `Handle` that receives notifications when it is readable.
///
/// # Examples
///
/// ```
/// loop {
/// ready!(self.poll_readable(cx))?;
/// match /* make read syscall */ {
/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
/// status => return Poll::Ready(status),
/// }
/// }
/// ```
pub trait ReadableHandle {
/// If the object is ready for reading, returns `Ready` with the readable
/// state. If the implementor returns Pending, it should first ensure that
/// `need_readable` is called.
///
/// This should be called in a poll function. If the syscall returns
/// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
/// object is readable.
///
/// The returned `ReadableState` does not necessarily reflect an observed
/// `OBJECT_READABLE` signal. We optimistically assume the object remains
/// readable until `need_readable` is called.
fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
/// Arranges for the current task to be woken when the object receives an
/// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal. This can return
/// Poll::Ready if the object has already been signaled in which case the
/// waker *will* not be woken and it is the caller's responsibility to not
/// lose the signal.
fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
/// A `Handle` that receives notifications when it is writable.
///
/// # Examples
///
/// ```
/// loop {
/// ready!(self.poll_writable(cx))?;
/// match /* make write syscall */ {
/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
/// status => Poll::Ready(status),
/// }
/// }
/// ```
pub trait WritableHandle {
/// If the object is ready for writing, returns `Ready` with the writable
/// state. If the implementor returns Pending, it should first ensure that
/// `need_writable` is called.
///
/// This should be called in a poll function. If the syscall returns
/// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
/// object is writable.
///
/// The returned `WritableState` does not necessarily reflect an observed
/// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
/// writable until `need_writable` is called.
fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
/// Arranges for the current task to be woken when the object receives an
/// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
/// Poll::Ready if the object has already been signaled in which case the
/// waker *will* not be woken and it is the caller's responsibility to not
/// lose the signal.
fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
struct RWPacketReceiver(Mutex<Inner>);
struct Inner {
signals: zx::Signals,
read_task: Option<Waker>,
write_task: Option<Waker>,
}
impl PacketReceiver for RWPacketReceiver {
fn receive_packet(&self, packet: zx::Packet) {
let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
p.observed()
} else {
return;
};
// We wake the tasks when the lock isn't held in case the wakers need the same lock.
let mut read_task = None;
let mut write_task = None;
{
let mut inner = self.0.lock().unwrap();
let old = inner.signals;
inner.signals |= new;
let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
let became_closed =
new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
if became_readable || became_closed {
read_task = inner.read_task.take();
}
if became_writable || became_closed {
write_task = inner.write_task.take();
}
}
// *NOTE*: This is the only safe place to wake wakers. In any other location, there is a
// risk that locks are held which might be required when the waker is woken. It is safe to
// wake here because this is called from the executor when no locks are held.
if let Some(read_task) = read_task {
read_task.wake();
}
if let Some(write_task) = write_task {
write_task.wake();
}
}
}
/// A `Handle` that receives notifications when it is readable/writable.
pub struct RWHandle<T> {
handle: T,
receiver: ReceiverRegistration<RWPacketReceiver>,
}
impl<T> RWHandle<T>
where
T: AsHandleRef,
{
/// Creates a new `RWHandle` object which will receive notifications when
/// the underlying handle becomes readable, writable, or closes.
///
/// # Panics
///
/// If called outside the context of an active async executor.
pub fn new(handle: T) -> Self {
let ehandle = EHandle::local();
let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
// Optimistically assume that the handle is readable and writable.
// Reads and writes will be attempted before queueing a packet.
// This makes handles slightly faster to read/write the first time
// they're accessed after being created, provided they start off as
// readable or writable. In return, there will be an extra wasted
// syscall per read/write if the handle is not readable or writable.
signals: initial_signals,
read_task: None,
write_task: None,
}))));
RWHandle { handle, receiver }
}
/// Returns a reference to the underlying handle.
pub fn get_ref(&self) -> &T {
&self.handle
}
/// Returns a mutable reference to the underlying handle.
pub fn get_mut(&mut self) -> &mut T {
&mut self.handle
}
/// Consumes `self` and returns the underlying handle.
pub fn into_inner(self) -> T {
self.handle
}
/// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
pub fn is_closed(&self) -> bool {
let signals = self.receiver().0.lock().unwrap().signals;
if signals.contains(OBJECT_PEER_CLOSED) {
return true;
}
// The signals bitset might not be updated if we haven't gotten around to processing the
// packet telling us that yet. To provide an up-to-date response, we query the current
// state of the signal.
//
// Note: we _could_ update the bitset with what we find here, if we're careful to also
// update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
// But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
// we just leave the bitset as-is and let the regular notification mechanism get around to
// it when it gets around to it.
match self.handle.wait_handle(OBJECT_PEER_CLOSED, zx::Time::INFINITE_PAST) {
Ok(_) => true,
Err(zx::Status::TIMED_OUT) => false,
Err(status) => {
// None of the other documented error statuses should be possible, either the type
// system doesn't allow it or the wait from `RWHandle::new()` would have already
// failed.
unreachable!("status: {status}")
}
}
}
/// Returns a future that completes when `is_closed()` is true.
pub fn on_closed(&self) -> OnSignalsRef<'_> {
OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
}
fn receiver(&self) -> &RWPacketReceiver {
self.receiver.receiver()
}
fn need_signal(
&self,
cx: &mut Context<'_>,
for_read: bool,
signal: zx::Signals,
) -> Poll<Result<(), zx::Status>> {
let mut inner = self.receiver.0.lock().unwrap();
let old = inner.signals;
if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
// We don't want to return an error here because even though the peer has closed, the
// object could still have queued messages that can be read.
Poll::Ready(Ok(()))
} else {
let waker = cx.waker().clone();
if for_read {
inner.read_task = Some(waker);
} else {
inner.write_task = Some(waker);
}
if old.contains(signal) {
inner.signals &= !signal;
std::mem::drop(inner);
self.handle.wait_async_handle(
self.receiver.port(),
self.receiver.key(),
signal | zx::Signals::OBJECT_PEER_CLOSED,
zx::WaitAsyncOpts::empty(),
)?;
}
Poll::Pending
}
}
}
impl<T> ReadableHandle for RWHandle<T>
where
T: AsHandleRef,
{
fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
loop {
let signals = self.receiver().0.lock().unwrap().signals;
match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
(true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
(false, true) => return Poll::Ready(Ok(ReadableState::Closed)),
(true, true) => return Poll::Ready(Ok(ReadableState::ReadableAndClosed)),
(false, false) => {
ready!(self.need_signal(cx, true, OBJECT_READABLE)?)
}
}
}
}
fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
self.need_signal(cx, true, OBJECT_READABLE)
}
}
impl<T> WritableHandle for RWHandle<T>
where
T: AsHandleRef,
{
fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
loop {
let signals = self.receiver().0.lock().unwrap().signals;
match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
(_, true) => return Poll::Ready(Ok(WritableState::Closed)),
(true, _) => return Poll::Ready(Ok(WritableState::Writable)),
(false, false) => {
ready!(self.need_signal(cx, false, OBJECT_WRITABLE)?)
}
}
}
}
fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
self.need_signal(cx, false, OBJECT_WRITABLE)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::TestExecutor;
use fuchsia_zircon as zx;
#[test]
fn is_closed_immediately_after_close() {
let mut exec = TestExecutor::new();
let (tx, rx) = zx::Channel::create();
let rx_rw_handle = RWHandle::new(rx);
let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
// Clear optimistic readable state
assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
// Starting state: the channel is not closed (because we haven't closed it yet)
assert_eq!(rx_rw_handle.is_closed(), false);
// we will never set readable, so this should be Pending until we close
assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
drop(tx);
// Implementation note: the cached state will not be updated yet
assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
// But is_closed should return true immediately
assert_eq!(rx_rw_handle.is_closed(), true);
// Still not updated, and won't be until we let the executor process port packets
assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
// So we do
let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
// And now it is updated, so we observe Closed
assert_eq!(
rx_rw_handle.poll_readable(&mut noop_ctx),
Poll::Ready(Ok(ReadableState::Closed))
);
// And is_closed should still be true, of course.
assert_eq!(rx_rw_handle.is_closed(), true);
}
}