blob: 82fdb4ec9326633cbd5ea65d38061ad421a0b6e7 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! An implementation of a client for a fidl interface.
#[allow(unused_imports)] // for AsHandleRef (see https://github.com/rust-lang/rust/issues/53682)
use {
crate::{
encoding::{
Encodable,
Decodable,
Encoder,
Decoder,
decode_transaction_header,
TransactionHeader,
TransactionMessage
},
Error,
},
fuchsia_async::{
self as fasync,
temp::{TempFutureExt, Either},
},
fuchsia_zircon::{self as zx, AsHandleRef},
futures::{
future::{self, Future, Ready, AndThen, TryFutureExt},
ready,
stream::{FusedStream, Stream},
task::{LocalWaker, Poll, Waker},
},
parking_lot::Mutex,
slab::Slab,
std::{
collections::VecDeque,
marker::Unpin,
mem,
ops::Deref,
pin::Pin,
sync::Arc,
},
};
/// Decode a new value of a decodable type from a transaction.
fn decode_transaction_body<D: Decodable>(mut buf: zx::MessageBuf) -> Result<D, Error> {
let (bytes, handles) = buf.split_mut();
let header_len = <TransactionHeader as Decodable>::inline_size();
if bytes.len() < header_len { return Err(Error::OutOfRange); }
let (_header_bytes, body_bytes) = bytes.split_at(header_len);
let mut output = D::new_empty();
Decoder::decode_into(body_bytes, handles, &mut output)?;
Ok(output)
}
fn decode_transaction_body_fut<D: Decodable>(buf: zx::MessageBuf) -> Ready<Result<D, Error>> {
future::ready(decode_transaction_body(buf))
}
/// A FIDL client which can be used to send buffers and receive responses via a channel.
#[derive(Debug, Clone)]
pub struct Client {
inner: Arc<ClientInner>,
}
/// A future representing the raw response to a FIDL query.
pub type RawQueryResponseFut = Either<Ready<Result<zx::MessageBuf, Error>>, MessageResponse>;
/// A future representing the decoded response to a FIDL query.
pub type QueryResponseFut<D> =
AndThen<
RawQueryResponseFut,
Ready<Result<D, Error>>,
fn(zx::MessageBuf) -> Ready<Result<D, Error>>>;
/// A FIDL transaction id. Will not be zero for a message that includes a response.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Txid(u32);
/// A message interest id.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct InterestId(usize);
impl InterestId {
fn from_txid(txid: Txid) -> Self {
InterestId(txid.0 as usize - 1)
}
fn as_raw_id(&self) -> usize {
self.0
}
}
impl Txid {
fn from_interest_id(int_id: InterestId) -> Self {
Txid((int_id.0 + 1) as u32)
}
fn as_raw_id(&self) -> u32 {
self.0
}
}
impl Client {
/// Create a new client.
///
/// `channel` is the asynchronous channel over which data is sent and received.
/// `event_ordinals` are the ordinals on which events will be received.
pub fn new(channel: fasync::Channel) -> Client {
Client {
inner: Arc::new(ClientInner {
channel: channel,
message_interests: Mutex::new(Slab::<MessageInterest>::new()),
event_channel: Mutex::<EventChannel>::default(),
})
}
}
/// Attempt to convert the `Client` back into a channel.
///
/// This will only succeed if there are no active clones of this `Client`
/// and no currently-alive `EventReceiver` or `MessageResponse`s that
/// came from this `Client`.
pub fn into_channel(self) -> Result<fasync::Channel, Self> {
match Arc::try_unwrap(self.inner) {
Ok(ClientInner { channel, .. }) => Ok(channel),
Err(inner) => Err(Self { inner })
}
}
/// Retrieve the stream of event messages for the `Client`.
/// Panics if the stream was already taken.
pub fn take_event_receiver(&self) -> EventReceiver {
{
let mut lock = self.inner.event_channel.lock();
if let EventListener::None = lock.listener {
lock.listener = EventListener::New;
} else {
panic!("Event stream was already taken");
}
}
EventReceiver {
inner: Some(self.inner.clone()),
}
}
/// Send an encodable message without expecting a response.
pub fn send<T: Encodable>(&self, msg: &mut T, ordinal: u32) -> Result<(), Error> {
let (buf, handles) = (&mut vec![], &mut vec![]);
let msg = &mut TransactionMessage {
header: TransactionHeader {
tx_id: 0,
flags: 0,
ordinal,
},
body: msg,
};
Encoder::encode(buf, handles, msg)?;
self.send_raw_msg(&**buf, handles)
}
/// Send an encodable query and receive a decodable response.
pub fn send_query<E: Encodable, D: Decodable>(&self, msg: &mut E, ordinal: u32)
-> QueryResponseFut<D>
{
let (buf, handles) = (&mut vec![], &mut vec![]);
let res_fut = self.send_raw_query(|tx_id| {
let msg = &mut TransactionMessage {
header: TransactionHeader {
tx_id: tx_id.as_raw_id(),
flags: 0,
ordinal,
},
body: msg,
};
Encoder::encode(buf, handles, msg)?;
Ok((buf, handles))
});
res_fut.and_then(decode_transaction_body_fut::<D>)
}
/// Send a raw message without expecting a response.
pub fn send_raw_msg(&self, buf: &[u8], handles: &mut Vec<zx::Handle>) -> Result<(), Error> {
Ok(self.inner.channel.write(buf, handles).map_err(Error::ClientWrite)?)
}
/// Send a raw query and receive a response future.
pub fn send_raw_query<'a, F>(&'a self, msg_from_id: F)
-> RawQueryResponseFut
where F: FnOnce(Txid) -> Result<(&'a mut [u8], &'a mut Vec<zx::Handle>), Error>
{
let id = self.inner.register_msg_interest();
let (out_buf, handles) = match msg_from_id(Txid::from_interest_id(id)) {
Ok(x) => x,
Err(e) => return future::ready(Err(e)).left_future(),
};
if let Err(e) = self.inner.channel.write(out_buf, handles) {
return future::ready(Err(Error::ClientWrite(e))).left_future();
}
MessageResponse {
id: Txid::from_interest_id(id),
client: Some(self.inner.clone()),
}.right_future()
}
}
#[must_use]
/// A future which polls for the response to a client message.
#[derive(Debug)]
pub struct MessageResponse {
id: Txid,
// `None` if the message response has been recieved
client: Option<Arc<ClientInner>>,
}
impl Unpin for MessageResponse {}
impl Future for MessageResponse {
type Output = Result<zx::MessageBuf, Error>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let this = &mut *self;
let res;
{
let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?;
res = client.poll_recv_msg_response(this.id, lw);
}
// Drop the client reference if the response has been received
if let Poll::Ready(Ok(_)) = res {
let client = this.client.take()
.expect("MessageResponse polled after completion");
client.wake_any();
}
res
}
}
impl Drop for MessageResponse {
fn drop(&mut self) {
if let Some(client) = &self.client {
client.deregister_msg_interest(InterestId::from_txid(self.id));
client.wake_any();
}
}
}
/// An enum reprenting either a resolved message interest or a task on which to alert
/// that a response message has arrived.
#[derive(Debug)]
enum MessageInterest {
/// A new `MessageInterest`
WillPoll,
/// A task is waiting to receive a response, and can be awoken with `Waker`.
Waiting(Waker),
/// A message has been received, and a task will poll to receive it.
Received(zx::MessageBuf),
/// A message has not been received, but the person interested in the response
/// no longer cares about it, so the message should be discared upon arrival.
Discard,
}
impl MessageInterest {
/// Check if a message has been received.
fn is_received(&self) -> bool {
if let MessageInterest::Received(_) = *self {
true
} else {
false
}
}
fn unwrap_received(self) -> zx::MessageBuf {
if let MessageInterest::Received(buf) = self {
buf
} else {
panic!("EXPECTED received message")
}
}
}
/// A stream of events as `MessageBuf`s.
#[derive(Debug)]
pub struct EventReceiver {
inner: Option<Arc<ClientInner>>,
}
impl Unpin for EventReceiver {}
impl FusedStream for EventReceiver {
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}
impl Stream for EventReceiver {
type Item = Result<zx::MessageBuf, Error>;
fn poll_next(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
let inner = self.inner.as_ref().expect("polled EventReceiver after `None`");
Poll::Ready(match ready!(inner.poll_recv_event(lw)) {
Ok(x) => Some(Ok(x)),
Err(Error::ClientRead(zx::Status::PEER_CLOSED)) => {
self.inner = None;
None
},
Err(e) => Some(Err(e)),
})
}
}
impl Drop for EventReceiver {
fn drop(&mut self) {
if let Some(inner) = &self.inner {
inner.event_channel.lock().listener = EventListener::None;
inner.wake_any();
}
}
}
#[derive(Debug, Default)]
struct EventChannel {
listener: EventListener,
queue: VecDeque<zx::MessageBuf>,
}
#[derive(Debug)]
enum EventListener {
/// No one is listening for the event
None,
/// Someone is listening for the event but has not yet polled
New,
/// Someone is listening for the event and can be woken via the `Waker`
Some(Waker),
}
impl Default for EventListener {
fn default() -> Self { EventListener::None }
}
/// A shared client channel which tracks EXPECTED and received responses
#[derive(Debug)]
struct ClientInner {
channel: fasync::Channel,
/// A map of message interests to either `None` (no message received yet)
/// or `Some(DecodeBuf)` when a message has been received.
/// An interest is registered with `register_msg_interest` and deregistered
/// by either receiving a message via a call to `poll_recv` or manually
/// deregistering with `deregister_msg_interest`
message_interests: Mutex<Slab<MessageInterest>>,
/// A queue of received events and a waker for the task to receive them.
event_channel: Mutex<EventChannel>,
}
impl Deref for Client {
type Target = fasync::Channel;
fn deref(&self) -> &Self::Target {
&self.inner.channel
}
}
impl ClientInner {
/// Registers interest in a response message.
///
/// This function returns a `usize` ID which should be used to send a message
/// via the channel. Responses are then received using `poll_recv`.
fn register_msg_interest(&self) -> InterestId {
// TODO(cramertj) use `try_from` here and assert that the conversion from
// `usize` to `u32` hasn't overflowed.
InterestId(self.message_interests.lock().insert(
MessageInterest::WillPoll))
}
fn poll_recv_event(
&self,
lw: &LocalWaker,
) -> Poll<Result<zx::MessageBuf, Error>> {
let is_closed = self.recv_all(lw)?;
let mut lock = self.event_channel.lock();
if let Some(msg_buf) = lock.queue.pop_front() {
Poll::Ready(Ok(msg_buf))
} else {
lock.listener = EventListener::Some(lw.clone().into_waker());
if is_closed {
Poll::Ready(Err(Error::ClientRead(zx::Status::PEER_CLOSED)))
} else {
Poll::Pending
}
}
}
fn poll_recv_msg_response(
&self,
txid: Txid,
lw: &LocalWaker,
) -> Poll<Result<zx::MessageBuf, Error>> {
let is_closed = self.recv_all(lw)?;
let mut message_interests = self.message_interests.lock();
let interest_id = InterestId::from_txid(txid);
if message_interests.get(interest_id.as_raw_id())
.expect("Polled unregistered interest")
.is_received()
{
// If, by happy accident, we just raced to getting the result,
// then yay! Return success.
let buf = message_interests.remove(interest_id.as_raw_id()).unwrap_received();
Poll::Ready(Ok(buf))
} else {
// Set the current waker to be notified when a response arrives.
*message_interests.get_mut(interest_id.as_raw_id())
.expect("Polled unregistered interest") =
MessageInterest::Waiting(lw.clone().into_waker());
if is_closed {
Poll::Ready(Err(Error::ClientRead(zx::Status::PEER_CLOSED)))
} else {
Poll::Pending
}
}
}
/// Poll for the receipt of any response message or an event.
///
/// Returns whether or not the channel is closed.
fn recv_all(
&self,
lw: &LocalWaker,
) -> Result<bool, Error> {
// TODO(cramertj) return errors if one has occured _ever_ in recv_all, not just if
// one happens on this call.
loop {
let mut buf = zx::MessageBuf::new();
match self.channel.recv_from(&mut buf, lw) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(zx::Status::PEER_CLOSED)) => return Ok(true),
Poll::Ready(Err(e)) => return Err(Error::ClientRead(e)),
Poll::Pending => return Ok(false),
}
let (header, _) = decode_transaction_header(buf.bytes()).map_err(|_| Error::InvalidHeader)?;
if header.tx_id == 0 { // received an event
let mut lock = self.event_channel.lock();
lock.queue.push_back(buf);
if let EventListener::Some(ref waker) = lock.listener { waker.wake(); }
} else { // received a message response
let recvd_interest_id = InterestId::from_txid(Txid(header.tx_id));
// Look for a message interest with the given ID.
// If one is found, store the message so that it can be picked up later.
let mut message_interests = self.message_interests.lock();
let raw_recvd_interest_id = recvd_interest_id.as_raw_id();
if let Some(&MessageInterest::Discard) =
message_interests.get(raw_recvd_interest_id)
{
message_interests.remove(raw_recvd_interest_id);
} else if let Some(entry) = message_interests.get_mut(raw_recvd_interest_id) {
let old_entry = mem::replace(entry, MessageInterest::Received(buf));
if let MessageInterest::Waiting(waker) = old_entry {
// Wake up the task to let them know a message has arrived.
waker.wake();
}
}
}
}
}
fn deregister_msg_interest(&self, InterestId(id): InterestId) {
let mut lock = self.message_interests.lock();
if lock[id].is_received() {
lock.remove(id);
} else {
lock[id] = MessageInterest::Discard;
}
}
// Wakes up an arbitrary task that has begun polling on the channel so that
// it will call recv_all and be registered as the new channel reader.
fn wake_any(&self) {
// Try to wake up message interests first, rather than the event listener.
// The event listener is a stream, and so could be between poll_nexts,
// blocked on a message interest completing before it begins polling again.
// Waking up only the event listener in these cases would cause a deadlock
// since the channel wouldn't be read from by the awoken event listener,
// so the message interest the event listener was blocked on would never
// complete.
//
// Message interests, however, should always be actively polled once
// they've begun being polled on a task.
{
let lock = self.message_interests.lock();
for (_, message_interest) in lock.iter() {
if let MessageInterest::Waiting(waker) = message_interest {
waker.wake();
return
}
}
}
{
let lock = self.event_channel.lock();
if let EventListener::Some(waker) = &lock.listener {
waker.wake();
return;
}
}
}
}
pub mod sync {
//! Synchronous FIDL Client
use super::*;
/// A synchronous client for making FIDL calls.
#[derive(Debug)]
pub struct Client {
// Underlying channel
channel: zx::Channel,
// Reusable buffer for r/w
buf: zx::MessageBuf,
}
// TODO: remove this and allow multiple overlapping queries on the same channel.
pub(super) const QUERY_TX_ID: u32 = 42;
impl Client {
/// Create a new synchronous FIDL client.
pub fn new(channel: zx::Channel) -> Self {
Client { channel, buf: zx::MessageBuf::new() }
}
/// Get the underlying channel out of the client.
pub fn into_channel(self) -> zx::Channel {
self.channel
}
/// Send a new message.
pub fn send<E: Encodable>(&mut self, msg: &mut E, ordinal: u32) -> Result<(), Error> {
self.buf.clear();
let (buf, handles) = self.buf.split_mut();
let msg = &mut TransactionMessage {
header: TransactionHeader {
tx_id: 0,
flags: 0,
ordinal,
},
body: msg,
};
Encoder::encode(buf, handles, msg)?;
self.channel.write(buf, handles).map_err(Error::ClientWrite)?;
Ok(())
}
/// Send a new message expecting a response.
pub fn send_query<E: Encodable, D: Decodable>(
&mut self,
msg: &mut E,
ordinal: u32,
deadline: zx::Time,
) -> Result<D, Error> {
// Write the message into the channel
self.buf.clear();
let (buf, handles) = self.buf.split_mut();
let msg = &mut TransactionMessage {
header: TransactionHeader {
tx_id: QUERY_TX_ID,
flags: 0,
ordinal,
},
body: msg,
};
Encoder::encode(buf, handles, msg)?;
self.channel.write(buf, handles).map_err(Error::ClientWrite)?;
// Read the response
self.buf.clear();
match self.channel.read(&mut self.buf) {
Ok(()) => {},
Err(zx::Status::SHOULD_WAIT) => {
let signals = self.channel.wait_handle(
zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED,
deadline,
).map_err(Error::ClientRead)?;
if !signals.contains(zx::Signals::CHANNEL_READABLE) {
debug_assert!(signals.contains(zx::Signals::CHANNEL_PEER_CLOSED));
return Err(Error::ClientRead(zx::Status::PEER_CLOSED));
}
self.channel.read(&mut self.buf).map_err(Error::ClientRead)?;
},
Err(e) => return Err(Error::ClientRead(e)),
}
let (buf, handles) = self.buf.split_mut();
let (header, body_bytes) = decode_transaction_header(buf)?;
if header.tx_id != QUERY_TX_ID || header.ordinal != ordinal {
return Err(Error::UnexpectedSyncResponse);
}
let mut output = D::new_empty();
Decoder::decode_into(body_bytes, handles, &mut output)?;
Ok(output)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use {
failure::{Error, ResultExt},
futures::{FutureExt, StreamExt},
fuchsia_async::TimeoutExt,
fuchsia_zircon::DurationNum,
std::{io, thread},
};
const SEND_EXPECTED: &[u8] = &[
0, 0, 0, 0, 0, 0, 0, 0, // 32 bit tx_id followed by 32 bits of padding
0, 0, 0, 0, // 32 bits for flags
SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, // 32 bit ordinal
SEND_DATA, // 8 bit data
0, 0, 0, 0, 0, 0, 0, // 7 bytes of padding after our 1 byte of data
];
const SEND_ORDINAL_HIGH_BYTE: u8 = 42;
const SEND_ORDINAL: u32 = 42;
const SEND_DATA: u8 = 55;
#[test]
fn sync_client() -> Result<(), Error> {
let (client_end, server_end) = zx::Channel::create().context("chan create")?;
let mut client = sync::Client::new(client_end);
client.send(&mut SEND_DATA, SEND_ORDINAL).context("sending")?;
let mut received = zx::MessageBuf::new();
server_end.read(&mut received).context("reading")?;
assert_eq!(SEND_EXPECTED, received.bytes());
Ok(())
}
#[test]
fn sync_client_with_response() -> Result<(), Error> {
let (client_end, server_end) = zx::Channel::create().context("chan create")?;
let mut client = sync::Client::new(client_end);
thread::spawn(move || {
// Server
let mut received = zx::MessageBuf::new();
server_end.wait_handle(zx::Signals::CHANNEL_READABLE, 5.seconds().after_now())
.expect("failed to wait for channel readable");
server_end.read(&mut received).expect("failed to read on server end");
let (buf, handles) = received.split_mut();
let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode");
assert_eq!(header.tx_id, sync::QUERY_TX_ID);
assert_eq!(header.ordinal, SEND_ORDINAL);
let response = &mut TransactionMessage {
header: TransactionHeader {
tx_id: header.tx_id,
flags: 0,
ordinal: header.ordinal,
},
body: &mut SEND_DATA,
};
Encoder::encode(buf, handles, response).expect("Encoding failure");
server_end.write(buf, handles).expect("Server channel write failed");
});
let response_data = client.send_query::<u8, u8>(
&mut SEND_DATA, SEND_ORDINAL, 5.seconds().after_now(),
).context("sending query")?;
assert_eq!(SEND_DATA, response_data);
Ok(())
}
#[test]
fn client() {
let mut executor = fasync::Executor::new().unwrap();
let (client_end, server_end) = zx::Channel::create().unwrap();
let client_end = fasync::Channel::from_channel(client_end).unwrap();
let client = Client::new(client_end);
let server = fasync::Channel::from_channel(server_end).unwrap();
let receiver = async move {
let mut buffer = zx::MessageBuf::new();
await!(server.recv_msg(&mut buffer)).expect("failed to recv msg");
assert_eq!(SEND_EXPECTED, buffer.bytes());
};
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receiver.on_timeout(
300.millis().after_now(),
|| panic!("did not receive message in time!"));
let sender = fasync::Timer::new(100.millis().after_now()).map(|()|{
client.send(&mut SEND_DATA, SEND_ORDINAL).expect("failed to send msg");
});
executor.run_singlethreaded(receiver.join(sender));
}
#[test]
fn client_with_response() {
const EXPECTED: &[u8] = &[
1, 0, 0, 0, 0, 0, 0, 0, // 32 bit tx_id followed by 32 bits of padding
0, 0, 0, 0, // 32 bits for flags
42, 0, 0, 0, // 32 bit ordinal
55, // 8 bit data
0, 0, 0, 0, 0, 0, 0, // 7 bytes of padding after our 1 byte of data
];
let mut executor = fasync::Executor::new().unwrap();
let (client_end, server_end) = zx::Channel::create().unwrap();
let client_end = fasync::Channel::from_channel(client_end).unwrap();
let client = Client::new(client_end);
let server = fasync::Channel::from_channel(server_end).unwrap();
let mut buffer = zx::MessageBuf::new();
let receiver = async move {
await!(server.recv_msg(&mut buffer)).expect("failed to recv msg");
assert_eq!(EXPECTED, buffer.bytes());
let id = 1; // internally, the first slot in a slab returns a `0`. We then add one
// since FIDL txids start with `1`.
let response = &mut TransactionMessage {
header: TransactionHeader {
tx_id: id,
flags: 0,
ordinal: 42,
},
body: &mut 55,
};
let (bytes, handles) = (&mut vec![], &mut vec![]);
Encoder::encode(bytes, handles, response).expect("Encoding failure");
server.write(bytes, handles).expect("Server channel write failed");
};
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receiver.on_timeout(
300.millis().after_now(),
|| panic!("did not receiver message in time!"
));
let sender = client.send_query::<u8, u8>(&mut 55, 42)
.map_ok(|x|assert_eq!(x, 55))
.map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
&*format!("fidl error: {:?}", e))
});
// add a timeout to receiver so if test is broken it doesn't take forever
let sender = sender.on_timeout(
300.millis().after_now(),
|| panic!("did not receive response in time!")
);
executor.run_singlethreaded(receiver.join(sender));
}
#[test]
#[should_panic]
fn event_cant_be_taken_twice() {
let _exec = fasync::Executor::new().unwrap();
let (client_end, _) = zx::Channel::create().unwrap();
let client_end = fasync::Channel::from_channel(client_end).unwrap();
let client = Client::new(client_end);
let _foo = client.take_event_receiver();
client.take_event_receiver();
}
#[test]
fn event_can_be_taken() {
let _exec = fasync::Executor::new().unwrap();
let (client_end, _) = zx::Channel::create().unwrap();
let client_end = fasync::Channel::from_channel(client_end).unwrap();
let client = Client::new(client_end);
client.take_event_receiver();
}
#[test]
fn event_received() {
let mut executor = fasync::Executor::new().unwrap();
let (client_end, server_end) = zx::Channel::create().unwrap();
let client_end = fasync::Channel::from_channel(client_end).unwrap();
let client = Client::new(client_end);
// Send the event from the server
let server = fasync::Channel::from_channel(server_end).unwrap();
let event = &mut TransactionMessage {
header: TransactionHeader {
tx_id: 0,
flags: 0,
ordinal: 5,
},
body: &mut 55i32,
};
let (bytes, handles) = (&mut vec![], &mut vec![]);
Encoder::encode(bytes, handles, event).expect("Encoding failure");
server.write(bytes, handles).expect("Server channel write failed");
drop(server);
let recv = client.take_event_receiver()
.into_future()
.then(|(x, stream)| {
let x = x.expect("should contain one element");
let x = x.expect("fidl error");
let x: i32 = decode_transaction_body(x).expect("failed to decode event");
assert_eq!(x, 55);
stream.into_future()
})
.map(|(x, _stream)| assert!(x.is_none(), "should have emptied"));
// add a timeout to receiver so if test is broken it doesn't take forever
let recv = recv.on_timeout(
300.millis().after_now(),
|| panic!("did not receive event in time!")
);
executor.run_singlethreaded(recv);
}
}