blob: 6cdf12926ef6727b460b25c1908c86f27153efa4 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::task::AtomicWaker;
use rustc_hash::FxHashMap as HashMap;
use std::{
ops::Deref,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::{Context, Poll},
u64, usize,
};
use super::common::EHandle;
/// Clears `signal` on `atomic_signals`, then schedules a packet to wake `task` when the object
/// referred to by `handle` asserts `signal` or `OBJECT_PEER_CLOSED`. If `atomic_signals` contains
/// `OBJECT_PEER_CLOSED` already, wakes `task` immediately. To avoid unnecessary packets, does
/// nothing if `signal` was already cleared. If a packet is scheduled, the `OBJECT_PEER_CLOSED`
/// signal is _always_ included as a signal of interest.
pub fn need_signal_or_peer_closed(
cx: &mut Context<'_>,
task: &AtomicWaker,
atomic_signals: &AtomicU32,
signal: zx::Signals,
handle: zx::HandleRef<'_>,
port: &zx::Port,
key: u64,
) -> Poll<Result<(), zx::Status>> {
task.register(cx.waker());
let old =
zx::Signals::from_bits_truncate(atomic_signals.fetch_and(!signal.bits(), Ordering::SeqCst));
if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
// We don't want to return an error here because even though the peer has closed, the
// object could still have queued messages that can be read.
Poll::Ready(Ok(()))
} else {
if old.contains(signal) {
schedule_packet(handle, port, key, signal | zx::Signals::OBJECT_PEER_CLOSED)?;
}
Poll::Pending
}
}
/// A trait for handling the arrival of a packet on a `zx::Port`.
///
/// This trait should be implemented by users who wish to write their own
/// types which receive asynchronous notifications from a `zx::Port`.
/// Implementors of this trait generally contain a `futures::task::AtomicWaker` which
/// is used to wake up the task which can make progress due to the arrival of
/// the packet.
///
/// `PacketReceiver`s should be registered with a `Core` using the
/// `register_receiver` method on `Core`, `Handle`, or `Remote`.
/// Upon registration, users will receive a `ReceiverRegistration`
/// which provides `key` and `port` methods. These methods can be used to wait on
/// asynchronous signals.
///
/// Note that `PacketReceiver`s may receive false notifications intended for a
/// previous receiver, and should handle these gracefully.
pub trait PacketReceiver: Send + Sync + 'static {
/// Receive a packet when one arrives.
fn receive_packet(&self, packet: zx::Packet);
}
// Simple slab::Slab replacement that doesn't re-use keys
// TODO(https://fxbug.dev/42119369): figure out how to safely cancel async waits so we can re-use keys again.
pub(crate) struct PacketReceiverMap<T> {
next_key: usize,
pub mapping: HashMap<usize, T>,
}
impl<T> PacketReceiverMap<T> {
pub fn new() -> Self {
Self { next_key: 0, mapping: HashMap::default() }
}
pub fn get(&self, key: usize) -> Option<&T> {
self.mapping.get(&key)
}
pub fn insert(&mut self, val: T) -> usize {
let key = self.next_key;
self.next_key = self.next_key.checked_add(1).expect("ran out of keys");
self.mapping.insert(key, val);
key
}
pub fn remove(&mut self, key: usize) -> T {
self.mapping.remove(&key).unwrap_or_else(|| panic!("invalid key"))
}
pub fn contains(&self, key: usize) -> bool {
self.mapping.contains_key(&key)
}
}
pub fn schedule_packet(
handle: zx::HandleRef<'_>,
port: &zx::Port,
key: u64,
signals: zx::Signals,
) -> Result<(), zx::Status> {
handle.wait_async_handle(port, key, signals, zx::WaitAsyncOpts::empty())
}
/// A registration of a `PacketReceiver`.
/// When dropped, it will automatically deregister the `PacketReceiver`.
// NOTE: purposefully does not implement `Clone`.
#[derive(Debug)]
pub struct ReceiverRegistration<T: PacketReceiver> {
pub(super) receiver: Arc<T>,
pub(super) ehandle: EHandle,
pub(super) key: u64,
}
impl<T> ReceiverRegistration<T>
where
T: PacketReceiver,
{
/// The key with which `Packet`s destined for this receiver should be sent on the `zx::Port`.
pub fn key(&self) -> u64 {
self.key
}
/// The internal `PacketReceiver`.
pub fn receiver(&self) -> &T {
&*self.receiver
}
/// The `zx::Port` on which packets destined for this `PacketReceiver` should be queued.
pub fn port(&self) -> &zx::Port {
self.ehandle.port()
}
}
impl<T: PacketReceiver> Deref for ReceiverRegistration<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.receiver()
}
}
impl<T> Drop for ReceiverRegistration<T>
where
T: PacketReceiver,
{
fn drop(&mut self) {
self.ehandle.deregister_receiver(self.key);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn packet_receiver_map_does_not_reuse_keys() {
#[derive(Debug, Copy, Clone, PartialEq)]
struct DummyPacketReceiver {
id: i32,
}
let mut map = PacketReceiverMap::<DummyPacketReceiver>::new();
let e1 = DummyPacketReceiver { id: 1 };
assert_eq!(map.insert(e1), 0);
assert_eq!(map.insert(e1), 1);
// Still doesn't reuse IDs after one is removed
map.remove(1);
assert_eq!(map.insert(e1), 2);
}
}