blob: f012326ecd20cb70fa9fa0e0ff4b54bcf3ff414b [file] [log] [blame] [edit]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! An implementation of a client for a fidl interface.
use crate::encoding::{
decode_transaction_header, Decode, Decoder, DefaultFuchsiaResourceDialect, DynamicFlags,
Encode, Encoder, EpitaphBody, MessageBufFor, ProxyChannelBox, ProxyChannelFor, ResourceDialect,
TransactionHeader, TransactionMessage, TransactionMessageType, TypeMarker,
};
use crate::Error;
use fuchsia_sync::Mutex;
use futures::future::{self, FusedFuture, Future, FutureExt, Map, MaybeDone};
use futures::ready;
use futures::stream::{FusedStream, Stream};
use futures::task::{Context, Poll, Waker};
use slab::Slab;
use std::collections::VecDeque;
use std::mem;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use zx_status;
/// Decodes the body of `buf` as the FIDL type `T`.
#[doc(hidden)] // only exported for use in macros or generated code
pub fn decode_transaction_body<T: TypeMarker, D: ResourceDialect, const EXPECTED_ORDINAL: u64>(
mut buf: D::MessageBufEtc,
) -> Result<T::Owned, Error>
where
T::Owned: Decode<T, D>,
{
let (bytes, handles) = buf.split_mut();
let (header, body_bytes) = decode_transaction_header(bytes)?;
if header.ordinal != EXPECTED_ORDINAL {
return Err(Error::InvalidResponseOrdinal);
}
let mut output = Decode::<T, D>::new_empty();
Decoder::<D>::decode_into::<T>(&header, body_bytes, handles, &mut output)?;
Ok(output)
}
/// A FIDL client which can be used to send buffers and receive responses via a channel.
///
/// All clones of a `Client` share the same `ClientInner` (channel, pending interests and
/// terminal error) through an `Arc`.
#[derive(Debug, Clone)]
pub struct Client<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
// Shared state; also referenced by every `MessageResponse` and `EventReceiver` created
// from this client.
inner: Arc<ClientInner<D>>,
}
/// A future representing the decoded and transformed response to a FIDL query.
///
/// This is a `MessageResponse` mapped through a plain function pointer that turns the raw
/// message buffer (or receive error) into `Result<T, Error>`.
pub type DecodedQueryResponseFut<T, D = DefaultFuchsiaResourceDialect> = Map<
MessageResponse<D>,
fn(Result<<D as ResourceDialect>::MessageBufEtc, Error>) -> Result<T, Error>,
>;
/// A future representing the result of a FIDL query, with early error detection available if the
/// message couldn't be sent.
///
/// The wrapped `MaybeDone` is already in the `Done(Err(_))` state when sending failed (see
/// `Client::send_query_and_decode`), which is what `check` inspects.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct QueryResponseFut<T, D: ResourceDialect = DefaultFuchsiaResourceDialect>(
pub MaybeDone<DecodedQueryResponseFut<T, D>>,
);
impl<T: Unpin, D: ResourceDialect> FusedFuture for QueryResponseFut<T, D> {
    /// Reports whether the output has already been taken out of the inner `MaybeDone`.
    fn is_terminated(&self) -> bool {
        match &self.0 {
            MaybeDone::Gone => true,
            MaybeDone::Future(_) | MaybeDone::Done(_) => false,
        }
    }
}
impl<T: Unpin, D: ResourceDialect> Future for QueryResponseFut<T, D> {
type Output = Result<T, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.0.poll_unpin(cx));
let maybe_done = Pin::new(&mut self.0);
Poll::Ready(maybe_done.take_output().unwrap_or(Err(Error::PollAfterCompletion)))
}
}
impl<T> QueryResponseFut<T> {
/// Check to see if the query has an error. If there was en error sending, this returns it and
/// the error is returned, otherwise it returns self, which can then be awaited on:
/// i.e. match echo_proxy.echo("something").check() {
/// Err(e) => error!("Couldn't send: {}", e),
/// Ok(fut) => fut.await
/// }
pub fn check(self) -> Result<Self, Error> {
match self.0 {
MaybeDone::Done(Err(e)) => Err(e),
x => Ok(QueryResponseFut(x)),
}
}
}
/// Mask selecting the low 24 bits of a txid, which hold the interest slot + 1.
const TXID_INTEREST_MASK: u32 = 0xFFFFFF;
/// Number of bits the generation is shifted left by inside a txid.
const TXID_GENERATION_SHIFT: usize = 24;
/// Mask selecting the 7 generation bits, which keeps the top bit of the txid zero.
const TXID_GENERATION_MASK: u8 = 0x7F;

/// A FIDL transaction id. Will not be zero for a message that includes a response.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Txid(u32);

/// A message interest id.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct InterestId(usize);

impl InterestId {
    /// Recovers the interest id (slab slot) encoded in the low 24 bits of `txid`.
    ///
    /// Txids minted by `Txid::from_interest_id` always have non-zero low bits. A txid
    /// coming off the wire may not: a peer can send a non-zero txid whose low 24 bits
    /// are all zero. Such a txid maps to `usize::MAX`, which is never a valid slot, so
    /// the subsequent lookup fails cleanly instead of this subtraction overflowing
    /// (a panic in builds with overflow checks enabled).
    fn from_txid(txid: Txid) -> Self {
        InterestId(((txid.0 & TXID_INTEREST_MASK) as usize).wrapping_sub(1))
    }
}

impl Txid {
    /// Builds a txid from an interest id and a generation counter.
    fn from_interest_id(int_id: InterestId, generation: u8) -> Self {
        // Base the transaction id on the slab slot + 1
        // (slab slots are zero-based and txid zero is special)
        let id = (int_id.0 as u32 + 1) & TXID_INTEREST_MASK;
        // And a 7-bit generation number.
        let generation = (generation & TXID_GENERATION_MASK) as u32;
        // Combine them:
        //  - top bit zero to indicate a userspace generated txid.
        //  - 7 bits of generation
        //  - 24 bits based on the interest id
        let txid = (generation << TXID_GENERATION_SHIFT) | id;
        Txid(txid)
    }

    /// Get the raw u32 transaction ID.
    pub fn as_raw_id(&self) -> u32 {
        self.0
    }
}

impl From<u32> for Txid {
    /// Wraps a raw transaction id without validating it.
    fn from(txid: u32) -> Self {
        Self(txid)
    }
}
impl<D: ResourceDialect> Client<D> {
/// Create a new client.
///
/// `channel` is the asynchronous channel over which data is sent and received.
/// `protocol_name` is stored and included in the channel-closed errors this client reports.
pub fn new(channel: D::ProxyChannel, protocol_name: &'static str) -> Client<D> {
    let inner = ClientInner {
        channel: channel.boxed(),
        interests: Mutex::default(),
        terminal_error: Mutex::default(),
        protocol_name,
    };
    Client { inner: Arc::new(inner) }
}
/// Get a reference to the client's underlying channel.
///
/// The channel stays owned by the client; use `into_channel` to take it back.
pub fn as_channel(&self) -> &D::ProxyChannel {
self.inner.channel.as_channel()
}
/// Attempt to convert the `Client` back into a channel.
///
/// This will only succeed if there are no active clones of this `Client`,
/// no currently-alive `EventReceiver` or `MessageResponse`s that came from
/// this `Client`, and no outstanding messages awaiting a response, even if
/// that response will be discarded. Outstanding messages are ignored when the
/// channel is already closed. On failure the client is handed back in `Err`.
pub fn into_channel(self) -> Result<D::ProxyChannel, Self> {
    // The message interest table has to be checked because an interest can outlive
    // every EventReceiver and MessageResponse (a discarded response that has not
    // arrived yet). Handing out the channel in that state would let the caller
    // receive that response unexpectedly later on.
    //
    // `try_unwrap` runs first so that no other thread can register a new interest
    // between the emptiness check and taking ownership. The price is that the
    // original Arc is gone by the time the check runs, so a fresh one has to be
    // created when the check fails.
    let inner = match Arc::try_unwrap(self.inner) {
        Ok(inner) => inner,
        Err(shared) => return Err(Self { inner: shared }),
    };
    if inner.interests.lock().messages.is_empty() || inner.channel.is_closed() {
        return Ok(inner.channel.unbox());
    }
    // Re-wrapping drops any weak references. A weak reference to ClientInner only
    // exists when it was used as a waker, and reaching this point means that waker
    // is obsolete: no tasks are waiting.
    Err(Self { inner: Arc::new(inner) })
}
/// Retrieve the stream of event messages for the `Client`.
///
/// # Panics
///
/// Panics if the stream was already taken and has not been dropped since.
pub fn take_event_receiver(&self) -> EventReceiver<D> {
    {
        let mut interests = self.inner.interests.lock();
        if !matches!(interests.event_listener, EventListener::None) {
            panic!("Event stream was already taken");
        }
        // Mark the listener slot as claimed before the lock is released.
        interests.event_listener = EventListener::WillPoll;
    }
    EventReceiver { inner: self.inner.clone(), state: EventReceiverState::Active }
}
/// Encodes and sends a request without expecting a response.
///
/// The message is sent with a transaction id of zero.
pub fn send<T: TypeMarker>(
    &self,
    body: impl Encode<T, D>,
    ordinal: u64,
    dynamic_flags: DynamicFlags,
) -> Result<(), Error> {
    let header = TransactionHeader::new(0, ordinal, dynamic_flags);
    let msg = TransactionMessage { header, body };
    crate::encoding::with_tls_encoded::<TransactionMessageType<T>, D, ()>(
        msg,
        |encoded_bytes, encoded_handles| self.send_raw(encoded_bytes, encoded_handles),
    )
}
/// Encodes and sends a request. Returns a future that decodes the response.
///
/// The response is decoded with `decode_transaction_body`, so a reply whose ordinal
/// differs from `ORDINAL` resolves to `Error::InvalidResponseOrdinal`. Errors from
/// receiving the response are passed through without attempting to decode.
pub fn send_query<Request: TypeMarker, Response: TypeMarker, const ORDINAL: u64>(
&self,
body: impl Encode<Request, D>,
dynamic_flags: DynamicFlags,
) -> QueryResponseFut<Response::Owned, D>
where
Response::Owned: Decode<Response, D>,
{
self.send_query_and_decode::<Request, Response::Owned>(
body,
ORDINAL,
dynamic_flags,
// Non-capturing closure, so it coerces to the `fn` pointer the callee expects.
|buf| buf.and_then(decode_transaction_body::<Response, D, ORDINAL>),
)
}
/// Encodes and sends a request. Returns a future that decodes the response
/// using the given `decode` function.
///
/// If encoding or sending fails, the returned future is already resolved with that
/// error, which `QueryResponseFut::check` can detect without awaiting.
pub fn send_query_and_decode<Request: TypeMarker, Output>(
    &self,
    body: impl Encode<Request, D>,
    ordinal: u64,
    dynamic_flags: DynamicFlags,
    decode: fn(Result<D::MessageBufEtc, Error>) -> Result<Output, Error>,
) -> QueryResponseFut<Output, D> {
    let send_result = self.send_raw_query(|tx_id, bytes, handles| {
        // The header carries the txid that was minted for this request.
        let header = TransactionHeader::new(tx_id.as_raw_id(), ordinal, dynamic_flags);
        let msg = TransactionMessage { header, body };
        Encoder::encode::<TransactionMessageType<Request>>(bytes, handles, msg)?;
        Ok(())
    });
    let state = match send_result {
        Err(e) => MaybeDone::Done(Err(e)),
        Ok(res_fut) => future::maybe_done(res_fut.map(decode)),
    };
    QueryResponseFut(state)
}
/// Sends a raw message without expecting a response.
pub fn send_raw(
&self,
bytes: &[u8],
handles: &mut [<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition],
) -> Result<(), Error> {
match self.inner.channel.write_etc(bytes, handles) {
Ok(()) | Err(None) => Ok(()),
Err(Some(e)) => Err(Error::ClientWrite(e.into())),
}
}
/// Sends a raw query and receives a response future.
pub fn send_raw_query<F>(&self, encode_msg: F) -> Result<MessageResponse<D>, Error>
where
F: for<'a, 'b> FnOnce(
Txid,
&'a mut Vec<u8>,
&'b mut Vec<<D::ProxyChannel as ProxyChannelFor<D>>::HandleDisposition>,
) -> Result<(), Error>,
{
let id = self.inner.interests.lock().register_msg_interest();
crate::encoding::with_tls_encode_buf::<_, D>(|bytes, handles| {
encode_msg(id, bytes, handles)?;
self.send_raw(bytes, handles)
})?;
Ok(MessageResponse { id, client: Some(self.inner.clone()) })
}
}
#[must_use]
/// A future which polls for the response to a client message.
///
/// Dropping the future before the response has been received deregisters its interest
/// (see the `Drop` impl).
#[derive(Debug)]
pub struct MessageResponse<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
// Transaction id of the request this future is waiting on.
id: Txid,
// `None` if the message response has been received
client: Option<Arc<ClientInner<D>>>,
}
impl<D: ResourceDialect> Unpin for MessageResponse<D> {}
impl<D: ResourceDialect> Future for MessageResponse<D> {
type Output = Result<D::MessageBufEtc, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let res;
{
let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
res = client.poll_recv_msg_response(this.id, cx);
}
// Drop the client reference if the response has been received
if let Poll::Ready(Ok(_)) = res {
this.client.take().expect("MessageResponse polled after completion");
}
res
}
}
impl<D: ResourceDialect> Drop for MessageResponse<D> {
    /// Deregisters the interest unless the response was already handed out (in which
    /// case `client` is `None`).
    fn drop(&mut self) {
        let Some(client) = self.client.as_ref() else {
            return;
        };
        client.interests.lock().deregister(self.id);
    }
}
/// An enum representing either a resolved message interest or a task on which to alert
/// that a response message has arrived.
#[derive(Debug)]
enum MessageInterest<D: ResourceDialect> {
/// A new `MessageInterest`: registered, but no waker has been stored for it.
WillPoll,
/// A task is waiting to receive a response, and can be awoken with `Waker`.
Waiting(Waker),
/// A message has been received, and a task will poll to receive it.
Received(D::MessageBufEtc),
/// A message has not been received, but the party interested in the response
/// no longer cares about it, so the message should be discarded upon arrival.
Discard,
}
impl<D: ResourceDialect> MessageInterest<D> {
    /// Check if a message has been received.
    fn is_received(&self) -> bool {
        match self {
            MessageInterest::Received(_) => true,
            MessageInterest::WillPoll
            | MessageInterest::Waiting(_)
            | MessageInterest::Discard => false,
        }
    }

    /// Extracts the received message buffer.
    ///
    /// Panics if the interest is in any state other than `Received`.
    fn unwrap_received(self) -> D::MessageBufEtc {
        match self {
            MessageInterest::Received(buf) => buf,
            _ => panic!("EXPECTED received message"),
        }
    }
}
/// Lifecycle state of an `EventReceiver` stream.
#[derive(Debug)]
enum EventReceiverState {
/// Events are still being polled from the client.
Active,
/// A terminal error has been yielded; the next `poll_next` yields `None`.
Terminal,
/// The stream has ended: `is_terminated` returns true and polling again panics.
Terminated,
}
/// A stream of events as `MessageBufEtc`s.
///
/// Created by `Client::take_event_receiver`; dropping it releases the event listener
/// slot so that a new receiver can be taken.
#[derive(Debug)]
pub struct EventReceiver<D: ResourceDialect = DefaultFuchsiaResourceDialect> {
// Shared client state the events are read from.
inner: Arc<ClientInner<D>>,
// Where this stream is in its lifecycle (see `EventReceiverState`).
state: EventReceiverState,
}
impl<D: ResourceDialect> Unpin for EventReceiver<D> {}

impl<D: ResourceDialect> FusedStream for EventReceiver<D> {
    /// Returns true once the stream has reached the `Terminated` state.
    fn is_terminated(&self) -> bool {
        match self.state {
            EventReceiverState::Terminated => true,
            EventReceiverState::Active | EventReceiverState::Terminal => false,
        }
    }
}
/// This implementation upholds two invariants:
///   (1) After `None` is returned, the next poll panics.
///   (2) Until this instance is dropped, no other EventReceiver may claim the
///       event channel by calling Client::take_event_receiver.
impl<D: ResourceDialect> Stream for EventReceiver<D> {
    type Item = Result<D::MessageBufEtc, Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.state {
            EventReceiverState::Active => {}
            EventReceiverState::Terminal => {
                // The terminal error was yielded by the previous poll; end the stream.
                self.state = EventReceiverState::Terminated;
                return Poll::Ready(None);
            }
            EventReceiverState::Terminated => {
                panic!("polled EventReceiver after `None`");
            }
        }
        let item = match ready!(self.inner.poll_recv_event(cx)) {
            Ok(buf) => Some(Ok(buf)),
            Err(Error::ClientChannelClosed { status: zx_status::Status::PEER_CLOSED, .. }) => {
                // The channel is closed, with no epitaph. End the stream right away: the
                // next poll_next() panics and is_terminated() reports true.
                self.state = EventReceiverState::Terminated;
                None
            }
            err @ Err(_) => {
                // Any other error is terminal. Yield it now and arrange for the next
                // poll_next() to return None and terminate the stream.
                self.state = EventReceiverState::Terminal;
                Some(err)
            }
        };
        Poll::Ready(item)
    }
}
impl<D: ResourceDialect> Drop for EventReceiver<D> {
/// Releases the event listener slot (and its waker accounting) so that
/// `Client::take_event_receiver` can be called again.
fn drop(&mut self) {
self.inner.interests.lock().dropped_event_listener();
}
}
/// State of the (single) listener for events on a client.
#[derive(Debug, Default)]
enum EventListener {
/// No one is listening for the event
#[default]
None,
/// Someone is listening for the event but has not yet polled
/// (or their waker has been taken and not yet re-registered).
WillPoll,
/// Someone is listening for the event and can be woken via the `Waker`
Some(Waker),
}
impl EventListener {
    /// Returns true when a waker is currently stored for the listener.
    fn is_some(&self) -> bool {
        match self {
            EventListener::Some(_) => true,
            EventListener::None | EventListener::WillPoll => false,
        }
    }
}
/// A shared client channel which tracks expected and received responses
#[derive(Debug)]
struct ClientInner<D: ResourceDialect> {
/// The channel that leads to the server we are connected to.
channel: <D::ProxyChannel as ProxyChannelFor<D>>::Boxed,
/// Tracks the state of responses to two-way messages and events.
interests: Mutex<Interests<D>>,
/// A terminal error, which can be a server provided epitaph, or None if the channel is still
/// active. The lock is also held for the whole of `recv_all`, so it serializes reads
/// from the channel.
terminal_error: Mutex<Option<Error>>,
/// The `ProtocolMarker::DEBUG_NAME` for the service this client connects to.
protocol_name: &'static str,
}
/// Bookkeeping for everything that is waiting on the channel: pending two-way
/// responses, queued events and the event listener.
#[derive(Debug)]
struct Interests<D: ResourceDialect> {
/// One entry per outstanding two-way message, keyed by the slot encoded in its txid.
messages: Slab<MessageInterest<D>>,
/// Events that have been read from the channel but not yet handed to the listener.
events: VecDeque<D::MessageBufEtc>,
/// State of the task listening for events, if any.
event_listener: EventListener,
/// The number of wakers registered waiting for either a message or an event.
waker_count: usize,
/// Txid generation.
/// This is incremented every time we mint a new txid (see register_msg_interest).
/// The lower 7 bits are incorporated into the txid.
/// This is so that a client repeatedly making calls will have distinct txids for each call.
/// Not necessary for correctness but _very_ useful for tracing and debugging.
generation: u8,
}
impl<D: ResourceDialect> Default for Interests<D> {
fn default() -> Self {
Interests {
messages: Slab::new(),
events: Default::default(),
event_listener: Default::default(),
waker_count: 0,
generation: 0,
}
}
}
impl<D: ResourceDialect> Interests<D> {
/// Receives an event and returns a waker, if any.
///
/// The event is queued at the back of `events`. The returned waker is the one that was
/// registered by the event listener; it is removed from the listener in the process.
fn push_event(&mut self, buf: D::MessageBufEtc) -> Option<Waker> {
self.events.push_back(buf);
self.take_event_waker()
}
/// Returns the waker for the task waiting for events, if any.
///
/// Taking the waker puts the listener back into the `WillPoll` state and reduces
/// `waker_count` accordingly.
fn take_event_waker(&mut self) -> Option<Waker> {
    if !self.event_listener.is_some() {
        return None;
    }
    match mem::replace(&mut self.event_listener, EventListener::WillPoll) {
        EventListener::Some(waker) => {
            // Matches the +1 in `register_event_listener`.
            self.waker_count -= 1;
            Some(waker)
        }
        _ => unreachable!(),
    }
}
/// Returns a reference to the event listener's waker, if one is registered.
fn event_waker(&self) -> Option<&Waker> {
    if let EventListener::Some(waker) = &self.event_listener {
        Some(waker)
    } else {
        None
    }
}
/// Records a received response for `txid` and returns the waker of the task waiting
/// for it, if there is one. Waking that task is left to the caller.
///
/// If the interest was discarded, the message is dropped and the slot is freed.
/// Returns `Error::InvalidResponseTxid` if no interest is registered for `txid`.
fn push_message(&mut self, txid: Txid, buf: D::MessageBufEtc) -> Result<Option<Waker>, Error> {
    let InterestId(raw_id) = InterestId::from_txid(txid);
    // Look for a message interest with the given ID.
    let Some(interest) = self.messages.get_mut(raw_id) else {
        // TODO(https://fxbug.dev/42066009): Should close the channel.
        return Err(Error::InvalidResponseTxid);
    };
    if matches!(interest, MessageInterest::Discard) {
        // Nobody is interested in this response any more.
        self.messages.remove(raw_id);
        return Ok(None);
    }
    // Store the message so that it can be picked up later.
    match mem::replace(interest, MessageInterest::Received(buf)) {
        MessageInterest::Waiting(waker) => {
            // Matches the +1 in `register`.
            self.waker_count -= 1;
            Ok(Some(waker))
        }
        _ => Ok(None),
    }
}
/// Registers the waker from `cx` if the message has not already been received, replacing any
/// previous waker registered. Returns the message if it has been received.
///
/// When the message is returned, its slot is removed from `messages`.
///
/// Panics if no interest is registered for `txid`, or if the interest was discarded.
fn register(&mut self, txid: Txid, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
let InterestId(raw_id) = InterestId::from_txid(txid);
let interest = self.messages.get_mut(raw_id).expect("Polled unregistered interest");
match interest {
MessageInterest::Received(_) => {
return Some(self.messages.remove(raw_id).unwrap_received())
}
MessageInterest::Discard => panic!("Polled a discarded MessageReceiver?!"),
// First waker for this interest: count it. `push_message`, `deregister` and
// `wake_all` undo this.
MessageInterest::WillPoll => self.waker_count += 1,
// Already counted; only the stored waker is replaced below.
MessageInterest::Waiting(_) => {}
}
*interest = MessageInterest::Waiting(cx.waker().clone());
None
}
/// Deregisters an interest.
///
/// If the response has already been received, it is dropped and the slot is freed.
/// Otherwise the slot is marked `Discard`, so that `push_message` frees it when the
/// response eventually arrives.
///
/// Panics if `txid` does not refer to an existing slot (slab indexing); an interest
/// that is already `Discard` is treated as unreachable.
fn deregister(&mut self, txid: Txid) {
let InterestId(raw_id) = InterestId::from_txid(txid);
match self.messages[raw_id] {
MessageInterest::Received(_) => {
self.messages.remove(raw_id);
return;
}
MessageInterest::WillPoll => {}
// Matches the +1 in `register`; the stored waker is dropped by the assignment below.
MessageInterest::Waiting(_) => self.waker_count -= 1,
MessageInterest::Discard => unreachable!(),
}
self.messages[raw_id] = MessageInterest::Discard;
}
/// Registers an event listener.
///
/// If an event is already queued it is returned and no waker is stored. Otherwise the
/// waker from `cx` replaces the listener state and `None` is returned.
fn register_event_listener(&mut self, cx: &Context<'_>) -> Option<D::MessageBufEtc> {
    if let Some(event) = self.events.pop_front() {
        return Some(event);
    }
    let previous = mem::replace(&mut self.event_listener, EventListener::Some(cx.waker().clone()));
    // Only count the waker if it did not replace one that was already counted.
    if !previous.is_some() {
        self.waker_count += 1;
    }
    None
}
/// Indicates the event listener has been dropped.
fn dropped_event_listener(&mut self) {
if self.event_listener.is_some() {
// Matches the +1 in register_event_listener.
self.waker_count -= 1;
}
self.event_listener = EventListener::None;
}
/// Registers interest in a response message.
///
/// This function returns a new transaction ID which should be used to send a message
/// via the channel. Responses are then received using `poll_recv_msg_response`.
fn register_msg_interest(&mut self) -> Txid {
    // Every minted txid gets the next generation number (wrapping around).
    self.generation = self.generation.wrapping_add(1);
    let slot = self.messages.insert(MessageInterest::WillPoll);
    // TODO(cramertj) use `try_from` here and assert that the conversion from
    // `usize` to `u32` hasn't overflowed.
    Txid::from_interest_id(InterestId(slot), self.generation)
}
}
impl<D: ResourceDialect> ClientInner<D> {
/// Polls for the next event, registering the waker from `cx` if none is queued yet.
///
/// A queued event is always returned in preference to a terminal error.
fn poll_recv_event(
    self: &Arc<Self>,
    cx: &Context<'_>,
) -> Poll<Result<D::MessageBufEtc, Error>> {
    // Update the EventListener with the latest waker, remove any stale WillPoll state
    if let Some(msg_buf) = self.interests.lock().register_event_listener(cx) {
        return Poll::Ready(Ok(msg_buf));
    }
    // Process any data on the channel, registering any tasks still waiting to wake when the
    // channel becomes ready.
    let recv_result = self.recv_all(Some(Txid(0)));
    let mut interests = self.interests.lock();
    match interests.events.pop_front() {
        Some(msg_buf) => Poll::Ready(Ok(msg_buf)),
        None => match recv_result {
            Ok(()) => Poll::Pending,
            Err(terminal_error) => Poll::Ready(Err(terminal_error)),
        },
    }
}
/// Poll for the response to `txid`, registering the waker associated with `cx` to be awoken,
/// or returning the response buffer if it has been received.
///
/// A received response is always returned in preference to a terminal error. When the
/// response is returned, its interest slot has been removed.
///
/// Panics if no interest is registered for `txid`.
fn poll_recv_msg_response(
self: &Arc<Self>,
txid: Txid,
cx: &Context<'_>,
) -> Poll<Result<D::MessageBufEtc, Error>> {
// Register our waker with the interest if we haven't received a message yet.
if let Some(buf) = self.interests.lock().register(txid, cx) {
return Poll::Ready(Ok(buf));
}
// Process any data on the channel, registering tasks still waiting for wake when the
// channel becomes ready.
let maybe_terminal_error = self.recv_all(Some(txid));
let InterestId(raw_id) = InterestId::from_txid(txid);
let mut interests = self.interests.lock();
if interests.messages.get(raw_id).expect("Polled unregistered interest").is_received() {
// If we got the result remove the received buffer and return, freeing up the
// space for a new message.
let buf = interests.messages.remove(raw_id).unwrap_received();
Poll::Ready(Ok(buf))
} else {
// Only surface the terminal error once it is clear no response is available.
maybe_terminal_error?;
Poll::Pending
}
}
/// Poll for the receipt of any response message or an event.
/// Wakers present in any MessageInterest or the EventReceiver when this is called will be
/// notified when their message arrives or when there is new data if the channel is empty.
///
/// `want_txid` identifies the interest the caller is polling for (`Txid(0)` for events).
/// The waker of that interest is not woken when its message arrives, since the caller
/// picks the message up itself right after this returns.
///
/// All errors are terminal, so once an error has been encountered, all subsequent calls will
/// produce the same error. The error might be due to the reception of an epitaph, the peer end
/// of the channel being closed, a decode error or some other error. Before using this terminal
/// error, callers *should* check to see if a response or event has been received as they
/// should normally, at least for the PEER_CLOSED case, be delivered before the terminal error.
fn recv_all(self: &Arc<Self>, want_txid: Option<Txid>) -> Result<(), Error> {
// Acquire a mutex so that only one thread can read from the underlying channel
// at a time. Channel is already synchronized, but we need to also decode the
// FIDL message header atomically so that epitaphs can be properly handled.
let mut terminal_error = self.terminal_error.lock();
if let Some(error) = terminal_error.as_ref() {
return Err(error.clone());
}
// Reads at most one message from the channel and dispatches it. Returns
// `Continue` after a message was dispatched, `Break` when the channel has nothing
// more to read right now (in which case `waker` has been handed to the channel),
// and `Err` for any terminal condition.
let recv_once = |waker| {
let cx = &mut Context::from_waker(&waker);
let mut buf = D::MessageBufEtc::new();
let result = self.channel.recv_etc_from(cx, &mut buf);
match result {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(None)) => {
// The channel has been closed, and no epitaph was received.
// Set the epitaph to PEER_CLOSED.
return Err(Error::ClientChannelClosed {
status: zx_status::Status::PEER_CLOSED,
protocol_name: self.protocol_name,
epitaph: None,
#[cfg(not(target_os = "fuchsia"))]
reason: self.channel.closed_reason(),
});
}
Poll::Ready(Err(Some(e))) => return Err(Error::ClientRead(e.into())),
Poll::Pending => return Ok(ControlFlow::Break(())),
};
let (bytes, _) = buf.split_mut();
let (header, body_bytes) = decode_transaction_header(bytes)?;
if header.is_epitaph() {
// Received an epitaph. Record this so that everyone receives the same epitaph.
let handles = &mut [];
let mut epitaph_body = Decode::<EpitaphBody, D>::new_empty();
Decoder::<D>::decode_into::<EpitaphBody>(
&header,
body_bytes,
handles,
&mut epitaph_body,
)?;
return Err(Error::ClientChannelClosed {
status: epitaph_body.error,
protocol_name: self.protocol_name,
epitaph: Some(epitaph_body.error.into_raw() as u32),
#[cfg(not(target_os = "fuchsia"))]
reason: self.channel.closed_reason(),
});
}
let txid = Txid(header.tx_id);
let waker = {
buf.shrink_bytes_to_fit();
let mut interests = self.interests.lock();
// A txid of zero marks an event; anything else is a response to a query.
if txid == Txid(0) {
interests.push_event(buf)
} else {
interests.push_message(txid, buf)?
}
};
// Skip waking if the message was for the caller.
if want_txid != Some(txid) {
if let Some(waker) = waker {
waker.wake();
}
}
Ok(ControlFlow::Continue(()))
};
// Keep reading until the channel is drained, nobody is waiting any more, or a
// terminal error occurs. The waker handed to the channel is chosen afresh on every
// iteration because dispatching a message changes who is still waiting.
loop {
let waker = {
let interests = self.interests.lock();
if interests.waker_count == 0 {
// Nobody is waiting, so there is no one to read on behalf of.
return Ok(());
} else if interests.waker_count == 1 {
// There's only one waker, so just use the waker for the one interest. This
// is also required to allow `into_channel` to work, which relies on
// `Arc::try_unwrap` which won't always work if we use a waker based on
// `ClientInner` (even if it's weak), because there can be races where the
// reference count on ClientInner is > 1.
if let Some(waker) = interests.event_waker() {
waker.clone()
} else {
interests
.messages
.iter()
.find_map(|(_, interest)| {
if let MessageInterest::Waiting(waker) = interest {
Some(waker.clone())
} else {
None
}
})
.unwrap()
}
} else {
let weak = Arc::downgrade(self);
let waker = ClientWaker(Arc::new(move || {
if let Some(strong) = weak.upgrade() {
// On host, we can't call recv_all because there are reentrancy issues; the waker is
// woken whilst locks are held on the channel which recv_all needs.
#[cfg(target_os = "fuchsia")]
if strong.recv_all(None).is_ok() {
return;
}
strong.wake_all();
}
}));
// If there's more than one waker, use a waker that points to
// `ClientInner` which will read the message and figure out which is
// the correct task to wake.
// SAFETY: We meet the requirements specified by RawWaker.
unsafe {
Waker::from_raw(RawWaker::new(
Arc::into_raw(Arc::new(waker)) as *const (),
&WAKER_VTABLE,
))
}
}
};
match recv_once(waker) {
Ok(ControlFlow::Continue(())) => {}
Ok(ControlFlow::Break(())) => return Ok(()),
Err(error) => {
// Broadcast all errors.
self.wake_all();
// Remember the error so that every later call reports the same one.
return Err(terminal_error.insert(error).clone());
}
}
}
}
/// Wakes all tasks that have polled on this channel.
///
/// Every waiting message interest goes back to `WillPoll`, the event listener's waker
/// (if any) is taken, and `waker_count` is reset to zero.
fn wake_all(&self) {
    let mut interests = self.interests.lock();
    for (_, interest) in interests.messages.iter_mut() {
        if !matches!(interest, MessageInterest::Waiting(_)) {
            continue;
        }
        match mem::replace(interest, MessageInterest::WillPoll) {
            MessageInterest::Waiting(waker) => waker.wake(),
            _ => unreachable!(),
        }
    }
    if let Some(waker) = interests.take_event_waker() {
        waker.wake();
    }
    interests.waker_count = 0;
}
}
/// Callback wrapper used as the data behind the custom `Waker` that `recv_all` builds
/// when more than one task is waiting. Waking the waker invokes the callback.
#[derive(Clone)]
struct ClientWaker(Arc<dyn Fn() + Send + Sync + 'static>);
/// Vtable for wakers whose data pointer was produced by `Arc::into_raw` on an
/// `Arc<ClientWaker>`.
static WAKER_VTABLE: RawWakerVTable =
RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
/// Clones the waker by incrementing the strong count of the `Arc<ClientWaker>`.
///
/// # Safety
///
/// `data` must have been obtained from `Arc::into_raw` on an `Arc<ClientWaker>` whose
/// strong count is still at least one.
unsafe fn clone_waker(data: *const ()) -> RawWaker {
Arc::increment_strong_count(data as *const ClientWaker);
RawWaker::new(data, &WAKER_VTABLE)
}
/// Invokes the callback and releases the reference owned by this waker.
///
/// # Safety
///
/// Same requirements as `clone_waker`; `data` must not be used again afterwards.
unsafe fn wake(data: *const ()) {
Arc::from_raw(data as *const ClientWaker).0();
}
/// Invokes the callback without releasing the waker's reference.
///
/// # Safety
///
/// Same requirements as `clone_waker`.
unsafe fn wake_by_ref(data: *const ()) {
// `ManuallyDrop` keeps the reconstructed `Arc` from decrementing the strong count.
mem::ManuallyDrop::new(Arc::from_raw(data as *const ClientWaker)).0();
}
/// Releases the reference owned by this waker.
///
/// # Safety
///
/// Same requirements as `clone_waker`; `data` must not be used again afterwards.
unsafe fn drop_waker(data: *const ()) {
// Reconstructing the `Arc` and dropping it immediately decrements the strong count.
Arc::from_raw(data as *const ClientWaker);
}
#[cfg(target_os = "fuchsia")]
pub mod sync {
//! Synchronous FIDL Client
use super::*;
use std::mem::MaybeUninit;
use zx::{self as zx, AsHandleRef, MessageBufEtc};
/// A synchronous client for making FIDL calls.
///
/// Unlike the asynchronous `Client`, this type owns a plain `zx::Channel` and keeps no
/// table of pending responses.
#[derive(Debug)]
pub struct Client {
// Underlying channel
channel: zx::Channel,
// The `ProtocolMarker::DEBUG_NAME` for the service this client connects to.
protocol_name: &'static str,
}
impl Client {
/// Create a new synchronous FIDL client.
///
/// `protocol_name` is stored and included in the channel-closed errors this client reports.
pub fn new(channel: zx::Channel, protocol_name: &'static str) -> Self {
Client { channel, protocol_name }
}
/// Return a reference to the underlying channel for the client.
///
/// The channel stays owned by the client.
pub fn as_channel(&self) -> &zx::Channel {
&self.channel
}
/// Get the underlying channel out of the client.
///
/// Consumes the client. Unlike the asynchronous client's `into_channel`, this cannot fail.
pub fn into_channel(self) -> zx::Channel {
self.channel
}
/// Send a new message.
pub fn send<T: TypeMarker>(
&self,
body: impl Encode<T, DefaultFuchsiaResourceDialect>,
ordinal: u64,
dynamic_flags: DynamicFlags,
) -> Result<(), Error> {
let mut write_bytes = Vec::new();
let mut write_handles = Vec::new();
let msg = TransactionMessage {
header: TransactionHeader::new(0, ordinal, dynamic_flags),
body,
};
Encoder::encode::<TransactionMessageType<T>>(
&mut write_bytes,
&mut write_handles,
msg,
)?;
match self.channel.write_etc(&write_bytes, &mut write_handles) {
Ok(()) | Err(zx_status::Status::PEER_CLOSED) => Ok(()),
Err(e) => Err(Error::ClientWrite(e.into())),
}
}
/// Send a new message expecting a response.
///
/// Encodes `body`, performs a channel call that waits until `deadline`, and decodes the
/// reply as `Response`.
///
/// # Errors
///
/// * Encoding and decoding errors are propagated.
/// * A failed channel call is reported as `Error::ClientChannelClosed` when the status is
///   `PEER_CLOSED`, and as `Error::ClientCall` otherwise (see `wrap_error`).
/// * `Error::InvalidResponseOrdinal` if the reply's ordinal differs from `ordinal`.
pub fn send_query<Request: TypeMarker, Response: TypeMarker>(
&self,
body: impl Encode<Request, DefaultFuchsiaResourceDialect>,
ordinal: u64,
dynamic_flags: DynamicFlags,
deadline: zx::MonotonicInstant,
) -> Result<Response::Owned, Error>
where
Response::Owned: Decode<Response, DefaultFuchsiaResourceDialect>,
{
let mut write_bytes = Vec::new();
let mut write_handles = Vec::new();
// NOTE(review): the header is encoded with txid 0; presumably the channel call
// assigns the real transaction id — confirm against the `zx_channel_call` docs.
let msg = TransactionMessage {
header: TransactionHeader::new(0, ordinal, dynamic_flags),
body,
};
Encoder::encode::<TransactionMessageType<Request>>(
&mut write_bytes,
&mut write_handles,
msg,
)?;
// Stack-allocate these buffers to avoid the heap and reuse any populated pages from
// previous function calls. Use uninitialized memory so that the only writes to this
// array will be by the kernel for whatever's actually used for the reply.
let bytes_out =
&mut [MaybeUninit::<u8>::uninit(); zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize];
let handles_out = &mut [const { MaybeUninit::<zx::HandleInfo>::uninit() };
zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize];
// TODO: We should be able to use the same memory to back the bytes we use for writing
// and reading.
// The names are shadowed by the values `call_etc_uninit` returns, which are what
// gets decoded below.
let (bytes_out, handles_out) = self
.channel
.call_etc_uninit(deadline, &write_bytes, &mut write_handles, bytes_out, handles_out)
.map_err(|e| self.wrap_error(Error::ClientCall, e))?;
let (header, body_bytes) = decode_transaction_header(bytes_out)?;
if header.ordinal != ordinal {
return Err(Error::InvalidResponseOrdinal);
}
let mut output = Decode::<Response, DefaultFuchsiaResourceDialect>::new_empty();
Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<Response>(
&header,
body_bytes,
handles_out,
&mut output,
)?;
Ok(output)
}
/// Wait for an event to arrive on the underlying channel.
///
/// Blocks until the channel is readable or its peer is closed, or until `deadline`.
///
/// # Errors
///
/// * A failed wait is reported as `Error::ClientChannelClosed` for `PEER_CLOSED`, and as
///   `Error::ClientEvent` otherwise.
/// * `Error::InvalidHeader` if the transaction header cannot be decoded.
/// * `Error::ClientChannelClosed` carrying the epitaph status if an epitaph is received.
/// * `Error::UnexpectedSyncResponse` if the message has a non-zero transaction id.
/// * A failed read is reported as `Error::ClientChannelClosed` for `PEER_CLOSED`, and as
///   `Error::ClientRead` otherwise.
pub fn wait_for_event(
&self,
deadline: zx::MonotonicInstant,
) -> Result<MessageBufEtc, Error> {
let mut buf = zx::MessageBufEtc::new();
// Reserve room for the largest possible message before reading.
buf.ensure_capacity_bytes(zx::sys::ZX_CHANNEL_MAX_MSG_BYTES as usize);
buf.ensure_capacity_handle_infos(zx::sys::ZX_CHANNEL_MAX_MSG_HANDLES as usize);
loop {
self.channel
.wait_handle(
zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
deadline,
)
.map_err(|e| self.wrap_error(Error::ClientEvent, e))?;
match self.channel.read_etc(&mut buf) {
Ok(()) => {
// We succeeded in reading the message. Check that it is
// an event not a two-way method reply.
let (header, body_bytes) = decode_transaction_header(buf.bytes())
.map_err(|_| Error::InvalidHeader)?;
if header.is_epitaph() {
// Received an epitaph. For the sync bindings, epitaphs are only
// reported by wait_for_event.
let handles = &mut [];
let mut epitaph_body =
Decode::<EpitaphBody, DefaultFuchsiaResourceDialect>::new_empty();
Decoder::<DefaultFuchsiaResourceDialect>::decode_into::<EpitaphBody>(
&header,
body_bytes,
handles,
&mut epitaph_body,
)?;
return Err(Error::ClientChannelClosed {
status: epitaph_body.error,
protocol_name: self.protocol_name,
epitaph: Some(epitaph_body.error.into_raw() as u32),
});
}
if header.tx_id != 0 {
return Err(Error::UnexpectedSyncResponse);
}
return Ok(buf);
}
Err(zx::Status::SHOULD_WAIT) => {
// Some other thread read the message we woke up to read.
continue;
}
Err(e) => {
return Err(self.wrap_error(|x| Error::ClientRead(x.into()), e));
}
}
}
}
/// Wraps an error in the given `variant` of the `Error` enum, except
/// for `zx_status::Status::PEER_CLOSED`, in which case it uses the
/// `Error::ClientChannelClosed` variant.
fn wrap_error<T: Fn(zx_status::Status) -> Error>(
&self,
variant: T,
err: zx_status::Status,
) -> Error {
if err == zx_status::Status::PEER_CLOSED {
Error::ClientChannelClosed {
status: zx_status::Status::PEER_CLOSED,
protocol_name: self.protocol_name,
epitaph: None,
}
} else {
variant(err)
}
}
}
}
#[cfg(all(test, target_os = "fuchsia"))]
mod tests {
use super::*;
use crate::encoding::MAGIC_NUMBER_INITIAL;
use crate::epitaph::{self, ChannelEpitaphExt};
use anyhow::{Context as _, Error};
use assert_matches::assert_matches;
use fuchsia_async as fasync;
use fuchsia_async::{Channel as AsyncChannel, DurationExt, TimeoutExt};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use futures::task::{noop_waker, waker, ArcWake};
use futures::{join, StreamExt, TryFutureExt};
use futures_test::task::new_count_waker;
use std::future::pending;
use std::thread;
use zx::{AsHandleRef, MessageBufEtc};
/// The only non-zero byte of `SEND_ORDINAL`, as it appears in the encoded header.
const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
/// Method ordinal used by the test requests: `SEND_ORDINAL_HIGH_BYTE` shifted
/// into the upper 32 bits.
const SEND_ORDINAL: u64 = (SEND_ORDINAL_HIGH_BYTE as u64) << 32;
/// One-byte body carried by every test message.
const SEND_DATA: u8 = 55;
/// Ordinal used by the tests for server-sent events.
const EVENT_ORDINAL: u64 = 854 << 23;
/// Builds the 24 bytes the tests expect the client to write for a message
/// that carries `SEND_DATA` under `SEND_ORDINAL`.
///
/// `txid_index` and `txid_generation` become the first and last byte of the
/// 32 bit tx_id respectively; the two middle bytes are zero.
fn expected_sent_bytes(txid_index: u8, txid_generation: u8) -> [u8; 24] {
    // Every byte that is not assigned below stays zero.
    let mut message = [0u8; 24];
    // Bytes 0..4: 32 bit tx_id.
    message[0] = txid_index;
    message[3] = txid_generation;
    // Bytes 4..7: flags.
    message[4] = 2;
    // Byte 7: magic number.
    message[7] = MAGIC_NUMBER_INITIAL;
    // Bytes 8..16: 64 bit ordinal. The low four bytes are zero and only the
    // first of the high bytes is set.
    message[12] = SEND_ORDINAL_HIGH_BYTE;
    // Byte 16: 8 bit data, followed by 7 bytes of padding.
    message[16] = SEND_DATA;
    message
}
/// Returns the bytes the tests expect for a one-way message: the same
/// encoding as `expected_sent_bytes`, with every tx_id byte set to zero.
fn expected_sent_bytes_oneway() -> [u8; 24] {
expected_sent_bytes(0, 0)
}
/// Encodes a message with the given `header` and a `SEND_DATA` body, then
/// writes it to `channel`.
///
/// Panics with "Server channel write failed" if the write fails.
fn send_transaction(header: TransactionHeader, channel: &zx::Channel) {
    let mut encoded_bytes = Vec::new();
    let mut encoded_handles = Vec::new();
    encode_transaction(header, &mut encoded_bytes, &mut encoded_handles);
    channel
        .write_etc(&encoded_bytes, &mut encoded_handles)
        .expect("Server channel write failed");
}
/// Encodes a transaction message made of `header` and a one-byte `SEND_DATA`
/// body into `bytes` and `handles`.
///
/// Panics with "Encoding failure" if the encoder reports an error.
fn encode_transaction(
    header: TransactionHeader,
    bytes: &mut Vec<u8>,
    handles: &mut Vec<zx::HandleDisposition<'static>>,
) {
    type Message = TransactionMessageType<u8>;
    let message = TransactionMessage { header, body: SEND_DATA };
    let outcome =
        Encoder::<DefaultFuchsiaResourceDialect>::encode::<Message>(bytes, handles, message);
    outcome.expect("Encoding failure");
}
/// Checks that a one-way `send` on the sync client writes the expected bytes
/// to the channel.
#[test]
fn sync_client() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).context("sending")?;

    // Read what the client wrote from the server end of the channel.
    let mut sent_message = MessageBufEtc::new();
    server_end.read_etc(&mut sent_message).context("reading")?;
    let expected = expected_sent_bytes_oneway();
    assert_eq!(sent_message.bytes(), expected);
    Ok(())
}
/// Checks that a two-way `send_query` on the sync client returns the body
/// that the server sends back under the same tx_id and ordinal.
#[test]
fn sync_client_with_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    // Server: runs on its own thread so it can reply while the client is
    // inside `send_query`.
    thread::spawn(move || {
        let mut request = MessageBufEtc::new();
        let readable_deadline =
            zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        server_end
            .wait_handle(zx::Signals::CHANNEL_READABLE, readable_deadline)
            .expect("failed to wait for channel readable");
        server_end.read_etc(&mut request).expect("failed to read on server end");
        let (request_bytes, _handles) = request.split_mut();
        let (request_header, _body_bytes) =
            decode_transaction_header(request_bytes).expect("server decode");
        assert_eq!(request_header.ordinal, SEND_ORDINAL);

        // Reply with the tx_id and ordinal taken from the request.
        let reply_header = TransactionHeader::new(
            request_header.tx_id,
            request_header.ordinal,
            DynamicFlags::empty(),
        );
        send_transaction(reply_header, &server_end);
    });

    let query_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let response_data = client
        .send_query::<u8, u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty(), query_deadline)
        .context("sending query")?;
    assert_eq!(SEND_DATA, response_data);
    Ok(())
}
/// Checks that a two-way call on the sync client still receives its reply
/// when the server writes an event first, and that the event can then be
/// read with `wait_for_event`.
#[test]
fn sync_client_with_event_and_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    thread::spawn(move || {
        // Server
        let mut request = MessageBufEtc::new();
        let readable_deadline =
            zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        server_end
            .wait_handle(zx::Signals::CHANNEL_READABLE, readable_deadline)
            .expect("failed to wait for channel readable");
        server_end.read_etc(&mut request).expect("failed to read on server end");
        let (request_bytes, _handles) = request.split_mut();
        let (request_header, _body_bytes) =
            decode_transaction_header(request_bytes).expect("server decode");
        assert_ne!(request_header.tx_id, 0);
        assert_eq!(request_header.ordinal, SEND_ORDINAL);

        // First, send an event.
        let event_header = TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty());
        send_transaction(event_header, &server_end);

        // Then send the reply. The kernel should pick the correct message to deliver based
        // on the tx_id.
        let reply_header = TransactionHeader::new(
            request_header.tx_id,
            request_header.ordinal,
            DynamicFlags::empty(),
        );
        send_transaction(reply_header, &server_end);
    });

    let query_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let response_data = client
        .send_query::<u8, u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty(), query_deadline)
        .context("sending query")?;
    assert_eq!(SEND_DATA, response_data);

    // The event written before the reply must still be readable.
    let event_deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let event_buf = client.wait_for_event(event_deadline).context("waiting for event")?;
    let (event_bytes, _handles) = event_buf.split();
    let (event_header, _body) = decode_transaction_header(&event_bytes).expect("event decode");
    assert_eq!(event_header.ordinal, EVENT_ORDINAL);
    Ok(())
}
/// Checks that two threads waiting for events on the same sync client each
/// get a successful result when the server writes two events.
#[test]
fn sync_client_with_racing_events() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let first_client = Arc::new(sync::Client::new(client_end, "test_protocol"));
    let second_client = Arc::clone(&first_client);

    let first_waiter = thread::spawn(move || {
        let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        let result = first_client.wait_for_event(deadline);
        assert!(result.is_ok());
    });
    let second_waiter = thread::spawn(move || {
        let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
        let result = second_client.wait_for_event(deadline);
        assert!(result.is_ok());
    });

    // One event per waiting thread.
    let first_event = TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty());
    send_transaction(first_event, &server_end);
    let second_event = TransactionHeader::new(0, EVENT_ORDINAL, DynamicFlags::empty());
    send_transaction(second_event, &server_end);

    // A failed assertion inside a waiter surfaces here as a join error.
    assert!(first_waiter.join().is_ok());
    assert!(second_waiter.join().is_ok());
    Ok(())
}
/// Checks that `wait_for_event` fails with `UnexpectedSyncResponse` when the
/// message it reads has a non-zero tx_id.
#[test]
fn sync_client_wait_for_event_gets_method_response() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    // A non-zero tx_id makes the message a method response, not an event.
    let response_header =
        TransactionHeader::new(3902304923, SEND_ORDINAL, DynamicFlags::empty());
    send_transaction(response_header, &server_end);

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let outcome = client.wait_for_event(deadline);
    assert_matches!(outcome, Err(crate::Error::UnexpectedSyncResponse));
    Ok(())
}
/// Checks that a one-way `send` on the sync client still returns `Ok(())`
/// after the server end of the channel has been dropped.
#[test]
fn sync_client_one_way_call_suceeds_after_peer_closed() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");
    drop(server_end);

    let send_result = client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty());
    assert_matches!(send_result, Ok(()));
    Ok(())
}
/// Checks that a two-way `send_query` on the sync client reports
/// `ClientChannelClosed` with `PEER_CLOSED` and no epitaph after the server
/// end of the channel has been dropped.
#[test]
fn sync_client_two_way_call_fails_after_peer_closed() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");
    drop(server_end);

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let query_result =
        client.send_query::<u8, u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty(), deadline);
    assert_matches!(
        query_result,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
    Ok(())
}
// TODO(https://fxbug.dev/42153053): When the sync client supports epitaphs, rename
// these tests and change the asserts to expect zx_status::Status::UNAVAILABLE.
/// Checks that `send_query` on the sync client reports a plain `PEER_CLOSED`
/// with no epitaph, even though the server closed the channel with an
/// `UNAVAILABLE` epitaph.
#[test]
fn sync_client_send_does_not_receive_epitaphs() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    // Close the server channel with an epitaph.
    let epitaph_written = server_end.close_with_epitaph(zx_status::Status::UNAVAILABLE);
    epitaph_written.expect("failed to write epitaph");

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let query_result =
        client.send_query::<u8, u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty(), deadline);
    assert_matches!(
        query_result,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
    Ok(())
}
/// Checks that `wait_for_event` on the sync client reports the `UNAVAILABLE`
/// epitaph the server closed the channel with, both as the status and as the
/// raw epitaph value.
#[test]
fn sync_client_wait_for_events_does_receive_epitaphs() -> Result<(), Error> {
    let (client_end, server_end) = zx::Channel::create();
    let client = sync::Client::new(client_end, "test_protocol");

    // Close the server channel with an epitaph.
    let epitaph_written = server_end.close_with_epitaph(zx_status::Status::UNAVAILABLE);
    epitaph_written.expect("failed to write epitaph");

    let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5));
    let event_result = client.wait_for_event(deadline);
    assert_matches!(
        event_result,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::UNAVAILABLE,
            protocol_name: "test_protocol",
            epitaph: Some(epitaph),
        }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
    );
    Ok(())
}
/// Checks that `into_channel` on the sync client gives back a channel with
/// the same raw handle the client was created from.
#[test]
fn sync_client_into_channel() -> Result<(), Error> {
    let (client_end, _server_end) = zx::Channel::create();
    // Record the raw handle before the channel is moved into the client.
    let expected_raw_handle = client_end.raw_handle();
    let client = sync::Client::new(client_end, "test_protocol");
    assert_eq!(client.into_channel().raw_handle(), expected_raw_handle);
    Ok(())
}
/// Checks that a one-way `send` on the async client delivers the expected
/// bytes to the server end of the channel.
#[fasync::run_singlethreaded(test)]
async fn client() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
    let server = AsyncChannel::from_channel(server_end);

    // Server side: read one message and compare it with the expected encoding.
    let receive_message = async move {
        let mut message = MessageBufEtc::new();
        server.recv_etc_msg(&mut message).await.expect("failed to recv msg");
        assert_eq!(message.bytes(), expected_sent_bytes_oneway());
    };
    // Bound the wait so that a broken test fails quickly instead of hanging.
    let receive_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let receive_message = receive_message.on_timeout(receive_deadline, || {
        panic!("did not receive message in time!")
    });

    let send_result = client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty());
    send_result.expect("failed to send msg");
    receive_message.await;
}
/// Checks that a two-way `send_query` on the async client resolves to the
/// body the server sends back under the request's tx_id.
#[fasync::run_singlethreaded(test)]
async fn client_with_response() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
    let server = AsyncChannel::from_channel(server_end);
    let mut request = MessageBufEtc::new();

    // Server side: check the request bytes, then answer with the same tx_id.
    let serve_request = async move {
        server.recv_etc_msg(&mut request).await.expect("failed to recv msg");
        let two_way_tx_id = 1u8;
        assert_eq!(request.bytes(), expected_sent_bytes(two_way_tx_id, 1));

        let mut reply_bytes = Vec::new();
        let mut reply_handles = Vec::new();
        let reply_header =
            TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
        encode_transaction(reply_header, &mut reply_bytes, &mut reply_handles);
        server
            .write_etc(&reply_bytes, &mut reply_handles)
            .expect("Server channel write failed");
    };
    // Bound the server side so that a broken test fails quickly instead of hanging.
    let serve_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let serve_request = serve_request.on_timeout(serve_deadline, || {
        panic!("did not receiver message in time!")
    });

    let query = client
        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
        .map_ok(|response| assert_eq!(response, SEND_DATA))
        .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
    // Bound the client side as well, for the same reason.
    let query_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let query = query.on_timeout(query_deadline, || {
        panic!("did not receive response in time!")
    });

    let ((), ()) = join!(serve_request, query);
}
/// Checks that a pending two-way query on the async client fails with the
/// `UNAVAILABLE` epitaph when the server closes the channel with that
/// epitaph instead of replying.
#[fasync::run_singlethreaded(test)]
async fn client_with_response_receives_epitaph() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
    let server = AsyncChannel::from_channel(server_end);
    let mut request = zx::MessageBufEtc::new();

    // Server side: read the request, then close with an epitaph instead of replying.
    let close_with_epitaph = async move {
        server.recv_etc_msg(&mut request).await.expect("failed to recv msg");
        let epitaph_written = server.close_with_epitaph(zx_status::Status::UNAVAILABLE);
        epitaph_written.expect("failed to write epitaph");
    };
    // Bound the server side so that a broken test fails quickly instead of hanging.
    let server_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let close_with_epitaph = close_with_epitaph.on_timeout(server_deadline, || {
        panic!("did not receive message in time!")
    });

    let query = async move {
        const ORDINAL: u64 = 42 << 32;
        let outcome = client.send_query::<u8, u8, ORDINAL>(55, DynamicFlags::empty()).await;
        assert_matches!(
            outcome,
            Err(crate::Error::ClientChannelClosed {
                status: zx_status::Status::UNAVAILABLE,
                protocol_name: "test_protocol",
                epitaph: Some(epitaph),
            }) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
        );
    };
    // Bound the client side as well, for the same reason.
    let query_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let query = query.on_timeout(query_deadline, || {
        panic!("did not receive response in time!")
    });

    let ((), ()) = join!(close_with_epitaph, query);
}
/// Expects a panic when the event receiver is taken a second time while the
/// first one is still alive.
#[fasync::run_singlethreaded(test)]
#[should_panic]
async fn event_cant_be_taken_twice() {
    let (client_end, _) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    // Keep the first receiver alive so the second take overlaps with it.
    let _first_receiver = client.take_event_receiver();
    client.take_event_receiver();
}
/// Checks that the event receiver can be taken again once the previous one
/// has been dropped.
#[fasync::run_singlethreaded(test)]
async fn event_can_be_taken_after_drop() {
    let (client_end, _) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    // Take and immediately release the first receiver.
    drop(client.take_event_receiver());
    client.take_event_receiver();
}
/// Checks that the event receiver reports `is_terminated` only after it has
/// been polled on a channel whose peer was dropped.
#[fasync::run_singlethreaded(test)]
async fn receiver_termination_test() {
    let (client_end, _) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    let mut receiver = client.take_event_receiver();
    assert!(
        !receiver.is_terminated(),
        "receiver should not report terminated before being polled"
    );
    let _ = receiver.next().await;
    assert!(
        receiver.is_terminated(),
        "receiver should report terminated after seeing channel is closed"
    );
}
/// Expects the panic "polled EventReceiver after `None`" when a receiver on
/// a closed channel is polled again after it has already returned `None`.
#[fasync::run_singlethreaded(test)]
#[should_panic(expected = "polled EventReceiver after `None`")]
async fn receiver_cant_be_polled_more_than_once_on_closed_stream() {
    let (client_end, _) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    // Take and release one receiver first; the test then uses a second one.
    drop(client.take_event_receiver());
    let mut receiver = client.take_event_receiver();
    assert!(receiver.next().await.is_none(), "read on closed channel should return none");
    // this should panic
    let _ = receiver.next().await;
}
/// Expects the event receiver to yield the epitaph error, then `None`, and
/// then to panic with "polled EventReceiver after `None`" on a further poll.
#[fasync::run_singlethreaded(test)]
#[should_panic(expected = "polled EventReceiver after `None`")]
async fn receiver_panics_when_polled_after_receiving_epitaph_then_none() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let server_end = AsyncChannel::from_channel(server_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
    let mut events = client.take_event_receiver();

    // Write the epitaph and then close the server end.
    let epitaph_written =
        epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE);
    epitaph_written.expect("wrote epitaph");
    drop(server_end);

    let first_item = events.next().await;
    assert_matches!(
        first_item,
        Some(Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::UNAVAILABLE,
            protocol_name: "test_protocol",
            epitaph: Some(epitaph),
        })) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
    );
    let second_item = events.next().await;
    assert_matches!(second_item, None);
    // this should panic
    let _ = events.next().await;
}
/// Checks that taking the event receiver once from a fresh client does not
/// panic.
#[fasync::run_singlethreaded(test)]
async fn event_can_be_taken() {
    let (client_end, _) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    client.take_event_receiver();
}
/// Checks that an event written by the server before it closes is delivered
/// through the event receiver, decodes to 55, and is followed by the end of
/// the stream.
#[fasync::run_singlethreaded(test)]
async fn event_received() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    // Send the event from the server
    let server = AsyncChannel::from_channel(server_end);
    const ORDINAL: u64 = 5;
    let mut event_bytes = Vec::new();
    let mut event_handles = Vec::new();
    let event_header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
    encode_transaction(event_header, &mut event_bytes, &mut event_handles);
    server.write_etc(&event_bytes, &mut event_handles).expect("Server channel write failed");
    drop(server);

    let recv = client
        .take_event_receiver()
        .into_future()
        .then(|(first, rest)| {
            let message = first.expect("should contain one element").expect("fidl error");
            let value: i32 =
                decode_transaction_body::<i32, DefaultFuchsiaResourceDialect, ORDINAL>(message)
                    .expect("failed to decode event");
            assert_eq!(value, 55);
            rest.into_future()
        })
        .map(|(next, _rest)| assert!(next.is_none(), "should have emptied"));
    // Bound the wait so that a broken test fails quickly instead of hanging.
    let recv_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let recv = recv.on_timeout(recv_deadline, || {
        panic!("did not receive event in time!")
    });
    recv.await;
}
/// Tests that the event receiver can be taken, the stream read to the end,
/// the receiver dropped, and then a new receiver gotten from taking the
/// stream again.
#[fasync::run_singlethreaded(test)]
async fn receiver_can_be_taken_after_end_of_stream() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    // Send the event from the server
    let server = AsyncChannel::from_channel(server_end);
    const ORDINAL: u64 = 5;
    let mut event_bytes = Vec::new();
    let mut event_handles = Vec::new();
    let event_header = TransactionHeader::new(0, ORDINAL, DynamicFlags::empty());
    encode_transaction(event_header, &mut event_bytes, &mut event_handles);
    server.write_etc(&event_bytes, &mut event_handles).expect("Server channel write failed");
    drop(server);

    // Create a block to make sure the first event receiver is dropped.
    // Creating the block is a bit of paranoia, because awaiting the
    // future moves the receiver anyway.
    {
        let recv = client
            .take_event_receiver()
            .into_future()
            .then(|(first, rest)| {
                let message = first.expect("should contain one element").expect("fidl error");
                let value: i32 = decode_transaction_body::<
                    i32,
                    DefaultFuchsiaResourceDialect,
                    ORDINAL,
                >(message)
                .expect("failed to decode event");
                assert_eq!(value, 55);
                rest.into_future()
            })
            .map(|(next, _rest)| assert!(next.is_none(), "should have emptied"));
        // Bound the wait so that a broken test fails quickly instead of hanging.
        let recv_deadline = zx::MonotonicDuration::from_millis(300).after_now();
        let recv = recv.on_timeout(recv_deadline, || {
            panic!("did not receive event in time!")
        });
        recv.await;
    }

    // if we take the event stream again, we should be able to get the next
    // without a panic, but that should be none
    let mut second_receiver = client.take_event_receiver();
    assert!(
        second_receiver.next().await.is_none(),
        "receiver on closed channel should return none on first call"
    );
}
#[fasync::run_singlethreaded(test)]
async fn event_incompatible_format() {
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
// Send the event from the server
let server = AsyncChannel::from_channel(server_end);
let (bytes, handles) = (&mut vec![], &mut vec![]);
let header = TransactionHeader::new_full(
0,
5,
crate::encoding::Context {
wire_format_version: crate::encoding::WireFormatVersion::V2,
},
DynamicFlags::empty(),
0,
);
encode_transaction(header, bytes, handles);
server.write_etc(bytes, handles).expect("Server channel write failed");
drop(server);
let mut event_receiver = client.take_event_receiver();
let recv = event_receiver.next().map(|event| {
assert_matches!(event, Some(Err(crate::Error::IncompatibleMagicNumber(0))))
});
// add a timeout to receiver so if test is broken it doesn't take forever
let recv = recv.on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || {
panic!("did not receive event in time!")
});
recv.await;
}
/// Polls a pending query and a pending event receiver by hand with counting
/// wakers, then checks which waker fires when an event arrives and when the
/// query's response arrives.
///
/// Asserted wake counts: an incoming event wakes only the event waker; the
/// later response brings the response waker's count to one.
#[test]
fn client_always_wakes_pending_futures() {
let mut executor = fasync::TestExecutor::new();
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
let mut event_receiver = client.take_event_receiver();
// first poll on a response
let (response_waker, response_waker_count) = new_count_waker();
let response_cx = &mut Context::from_waker(&response_waker);
// Placeholder; overwritten with the tx_id handed to the `send_raw_query` closure.
let mut response_txid = Txid(0);
let mut response_future = client
.send_raw_query(|tx_id, bytes, handles| {
response_txid = tx_id;
let header = TransactionHeader::new(
response_txid.as_raw_id(),
SEND_ORDINAL,
DynamicFlags::empty(),
);
encode_transaction(header, bytes, handles);
Ok(())
})
.expect("Couldn't send query");
assert!(response_future.poll_unpin(response_cx).is_pending());
// then, poll on an event
let (event_waker, event_waker_count) = new_count_waker();
let event_cx = &mut Context::from_waker(&event_waker);
assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
// at this point, nothing should have been woken
assert_eq!(response_waker_count.get(), 0);
assert_eq!(event_waker_count.get(), 0);
// next, simulate an event coming in
send_transaction(TransactionHeader::new(0, 5, DynamicFlags::empty()), &server_end);
// get event loop to deliver readiness notifications to channels
let _ = executor.run_until_stalled(&mut future::pending::<()>());
// The event waker should be woken but not the response_waker.
assert_eq!(response_waker_count.get(), 0);
assert_eq!(event_waker_count.get(), 1);
// we'll pretend event_waker was woken, and have that poll out the event
assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
// next, simulate a response coming in
send_transaction(
TransactionHeader::new(response_txid.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty()),
&server_end,
);
// get event loop to deliver readiness notifications to channels
let _ = executor.run_until_stalled(&mut future::pending::<()>());
// response waker should now get woken.
assert_eq!(response_waker_count.get(), 1);
}
/// Registers two pending queries and a pending event receiver with counting
/// wakers, writes an epitaph without closing the channel, and checks that
/// every waker is woken exactly once.
///
/// Both queries are then polled and must resolve to the `UNAVAILABLE`
/// epitaph error; the event receiver must be ready as well.
#[test]
fn client_always_wakes_pending_futures_on_epitaph() {
let mut executor = fasync::TestExecutor::new();
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let server_end = AsyncChannel::from_channel(server_end);
let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
let mut event_receiver = client.take_event_receiver();
// first poll on a response
let (response1_waker, response1_waker_count) = new_count_waker();
let response1_cx = &mut Context::from_waker(&response1_waker);
let mut response1_future = client
.send_raw_query(|tx_id, bytes, handles| {
let header =
TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
encode_transaction(header, bytes, handles);
Ok(())
})
.expect("Couldn't send query");
assert!(response1_future.poll_unpin(response1_cx).is_pending());
// then, poll on an event
let (event_waker, event_waker_count) = new_count_waker();
let event_cx = &mut Context::from_waker(&event_waker);
assert!(event_receiver.poll_next_unpin(event_cx).is_pending());
// poll on another response
let (response2_waker, response2_waker_count) = new_count_waker();
let response2_cx = &mut Context::from_waker(&response2_waker);
let mut response2_future = client
.send_raw_query(|tx_id, bytes, handles| {
let header =
TransactionHeader::new(tx_id.as_raw_id(), SEND_ORDINAL, DynamicFlags::empty());
encode_transaction(header, bytes, handles);
Ok(())
})
.expect("Couldn't send query");
assert!(response2_future.poll_unpin(response2_cx).is_pending());
// Collect the wake counters so they can be checked together below.
let wakers = vec![response1_waker_count, response2_waker_count, event_waker_count];
// get event loop to deliver readiness notifications to channels
let _ = executor.run_until_stalled(&mut future::pending::<()>());
// at this point, nothing should have been woken
assert_eq!(0, wakers.iter().fold(0, |acc, x| acc + x.get()));
// next, simulate an epitaph without closing
epitaph::write_epitaph_impl(&server_end, zx_status::Status::UNAVAILABLE)
.expect("wrote epitaph");
// get event loop to deliver readiness notifications to channels
let _ = executor.run_until_stalled(&mut future::pending::<()>());
// All the wakers should be woken up because the channel is ready to read, and the message
// could be for any of them.
for wake_count in &wakers {
assert_eq!(wake_count.get(), 1);
}
// pretend that response1 woke and poll that to completion.
assert_matches!(
response1_future.poll_unpin(response1_cx),
Poll::Ready(Err(crate::Error::ClientChannelClosed {
status: zx_status::Status::UNAVAILABLE,
protocol_name: "test_protocol",
epitaph: Some(epitaph),
})) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
);
// get event loop to deliver readiness notifications to channels
let _ = executor.run_until_stalled(&mut future::pending::<()>());
// poll response2 to completion.
assert_matches!(
response2_future.poll_unpin(response2_cx),
Poll::Ready(Err(crate::Error::ClientChannelClosed {
status: zx_status::Status::UNAVAILABLE,
protocol_name: "test_protocol",
epitaph: Some(epitaph),
})) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32
);
// poll the event stream to completion.
assert!(event_receiver.poll_next_unpin(event_cx).is_ready());
}
/// Checks that the event receiver can still be taken without panicking after
/// an event was written to the channel and a query future was polled before
/// any receiver existed.
#[fasync::run_singlethreaded(test)]
async fn client_allows_take_event_stream_even_if_event_delivered() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");

    // first simulate an event coming in, even though nothing has polled
    let event_header = TransactionHeader::new(0, 5, DynamicFlags::empty());
    send_transaction(event_header, &server_end);

    // next, poll on a response
    let (query_waker, _query_wake_count) = new_count_waker();
    let query_cx = &mut Context::from_waker(&query_waker);
    let mut query_future = client.send_query::<u8, u8, SEND_ORDINAL>(55, DynamicFlags::empty());
    let first_poll = query_future.poll_unpin(query_cx);
    assert!(first_poll.is_pending());

    // then, make sure we can still take the event receiver without panicking
    let _event_receiver = client.take_event_receiver();
}
/// Runs pairs of client actions against a channel that the server has
/// already closed with an `UNAVAILABLE` epitaph, and checks for each action
/// whether the epitaph error is reported.
///
/// Actions that only write (`SendMsg`, `SendQuery`) must succeed; actions
/// that read (`WaitQuery`, `RecvEvent`) must report the epitaph. Any other
/// error fails the test.
#[fasync::run_singlethreaded(test)]
async fn client_reports_epitaph_from_all_read_actions() {
#[derive(Debug, PartialEq)]
enum Action {
SendMsg, // send a one-way message
SendQuery, // send a two-way message and just call .check()
WaitQuery, // send a two-way message and wait for the response
RecvEvent, // wait to receive an event
}
impl Action {
// Whether this action is expected to surface the epitaph error.
fn should_report_epitaph(&self) -> bool {
match self {
Action::SendMsg | Action::SendQuery => false,
Action::WaitQuery | Action::RecvEvent => true,
}
}
}
use Action::*;
// Test all permutations of two actions. Verify the epitaph is reported
// twice (2 reads), once (1 read, 1 write), or not at all (2 writes).
for two_actions in &[
[SendMsg, SendMsg],
[SendMsg, SendQuery],
[SendMsg, WaitQuery],
[SendMsg, RecvEvent],
[SendQuery, SendMsg],
[SendQuery, SendQuery],
[SendQuery, WaitQuery],
[SendQuery, RecvEvent],
[WaitQuery, SendMsg],
[WaitQuery, SendQuery],
[WaitQuery, WaitQuery],
[WaitQuery, RecvEvent],
[RecvEvent, SendMsg],
[RecvEvent, SendQuery],
[RecvEvent, WaitQuery],
// No [RecvEvent, RecvEvent] because it behaves differently: after
// reporting an epitaph, the next call returns None.
] {
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let client = Client::new(client_end, "test_protocol");
// Immediately close the FIDL channel with an epitaph.
let server_end = AsyncChannel::from_channel(server_end);
server_end
.close_with_epitaph(zx_status::Status::UNAVAILABLE)
.expect("failed to write epitaph");
let mut event_receiver = client.take_event_receiver();
// Assert that each action reports the epitaph exactly when it is expected to.
for (index, action) in two_actions.iter().enumerate() {
let err = match action {
SendMsg => {
client.send::<u8>(SEND_DATA, SEND_ORDINAL, DynamicFlags::empty()).err()
}
WaitQuery => client
.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
.await
.err(),
SendQuery => client
.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
.check()
.err(),
RecvEvent => event_receiver.next().await.unwrap().err(),
};
// Context appended to every failure message below.
let details = format!("index: {index:?}, two_actions: {two_actions:?}");
match err {
None => assert!(
!action.should_report_epitaph(),
"expected epitaph, but succeeded.\n{details}"
),
Some(crate::Error::ClientChannelClosed {
status: zx_status::Status::UNAVAILABLE,
protocol_name: "test_protocol",
epitaph: Some(epitaph),
}) if epitaph == zx_types::ZX_ERR_UNAVAILABLE as u32 => assert!(
action.should_report_epitaph(),
"got epitaph unexpectedly.\n{details}",
),
Some(err) => panic!("unexpected error: {err:#?}.\n{details}"),
}
}
// If we got the epitaph from RecvEvent, the next should return None.
if two_actions.contains(&RecvEvent) {
assert_matches!(event_receiver.next().await, None);
}
}
}
/// Exercises `check()` on a query future: once while the server is alive,
/// where the checked future must still complete with the response, and once
/// after the server is dropped, where `check()` succeeds but the query
/// itself fails with `PEER_CLOSED`.
#[test]
fn client_query_result_check() {
    let mut executor = fasync::TestExecutor::new();
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::new(client_end, "test_protocol");
    let server = AsyncChannel::from_channel(server_end);

    // Sending works, and checking when a message successfully sends returns itself.
    let live_query = client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
    let mut live_checked = live_query.check().expect("failed to check future");

    // Should be able to complete the query even after checking.
    let mut request = MessageBufEtc::new();
    executor.run_singlethreaded(server.recv_etc_msg(&mut request)).expect("failed to recv msg");
    let two_way_tx_id = 1u8;
    assert_eq!(request.bytes(), expected_sent_bytes(two_way_tx_id, 1));

    let mut reply_bytes = Vec::new();
    let mut reply_handles = Vec::new();
    let reply_header =
        TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
    encode_transaction(reply_header, &mut reply_bytes, &mut reply_handles);
    server.write_etc(&reply_bytes, &mut reply_handles).expect("Server channel write failed");

    let response = executor
        .run_singlethreaded(&mut live_checked)
        .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
    assert_eq!(response, SEND_DATA);

    // Close the server channel, meaning the next query will fail.
    drop(server);
    let closed_query =
        client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty());
    // The check succeeds, because we do not expose PEER_CLOSED on writes.
    let mut closed_checked = closed_query.check().expect("failed to check future");
    // But the query will fail when it tries to read the response.
    let closed_outcome = executor.run_singlethreaded(&mut closed_checked);
    assert_matches!(
        closed_outcome,
        Err(crate::Error::ClientChannelClosed {
            status: zx_status::Status::PEER_CLOSED,
            protocol_name: "test_protocol",
            epitaph: None,
        })
    );
}
/// Checks that `into_channel` succeeds on a client that has no clones and
/// has sent no queries.
#[fasync::run_singlethreaded(test)]
async fn client_into_channel() {
    // This test doesn't actually do any async work, but the fuchsia
    // executor must be set up in order to create the channel.
    let (client_end, _server_end) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    assert!(client.into_channel().is_ok());
}
/// Checks that `into_channel` fails after a query was sent and its future
/// dropped without a response having been received.
#[fasync::run_singlethreaded(test)]
async fn client_into_channel_outstanding_messages() {
    // This test doesn't actually do any async work, but the fuchsia
    // executor must be set up in order to create the channel.
    let (client_end, _server_end) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");

    // Create a send future to insert a message interest but drop it
    // before a response can be received.
    drop(client.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty()));

    assert!(client.into_channel().is_err());
}
/// Checks that `into_channel` fails while a clone of the client is still
/// alive.
#[fasync::run_singlethreaded(test)]
async fn client_into_channel_active_clone() {
    // This test doesn't actually do any async work, but the fuchsia
    // executor must be set up in order to create the channel.
    let (client_end, _server_end) = zx::Channel::create();
    let async_client_end = AsyncChannel::from_channel(client_end);
    let client =
        Client::<DefaultFuchsiaResourceDialect>::new(async_client_end, "test_protocol");
    // Held until the end of the test so the clone outlives the `into_channel` call.
    let _second_handle = client.clone();
    assert!(client.into_channel().is_err());
}
/// Checks that `into_channel` succeeds once the only query that was sent has
/// received its response.
#[fasync::run_singlethreaded(test)]
async fn client_into_channel_outstanding_messages_get_received() {
    let (client_end, server_end) = zx::Channel::create();
    let client_end = AsyncChannel::from_channel(client_end);
    let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
    let server = AsyncChannel::from_channel(server_end);
    let mut request = MessageBufEtc::new();

    // Server side: check the request bytes, then answer with the same tx_id.
    let serve_request = async move {
        server.recv_etc_msg(&mut request).await.expect("failed to recv msg");
        let two_way_tx_id = 1u8;
        assert_eq!(request.bytes(), expected_sent_bytes(two_way_tx_id, 1));

        let mut reply_bytes = Vec::new();
        let mut reply_handles = Vec::new();
        let reply_header =
            TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
        encode_transaction(reply_header, &mut reply_bytes, &mut reply_handles);
        server
            .write_etc(&reply_bytes, &mut reply_handles)
            .expect("Server channel write failed");
    };
    // Bound the server side so that a broken test fails quickly instead of hanging.
    let serve_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let serve_request = serve_request.on_timeout(serve_deadline, || {
        panic!("did not receiver message in time!")
    });

    let query = client
        .send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
        .map_ok(|response| assert_eq!(response, SEND_DATA))
        .unwrap_or_else(|e| panic!("fidl error: {e:?}"));
    // Bound the client side as well, for the same reason.
    let query_deadline = zx::MonotonicDuration::from_millis(300).after_now();
    let query = query.on_timeout(query_deadline, || {
        panic!("did not receive response in time!")
    });

    let ((), ()) = join!(serve_request, query);
    // The query has completed, so nothing is outstanding any more.
    assert!(client.into_channel().is_ok());
}
#[fasync::run_singlethreaded(test)]
async fn client_decode_errors_are_broadcast() {
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
let server = AsyncChannel::from_channel(server_end);
let _server = fasync::Task::spawn(async move {
let mut buffer = MessageBufEtc::new();
server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
let two_way_tx_id = 1u8;
assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
let (bytes, handles) = (&mut vec![], &mut vec![]);
let header =
TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
encode_transaction(header, bytes, handles);
// Zero out the at-rest flags which will give this message an invalid version.
bytes[4] = 0;
server.write_etc(bytes, handles).expect("Server channel write failed");
// Wait forever to stop the channel from being closed.
pending::<()>().await;
});
let futures = FuturesUnordered::new();
for _ in 0..4 {
futures.push(async {
assert_matches!(
client
.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
.map_ok(|x| assert_eq!(x, SEND_DATA))
.await,
Err(crate::Error::UnsupportedWireFormatVersion)
);
});
}
futures
.collect::<Vec<_>>()
.on_timeout(zx::MonotonicDuration::from_seconds(1).after_now(), || panic!("timed out!"))
.await;
}
#[fasync::run_singlethreaded(test)]
async fn into_channel_from_waker_succeeds() {
// Checks that `into_channel` succeeds when it is called from a future that is polled
// directly from inside the waker's `wake_by_ref`, after the query future has resolved.
let (client_end, server_end) = zx::Channel::create();
let client_end = AsyncChannel::from_channel(client_end);
let client = Client::<DefaultFuchsiaResourceDialect>::new(client_end, "test_protocol");
let server = AsyncChannel::from_channel(server_end);
let mut buffer = MessageBufEtc::new();
// Server side: waits for the request, checks its bytes, and replies using the same
// transaction id and ordinal.
let receiver = async move {
server.recv_etc_msg(&mut buffer).await.expect("failed to recv msg");
let two_way_tx_id = 1u8;
assert_eq!(buffer.bytes(), expected_sent_bytes(two_way_tx_id, 1));
let (bytes, handles) = (&mut vec![], &mut vec![]);
let header =
TransactionHeader::new(two_way_tx_id as u32, SEND_ORDINAL, DynamicFlags::empty());
encode_transaction(header, bytes, handles);
server.write_etc(bytes, handles).expect("Server channel write failed");
};
// Holds the client-side future behind a mutex so that the `ArcWake` impl below, which
// only receives `&Arc<Self>`, can still poll it.
struct Sender {
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>>,
}
// `done_tx` is fired by the client-side future after its `into_channel` assertion ran.
let (done_tx, done_rx) = oneshot::channel();
let sender = Arc::new(Sender {
future: Mutex::new(Box::pin(async move {
// Send the query, check the decoded value, and panic on any FIDL error.
client
.send_query::<u8, u8, SEND_ORDINAL>(SEND_DATA, DynamicFlags::empty())
.map_ok(|x| assert_eq!(x, SEND_DATA))
.unwrap_or_else(|e| panic!("fidl error: {e:?}"))
.await;
// The response has been consumed, so reclaiming the channel is expected to succeed.
assert!(client.into_channel().is_ok());
let _ = done_tx.send(());
})),
});
// This test isn't typically how this would work; normally, the future would get woken and
// an executor would be responsible for running the task. We do it this way because if this
// works, then it means the case where `into_channel` is used after a response is received
// on a multi-threaded executor will always work (which isn't easy to test directly).
impl ArcWake for Sender {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Poll inline with a no-op waker; the future is required to finish in this one poll.
assert!(arc_self
.future
.lock()
.poll_unpin(&mut Context::from_waker(&noop_waker()))
.is_ready());
}
}
let waker = waker(sender.clone());
// First poll starts the query; the server side has not run yet, so it must be pending.
assert!(sender.future.lock().poll_unpin(&mut Context::from_waker(&waker)).is_pending());
// Run the server side, which writes the response.
receiver.await;
// Wait until the client-side future reports that it ran to completion.
done_rx.await.unwrap();
}
}