// blob: 611c811f3b32cfe3dd77c85feb86557b746104d8 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::task::{AtomicWaker, Context};
/// Packet receiver that records the signals observed by a port packet and wakes the task
/// waiting for them.
///
/// An instance is created and registered with the local executor handle each time an
/// `OnSignals` future registers an asynchronous wait.
struct OnSignalsReceiver {
// Bits of the observed signals, or 0 while no signal packet has been received.
maybe_signals: AtomicUsize,
// Waker of the task to notify once `maybe_signals` has been set.
task: AtomicWaker,
}
impl OnSignalsReceiver {
    /// Returns the signals recorded by `set_signals`, or `Poll::Pending` if none have been
    /// recorded yet.
    ///
    /// When pending, the waker from `cx` has been registered and will be woken by the next
    /// call to `set_signals`.
    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
        let bits = match self.maybe_signals.load(Ordering::Relaxed) {
            0 => {
                // Nothing recorded yet: arrange to be woken when signals arrive.
                self.task.register(cx.waker());
                // Signals may have been stored after the load above but before the waker
                // was registered, in which case the wake-up was missed. Load once more so
                // that case is observed here instead of being lost.
                self.maybe_signals.load(Ordering::SeqCst)
            }
            recorded => recorded,
        };
        match bits {
            0 => Poll::Pending,
            recorded => Poll::Ready(zx::Signals::from_bits_truncate(recorded as u32)),
        }
    }

    /// Records `signals` as observed and wakes the registered task, if any.
    fn set_signals(&self, signals: zx::Signals) {
        let bits = signals.bits() as usize;
        self.maybe_signals.store(bits, Ordering::SeqCst);
        self.task.wake();
    }
}
impl PacketReceiver for OnSignalsReceiver {
    /// Handles a packet delivered for this receiver's key.
    ///
    /// Only `SignalOne` packets carry observed signals; any other packet kind is ignored.
    fn receive_packet(&self, packet: zx::Packet) {
        if let zx::PacketContents::SignalOne(signal_packet) = packet.contents() {
            self.set_signals(signal_packet.observed());
        }
    }
}
/// A future that completes when some set of signals become available on a Handle.
///
/// The asynchronous port wait is registered lazily: only once a poll finds that none of
/// the requested signals are currently set.
#[must_use = "futures do nothing unless polled"]
pub struct OnSignals<'a, H: AsHandleRef> {
// The handle being watched.
handle: H,
// The set of signals to wait for.
signals: zx::Signals,
// The port registration; `None` while no asynchronous wait is registered.
registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
// Ties the future to the lifetime `'a` without storing a reference.
phantom: PhantomData<&'a H>,
}
/// Alias for the common case where OnSignals is used with zx::HandleRef.
///
/// The borrowed handle reference and the future share the same lifetime `'a`.
pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
    /// Builds an `OnSignals` future that resolves once any of the signals in `signals` is
    /// asserted on `handle`.
    pub fn new(handle: H, signals: zx::Signals) -> Self {
        // Registration with the port is deliberately deferred until the first poll. On that
        // poll the signals are checked directly; if they are already set the future is done,
        // otherwise an asynchronous notification is requested through the port.
        //
        // Registering eagerly here would not remove the direct check on first poll (it is
        // still needed when the notification has not been processed yet), and since the gap
        // between construction and first poll is usually tiny -- and a single-threaded
        // executor is unlikely to process a notification before that poll anyway -- it would
        // rarely help. Deferring means no registration at all when the signals are already
        // set, which is a win some of the time.
        Self { handle, signals, registration: None, phantom: PhantomData }
    }

    /// Takes the handle.
    ///
    /// Any outstanding port wait is cancelled first; an invalid handle is left in place of
    /// the one returned.
    pub fn take_handle(mut self) -> H
    where
        H: zx::HandleBased,
    {
        self.unregister();
        let placeholder: H = zx::Handle::invalid().into();
        std::mem::replace(&mut self.handle, placeholder)
    }

    /// Converts this future into one that can live for the `'static` lifetime, at the cost
    /// of disabling automatic cleanup of the port wait.
    ///
    /// WARNING: Do not use unless you can guarantee that either:
    /// - The future is not dropped before it completes, or
    /// - The handle is dropped without creating additional OnSignals futures for it.
    ///
    /// Registering the wait (done on the first poll that finds the signals unset, or by this
    /// function if no wait is registered yet) calls zx_object_wait_async, which consumes a
    /// small amount of kernel resources. Dropping the OnSignals calls zx_port_cancel to clean
    /// up. But calling extend_lifetime disables this cleanup, since the zx_port_cancel call
    /// requires a reference to the handle. The port registration can also be cleaned up by
    /// closing the handle or by waiting for the signal to be triggered. But if neither of
    /// these happens, the registration is leaked. This wastes kernel memory and the kernel
    /// will eventually kill your process to force a cleanup.
    ///
    /// Note that `OnSignals` will not fire if the handle that was used to create it is
    /// dropped or transferred to another process.
    // TODO(https://fxbug.dev/42182035): Try to remove this footgun.
    pub fn extend_lifetime(mut self) -> LeakedOnSignals {
        // Moving the registration out means the `Drop` of `self` finds nothing to cancel.
        let registration = match self.registration.take() {
            Some(existing) => Ok(existing),
            None => self.register(None),
        };
        LeakedOnSignals { registration }
    }

    /// Creates a fresh receiver, registers it with the local executor handle and asks the
    /// kernel to deliver a packet to it when any of `self.signals` is asserted.
    ///
    /// If `cx` is supplied, its waker is installed on the receiver before the wait is
    /// requested. Returns the error from `wait_async_handle` if the wait cannot be set up.
    fn register(
        &self,
        cx: Option<&mut Context<'_>>,
    ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
        let receiver = Arc::new(OnSignalsReceiver {
            maybe_signals: AtomicUsize::new(0),
            task: AtomicWaker::new(),
        });
        let reg = EHandle::local().register_receiver(receiver);

        // The waker must be in place before `wait_async_handle` is called; otherwise a
        // packet could arrive and be handled before there is anything to wake.
        if let Some(cx) = cx {
            reg.task.register(cx.waker());
        }

        self.handle.wait_async_handle(
            reg.port(),
            reg.key(),
            self.signals,
            zx::WaitAsyncOpts::empty(),
        )?;
        Ok(reg)
    }

    /// Drops the current registration, cancelling the port wait if it has not fired yet.
    fn unregister(&mut self) {
        let Some(registration) = self.registration.take() else {
            return;
        };
        let already_fired = registration.receiver().maybe_signals.load(Ordering::SeqCst) != 0;
        if !already_fired {
            // The result of zx_port_cancel is intentionally discarded: if the packet is
            // handled between the check above and the cancel, the cancel fails with
            // ZX_ERR_NOT_FOUND and there is nothing useful to do about it.
            let _ = registration.port().cancel(&self.handle, registration.key());
        }
    }
}
impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
    type Output = Result<zx::Signals, zx::Status>;

    /// Resolves with the observed signals once any of the requested signals is asserted.
    ///
    /// Before a port wait is registered, the handle is checked directly and a wait is
    /// registered only if the check times out. Once registered, the receiver is consulted
    /// first and the handle is then checked directly as a fallback.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        if let Some(registration) = &this.registration {
            if let Poll::Ready(observed) = registration.receiver().get_signals(cx) {
                return Poll::Ready(Ok(observed));
            }
            // No notification has been received, but the kernel is still queried in case
            // the notification simply has not been processed by the executor yet. This
            // behaviour is relied upon in some cases: in Component Manager, in some shutdown
            // paths, it wants to drain and process all messages in channels before it closes
            // them. There is no other reliable way to flush a pending notification
            // (particularly on a multi-threaded executor). The cost is a small performance
            // penalty when this future is polled without a notification having been
            // received (as can happen with some futures combinators).
            return match this.handle.wait_handle(this.signals, zx::Time::INFINITE_PAST) {
                Ok(observed) => Poll::Ready(Ok(observed)),
                Err(_) => Poll::Pending,
            };
        }

        // Not registered yet: check the handle directly and only register a port wait if
        // none of the signals are currently set.
        match this.handle.wait_handle(this.signals, zx::Time::INFINITE_PAST) {
            Ok(observed) => Poll::Ready(Ok(observed)),
            Err(zx::Status::TIMED_OUT) => {
                this.registration = Some(this.register(Some(cx))?);
                Poll::Pending
            }
            Err(status) => Poll::Ready(Err(status)),
        }
    }
}
impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
    /// Formats the future as the fixed string `OnSignals`; no field is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("OnSignals")
    }
}
/// Cleans up the port wait when the future is dropped.
impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
fn drop(&mut self) {
// Cancels the outstanding port wait, if one is registered and has not fired yet.
self.unregister();
}
}
/// Lets an `OnSignals` be used wherever a reference to the watched handle is accepted.
impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
// Returns a reference to the handle this future is watching.
fn as_handle_ref(&self) -> zx::HandleRef<'_> {
self.handle.as_handle_ref()
}
}
/// Gives borrowed access to the wrapped handle value.
impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
// Returns the handle object exactly as it was passed to `OnSignals::new`.
fn as_ref(&self) -> &H {
&self.handle
}
}
/// The future returned by `OnSignals::extend_lifetime`.
///
/// Unlike `OnSignals`, it does not hold the handle, and dropping it does not cancel the port
/// wait (the `drop_before_event` test relies on this).
pub struct LeakedOnSignals {
// The port registration, or the error produced while trying to register the wait.
registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
}
impl Future for LeakedOnSignals {
    type Output = Result<zx::Signals, zx::Status>;

    /// Resolves with the observed signals once the receiver has recorded them, or with the
    /// registration error if the wait could not be set up.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.registration.as_mut() {
            Ok(registration) => registration.receiver().get_signals(cx).map(Ok),
            // Hand the stored error to the caller, leaving `Status::OK` behind in its place.
            Err(status) => Poll::Ready(Err(mem::replace(status, zx::Status::OK))),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::{
        future::{pending, FutureExt},
        task::{waker, ArcWake},
    };
    use std::pin::pin;

    /// The future stays pending (and does not wake its task) until the event is signaled,
    /// then wakes exactly once and resolves with the signal.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Runs the executor until it stalls so that any queued port packets get delivered.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Check that `signals` is still pending before the event has been signaled
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // signal the event and check that `signals` has been woken up and is
        // no longer pending
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping an `OnSignals` cancels its port wait, whereas dropping the future returned
    /// by `extend_lifetime` leaves the wait registered.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.registration.as_ref().unwrap().key();

            // The drop already cancelled the wait, so cancelling again finds nothing.
            std::mem::drop(signals);
            assert_eq!(ehandle.port().cancel(&event, key), Err(zx::Status::NOT_FOUND));

            // try again but with extend_lifetime
            let signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
            let key = signals.registration.as_ref().unwrap().key();
            std::mem::drop(signals);
            assert_eq!(ehandle.port().cancel(&event, key), Ok(()));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future still checks the handle directly when polled, so it resolves
    /// even though the port notification has not been delivered.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing; the future is polled by hand below.
        struct Waker;
        impl ArcWake for Waker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll the future directly which guarantees the port notification for the write hasn't
        // arrived.
        assert_matches!(
            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// `take_handle` returns the original, still usable handle after the future completed.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        fut.take_handle().read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}