| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! An implementation of a client for a fidl interface. |
| |
| #[allow(unused_imports)] |
| // for AsHandleRef (see https://github.com/rust-lang/rust/issues/53682) |
| use { |
| crate::{ |
| encoding::{ |
| decode_transaction_header, Decodable, Decoder, Encodable, Encoder, TransactionHeader, |
| TransactionMessage, |
| }, |
| Error, |
| }, |
| fuchsia_async::{ |
| self as fasync, |
| temp::{Either, TempFutureExt}, |
| }, |
| fuchsia_zircon::{self as zx, AsHandleRef}, |
| futures::{ |
| future::{self, AndThen, Future, Ready, TryFutureExt}, |
| ready, |
| stream::{FusedStream, Stream}, |
| task::{Poll, Waker}, |
| }, |
| parking_lot::Mutex, |
| slab::Slab, |
| std::{collections::VecDeque, marker::Unpin, mem, ops::Deref, pin::Pin, sync::Arc}, |
| }; |
| |
| /// Decode a new value of a decodable type from a transaction. |
| fn decode_transaction_body<D: Decodable>(mut buf: zx::MessageBuf) -> Result<D, Error> { |
| let (bytes, handles) = buf.split_mut(); |
| let header_len = <TransactionHeader as Decodable>::inline_size(); |
| if bytes.len() < header_len { |
| return Err(Error::OutOfRange); |
| } |
| let (_header_bytes, body_bytes) = bytes.split_at(header_len); |
| |
| let mut output = D::new_empty(); |
| Decoder::decode_into(body_bytes, handles, &mut output)?; |
| Ok(output) |
| } |
| |
| fn decode_transaction_body_fut<D: Decodable>(buf: zx::MessageBuf) -> Ready<Result<D, Error>> { |
| future::ready(decode_transaction_body(buf)) |
| } |
| |
| /// A FIDL client which can be used to send buffers and receive responses via a channel. |
| #[derive(Debug, Clone)] |
| pub struct Client { |
| inner: Arc<ClientInner>, |
| } |
| |
| /// A future representing the raw response to a FIDL query. |
| pub type RawQueryResponseFut = Either<Ready<Result<zx::MessageBuf, Error>>, MessageResponse>; |
| |
| /// A future representing the decoded response to a FIDL query. |
| pub type QueryResponseFut<D> = AndThen< |
| RawQueryResponseFut, |
| Ready<Result<D, Error>>, |
| fn(zx::MessageBuf) -> Ready<Result<D, Error>>, |
| >; |
| |
| /// A FIDL transaction id. Will not be zero for a message that includes a response. |
| #[derive(Debug, Copy, Clone, PartialEq, Eq)] |
| pub struct Txid(u32); |
| /// A message interest id. |
| #[derive(Debug, Copy, Clone, PartialEq, Eq)] |
| struct InterestId(usize); |
| |
| impl InterestId { |
| fn from_txid(txid: Txid) -> Self { |
| InterestId(txid.0 as usize - 1) |
| } |
| |
| fn as_raw_id(&self) -> usize { |
| self.0 |
| } |
| } |
| |
| impl Txid { |
| fn from_interest_id(int_id: InterestId) -> Self { |
| Txid((int_id.0 + 1) as u32) |
| } |
| |
| fn as_raw_id(&self) -> u32 { |
| self.0 |
| } |
| } |
| |
| impl Client { |
| /// Create a new client. |
| /// |
| /// `channel` is the asynchronous channel over which data is sent and received. |
| /// `event_ordinals` are the ordinals on which events will be received. |
| pub fn new(channel: fasync::Channel) -> Client { |
| Client { |
| inner: Arc::new(ClientInner { |
| channel: channel, |
| message_interests: Mutex::new(Slab::<MessageInterest>::new()), |
| event_channel: Mutex::<EventChannel>::default(), |
| }), |
| } |
| } |
| |
| /// Attempt to convert the `Client` back into a channel. |
| /// |
| /// This will only succeed if there are no active clones of this `Client` |
| /// and no currently-alive `EventReceiver` or `MessageResponse`s that |
| /// came from this `Client`. |
| pub fn into_channel(self) -> Result<fasync::Channel, Self> { |
| match Arc::try_unwrap(self.inner) { |
| Ok(ClientInner { channel, .. }) => Ok(channel), |
| Err(inner) => Err(Self { inner }), |
| } |
| } |
| |
| /// Retrieve the stream of event messages for the `Client`. |
| /// Panics if the stream was already taken. |
| pub fn take_event_receiver(&self) -> EventReceiver { |
| { |
| let mut lock = self.inner.event_channel.lock(); |
| |
| if let EventListener::None = lock.listener { |
| lock.listener = EventListener::WillPoll; |
| } else { |
| panic!("Event stream was already taken"); |
| } |
| } |
| |
| EventReceiver { inner: Some(self.inner.clone()) } |
| } |
| |
| /// Send an encodable message without expecting a response. |
| pub fn send<T: Encodable>(&self, body: &mut T, ordinal: u32) -> Result<(), Error> { |
| let msg = &mut TransactionMessage { |
| header: TransactionHeader { tx_id: 0, flags: 0, ordinal }, |
| body, |
| }; |
| crate::encoding::with_tls_encoded(msg, |bytes, handles| { |
| self.send_raw_msg(&**bytes, handles) |
| }) |
| } |
| |
| /// Send an encodable query and receive a decodable response. |
| pub fn send_query<E: Encodable, D: Decodable>( |
| &self, |
| msg: &mut E, |
| ordinal: u32, |
| ) -> QueryResponseFut<D> { |
| let res_fut = self.send_raw_query(|tx_id, bytes, handles| { |
| let msg = &mut TransactionMessage { |
| header: TransactionHeader { tx_id: tx_id.as_raw_id(), flags: 0, ordinal }, |
| body: msg, |
| }; |
| Encoder::encode(bytes, handles, msg)?; |
| Ok(()) |
| }); |
| |
| res_fut.and_then(decode_transaction_body_fut::<D>) |
| } |
| |
| /// Send a raw message without expecting a response. |
| pub fn send_raw_msg(&self, buf: &[u8], handles: &mut Vec<zx::Handle>) -> Result<(), Error> { |
| Ok(self.inner.channel.write(buf, handles).map_err(Error::ClientWrite)?) |
| } |
| |
| /// Send a raw query and receive a response future. |
| pub fn send_raw_query<F>(&self, msg_from_id: F) -> RawQueryResponseFut |
| where |
| F: for<'a, 'b> FnOnce(Txid, &'a mut Vec<u8>, &'b mut Vec<zx::Handle>) -> Result<(), Error>, |
| { |
| let id = self.inner.register_msg_interest(); |
| let res = crate::encoding::with_tls_coding_bufs(|bytes, handles| { |
| msg_from_id(Txid::from_interest_id(id), bytes, handles)?; |
| self.inner.channel.write(bytes, handles).map_err(Error::ClientWrite)?; |
| Ok::<(), Error>(()) |
| }); |
| |
| match res { |
| Ok(()) => { |
| MessageResponse { id: Txid::from_interest_id(id), client: Some(self.inner.clone()) } |
| .right_future() |
| } |
| Err(e) => futures::future::ready(Err(e)).left_future(), |
| } |
| } |
| } |
| |
| #[must_use] |
| /// A future which polls for the response to a client message. |
| #[derive(Debug)] |
| pub struct MessageResponse { |
| id: Txid, |
| // `None` if the message response has been recieved |
| client: Option<Arc<ClientInner>>, |
| } |
| |
| impl Unpin for MessageResponse {} |
| |
| impl Future for MessageResponse { |
| type Output = Result<zx::MessageBuf, Error>; |
| fn poll(mut self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> { |
| let this = &mut *self; |
| let res; |
| { |
| let client = this.client.as_ref().ok_or(Error::PollAfterCompletion)?; |
| res = client.poll_recv_msg_response(this.id, lw); |
| } |
| |
| // Drop the client reference if the response has been received |
| if let Poll::Ready(Ok(_)) = res { |
| let client = this.client.take().expect("MessageResponse polled after completion"); |
| client.wake_any(); |
| } |
| |
| res |
| } |
| } |
| |
| impl Drop for MessageResponse { |
| fn drop(&mut self) { |
| if let Some(client) = &self.client { |
| client.deregister_msg_interest(InterestId::from_txid(self.id)); |
| client.wake_any(); |
| } |
| } |
| } |
| |
| /// An enum reprenting either a resolved message interest or a task on which to alert |
| /// that a response message has arrived. |
| #[derive(Debug)] |
| enum MessageInterest { |
| /// A new `MessageInterest` |
| WillPoll, |
| /// A task is waiting to receive a response, and can be awoken with `Waker`. |
| Waiting(Waker), |
| /// A message has been received, and a task will poll to receive it. |
| Received(zx::MessageBuf), |
| /// A message has not been received, but the person interested in the response |
| /// no longer cares about it, so the message should be discared upon arrival. |
| Discard, |
| } |
| |
| impl MessageInterest { |
| /// Check if a message has been received. |
| fn is_received(&self) -> bool { |
| if let MessageInterest::Received(_) = *self { |
| true |
| } else { |
| false |
| } |
| } |
| |
| fn unwrap_received(self) -> zx::MessageBuf { |
| if let MessageInterest::Received(buf) = self { |
| buf |
| } else { |
| panic!("EXPECTED received message") |
| } |
| } |
| } |
| |
| /// A stream of events as `MessageBuf`s. |
| #[derive(Debug)] |
| pub struct EventReceiver { |
| inner: Option<Arc<ClientInner>>, |
| } |
| |
| impl Unpin for EventReceiver {} |
| |
| impl FusedStream for EventReceiver { |
| fn is_terminated(&self) -> bool { |
| self.inner.is_none() |
| } |
| } |
| |
| impl Stream for EventReceiver { |
| type Item = Result<zx::MessageBuf, Error>; |
| |
| fn poll_next(mut self: Pin<&mut Self>, lw: &Waker) -> Poll<Option<Self::Item>> { |
| let inner = self.inner.as_ref().expect("polled EventReceiver after `None`"); |
| Poll::Ready(match ready!(inner.poll_recv_event(lw)) { |
| Ok(x) => Some(Ok(x)), |
| Err(Error::ClientRead(zx::Status::PEER_CLOSED)) => { |
| self.inner = None; |
| None |
| } |
| Err(e) => Some(Err(e)), |
| }) |
| } |
| } |
| |
| impl Drop for EventReceiver { |
| fn drop(&mut self) { |
| if let Some(inner) = &self.inner { |
| inner.event_channel.lock().listener = EventListener::None; |
| inner.wake_any(); |
| } |
| } |
| } |
| |
| #[derive(Debug, Default)] |
| struct EventChannel { |
| listener: EventListener, |
| queue: VecDeque<zx::MessageBuf>, |
| } |
| |
| #[derive(Debug)] |
| enum EventListener { |
| /// No one is listening for the event |
| None, |
| /// Someone is listening for the event but has not yet polled |
| WillPoll, |
| /// Someone is listening for the event and can be woken via the `Waker` |
| Some(Waker), |
| } |
| |
| impl Default for EventListener { |
| fn default() -> Self { |
| EventListener::None |
| } |
| } |
| |
| /// A shared client channel which tracks EXPECTED and received responses |
| #[derive(Debug)] |
| struct ClientInner { |
| channel: fasync::Channel, |
| |
| /// A map of message interests to either `None` (no message received yet) |
| /// or `Some(DecodeBuf)` when a message has been received. |
| /// An interest is registered with `register_msg_interest` and deregistered |
| /// by either receiving a message via a call to `poll_recv` or manually |
| /// deregistering with `deregister_msg_interest` |
| message_interests: Mutex<Slab<MessageInterest>>, |
| |
| /// A queue of received events and a waker for the task to receive them. |
| event_channel: Mutex<EventChannel>, |
| } |
| |
| impl Deref for Client { |
| type Target = fasync::Channel; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.inner.channel |
| } |
| } |
| |
| impl ClientInner { |
| /// Registers interest in a response message. |
| /// |
| /// This function returns a `usize` ID which should be used to send a message |
| /// via the channel. Responses are then received using `poll_recv`. |
| fn register_msg_interest(&self) -> InterestId { |
| // TODO(cramertj) use `try_from` here and assert that the conversion from |
| // `usize` to `u32` hasn't overflowed. |
| InterestId(self.message_interests.lock().insert(MessageInterest::WillPoll)) |
| } |
| |
| fn poll_recv_event(&self, lw: &Waker) -> Poll<Result<zx::MessageBuf, Error>> { |
| { |
| // Update the EventListener with the latest waker, remove any stale WillPoll state |
| let mut lock = self.event_channel.lock(); |
| lock.listener = EventListener::Some(lw.clone()); |
| } |
| |
| let is_closed = self.recv_all()?; |
| |
| let mut lock = self.event_channel.lock(); |
| |
| if let Some(msg_buf) = lock.queue.pop_front() { |
| Poll::Ready(Ok(msg_buf)) |
| } else { |
| if is_closed { |
| Poll::Ready(Err(Error::ClientRead(zx::Status::PEER_CLOSED))) |
| } else { |
| Poll::Pending |
| } |
| } |
| } |
| |
| fn poll_recv_msg_response( |
| &self, |
| txid: Txid, |
| lw: &Waker, |
| ) -> Poll<Result<zx::MessageBuf, Error>> { |
| let interest_id = InterestId::from_txid(txid); |
| { |
| // If have not yet received anything, update any stale WillPoll or Waiting(stale waker) |
| // message interest for the current message. |
| let mut message_interests = self.message_interests.lock(); |
| let message_interest = message_interests |
| .get_mut(interest_id.as_raw_id()) |
| .expect("Polled unregistered interest"); |
| if !message_interest.is_received() { |
| *message_interest = MessageInterest::Waiting(lw.clone()); |
| } |
| } |
| |
| let is_closed = self.recv_all()?; |
| |
| let mut message_interests = self.message_interests.lock(); |
| if message_interests |
| .get(interest_id.as_raw_id()) |
| .expect("Polled unregistered interest") |
| .is_received() |
| { |
| // If, by happy accident, we just raced to getting the result, |
| // then yay! Return success. |
| let buf = message_interests.remove(interest_id.as_raw_id()).unwrap_received(); |
| Poll::Ready(Ok(buf)) |
| } else { |
| if is_closed { |
| Poll::Ready(Err(Error::ClientRead(zx::Status::PEER_CLOSED))) |
| } else { |
| Poll::Pending |
| } |
| } |
| } |
| |
| /// Poll for the receipt of any response message or an event. |
| /// |
| /// Returns whether or not the channel is closed. |
| fn recv_all(&self) -> Result<bool, Error> { |
| // TODO(cramertj) return errors if one has occured _ever_ in recv_all, not just if |
| // one happens on this call. |
| loop { |
| let waker = match self.get_pending_waker() { |
| Some(v) => v, |
| None => return Ok(false), |
| }; |
| |
| let mut buf = zx::MessageBuf::new(); |
| match self.channel.recv_from(&mut buf, &waker) { |
| Poll::Ready(Ok(())) => {} |
| Poll::Ready(Err(zx::Status::PEER_CLOSED)) => return Ok(true), |
| Poll::Ready(Err(e)) => return Err(Error::ClientRead(e)), |
| Poll::Pending => return Ok(false), |
| } |
| |
| let (header, _) = |
| decode_transaction_header(buf.bytes()).map_err(|_| Error::InvalidHeader)?; |
| if header.tx_id == 0 { |
| // received an event |
| let mut lock = self.event_channel.lock(); |
| lock.queue.push_back(buf); |
| if let EventListener::Some(_) = lock.listener { |
| if let EventListener::Some(waker) = |
| mem::replace(&mut lock.listener, EventListener::WillPoll) |
| { |
| waker.wake() |
| } |
| } |
| } else { |
| // received a message response |
| let recvd_interest_id = InterestId::from_txid(Txid(header.tx_id)); |
| |
| // Look for a message interest with the given ID. |
| // If one is found, store the message so that it can be picked up later. |
| let mut message_interests = self.message_interests.lock(); |
| let raw_recvd_interest_id = recvd_interest_id.as_raw_id(); |
| if let Some(&MessageInterest::Discard) = |
| message_interests.get(raw_recvd_interest_id) |
| { |
| message_interests.remove(raw_recvd_interest_id); |
| } else if let Some(entry) = message_interests.get_mut(raw_recvd_interest_id) { |
| let old_entry = mem::replace(entry, MessageInterest::Received(buf)); |
| if let MessageInterest::Waiting(waker) = old_entry { |
| // Wake up the task to let them know a message has arrived. |
| waker.wake(); |
| } |
| } |
| } |
| } |
| } |
| |
| fn deregister_msg_interest(&self, InterestId(id): InterestId) { |
| let mut lock = self.message_interests.lock(); |
| if lock[id].is_received() { |
| lock.remove(id); |
| } else { |
| lock[id] = MessageInterest::Discard; |
| } |
| } |
| |
| /// Gets an arbitrary task that has polled on this channel and has not yet |
| /// gotten a response. |
| fn get_pending_waker(&self) -> Option<Waker> { |
| { |
| let lock = self.message_interests.lock(); |
| for (_, message_interest) in lock.iter() { |
| if let MessageInterest::Waiting(waker) = message_interest { |
| return Some(waker.clone()); |
| } |
| } |
| } |
| { |
| let lock = self.event_channel.lock(); |
| if let EventListener::Some(waker) = &lock.listener { |
| return Some(waker.clone()); |
| } |
| } |
| None |
| } |
| |
| /// Wakes up an arbitrary task that has begun polling on the channel so that |
| /// it will call recv_all. |
| fn wake_any(&self) { |
| if let Some(waker) = self.get_pending_waker() { |
| waker.wake(); |
| } |
| } |
| } |
| |
| pub mod sync { |
| //! Synchronous FIDL Client |
| use super::*; |
| |
| /// A synchronous client for making FIDL calls. |
| #[derive(Debug)] |
| pub struct Client { |
| // Underlying channel |
| channel: zx::Channel, |
| |
| // Reusable buffer for r/w |
| buf: zx::MessageBuf, |
| } |
| |
| // TODO: remove this and allow multiple overlapping queries on the same channel. |
| pub(super) const QUERY_TX_ID: u32 = 42; |
| |
| impl Client { |
| /// Create a new synchronous FIDL client. |
| pub fn new(channel: zx::Channel) -> Self { |
| Client { channel, buf: zx::MessageBuf::new() } |
| } |
| |
| /// Get the underlying channel out of the client. |
| pub fn into_channel(self) -> zx::Channel { |
| self.channel |
| } |
| |
| /// Send a new message. |
| pub fn send<E: Encodable>(&mut self, msg: &mut E, ordinal: u32) -> Result<(), Error> { |
| self.buf.clear(); |
| let (buf, handles) = self.buf.split_mut(); |
| let msg = &mut TransactionMessage { |
| header: TransactionHeader { tx_id: 0, flags: 0, ordinal }, |
| body: msg, |
| }; |
| Encoder::encode(buf, handles, msg)?; |
| self.channel.write(buf, handles).map_err(Error::ClientWrite)?; |
| Ok(()) |
| } |
| |
| /// Send a new message expecting a response. |
| pub fn send_query<E: Encodable, D: Decodable>( |
| &mut self, |
| msg: &mut E, |
| ordinal: u32, |
| deadline: zx::Time, |
| ) -> Result<D, Error> { |
| // Write the message into the channel |
| self.buf.clear(); |
| let (buf, handles) = self.buf.split_mut(); |
| let msg = &mut TransactionMessage { |
| header: TransactionHeader { tx_id: QUERY_TX_ID, flags: 0, ordinal }, |
| body: msg, |
| }; |
| Encoder::encode(buf, handles, msg)?; |
| self.channel.write(buf, handles).map_err(Error::ClientWrite)?; |
| |
| // Read the response |
| self.buf.clear(); |
| match self.channel.read(&mut self.buf) { |
| Ok(()) => {} |
| Err(zx::Status::SHOULD_WAIT) => { |
| let signals = self |
| .channel |
| .wait_handle( |
| zx::Signals::CHANNEL_READABLE | zx::Signals::CHANNEL_PEER_CLOSED, |
| deadline, |
| ) |
| .map_err(Error::ClientRead)?; |
| if !signals.contains(zx::Signals::CHANNEL_READABLE) { |
| debug_assert!(signals.contains(zx::Signals::CHANNEL_PEER_CLOSED)); |
| return Err(Error::ClientRead(zx::Status::PEER_CLOSED)); |
| } |
| self.channel.read(&mut self.buf).map_err(Error::ClientRead)?; |
| } |
| Err(e) => return Err(Error::ClientRead(e)), |
| } |
| let (buf, handles) = self.buf.split_mut(); |
| let (header, body_bytes) = decode_transaction_header(buf)?; |
| if header.tx_id != QUERY_TX_ID || header.ordinal != ordinal { |
| return Err(Error::UnexpectedSyncResponse); |
| } |
| let mut output = D::new_empty(); |
| Decoder::decode_into(body_bytes, handles, &mut output)?; |
| Ok(output) |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use { |
| crate::wake_counter, |
| failure::{Error, ResultExt}, |
| fuchsia_async::TimeoutExt, |
| fuchsia_zircon::DurationNum, |
| futures::{FutureExt, StreamExt}, |
| std::{io, thread}, |
| }; |
| |
| #[rustfmt::skip] |
| const SEND_EXPECTED: &[u8] = &[ |
| 0, 0, 0, 0, 0, 0, 0, 0, // 32 bit tx_id followed by 32 bits of padding |
| 0, 0, 0, 0, // 32 bits for flags |
| SEND_ORDINAL_HIGH_BYTE, 0, 0, 0, // 32 bit ordinal |
| SEND_DATA, // 8 bit data |
| 0, 0, 0, 0, 0, 0, 0, // 7 bytes of padding after our 1 byte of data |
| ]; |
| const SEND_ORDINAL_HIGH_BYTE: u8 = 42; |
| const SEND_ORDINAL: u32 = 42; |
| const SEND_DATA: u8 = 55; |
| |
| fn send_transaction(header: TransactionHeader, channel: &zx::Channel) { |
| let (bytes, handles) = (&mut vec![], &mut vec![]); |
| encode_transaction(header, bytes, handles); |
| channel.write(bytes, handles).expect("Server channel write failed"); |
| } |
| |
| fn encode_transaction( |
| header: TransactionHeader, |
| bytes: &mut Vec<u8>, |
| handles: &mut Vec<zx::Handle>, |
| ) { |
| let event = &mut TransactionMessage { header: header, body: &mut SEND_DATA }; |
| Encoder::encode(bytes, handles, event).expect("Encoding failure"); |
| } |
| |
| #[test] |
| fn sync_client() -> Result<(), Error> { |
| let (client_end, server_end) = zx::Channel::create().context("chan create")?; |
| let mut client = sync::Client::new(client_end); |
| client.send(&mut SEND_DATA, SEND_ORDINAL).context("sending")?; |
| let mut received = zx::MessageBuf::new(); |
| server_end.read(&mut received).context("reading")?; |
| assert_eq!(SEND_EXPECTED, received.bytes()); |
| Ok(()) |
| } |
| |
| #[test] |
| fn sync_client_with_response() -> Result<(), Error> { |
| let (client_end, server_end) = zx::Channel::create().context("chan create")?; |
| let mut client = sync::Client::new(client_end); |
| thread::spawn(move || { |
| // Server |
| let mut received = zx::MessageBuf::new(); |
| server_end |
| .wait_handle(zx::Signals::CHANNEL_READABLE, 5.seconds().after_now()) |
| .expect("failed to wait for channel readable"); |
| server_end.read(&mut received).expect("failed to read on server end"); |
| let (buf, _handles) = received.split_mut(); |
| let (header, _body_bytes) = decode_transaction_header(buf).expect("server decode"); |
| assert_eq!(header.tx_id, sync::QUERY_TX_ID); |
| assert_eq!(header.ordinal, SEND_ORDINAL); |
| send_transaction( |
| TransactionHeader { tx_id: header.tx_id, flags: 0, ordinal: header.ordinal }, |
| &server_end, |
| ); |
| }); |
| let response_data = client |
| .send_query::<u8, u8>(&mut SEND_DATA, SEND_ORDINAL, 5.seconds().after_now()) |
| .context("sending query")?; |
| assert_eq!(SEND_DATA, response_data); |
| Ok(()) |
| } |
| |
| #[test] |
| fn client() { |
| let mut executor = fasync::Executor::new().unwrap(); |
| let (client_end, server_end) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| |
| let server = fasync::Channel::from_channel(server_end).unwrap(); |
| let receiver = async move { |
| let mut buffer = zx::MessageBuf::new(); |
| await!(server.recv_msg(&mut buffer)).expect("failed to recv msg"); |
| assert_eq!(SEND_EXPECTED, buffer.bytes()); |
| }; |
| |
| // add a timeout to receiver so if test is broken it doesn't take forever |
| let receiver = receiver |
| .on_timeout(300.millis().after_now(), || panic!("did not receive message in time!")); |
| |
| let sender = fasync::Timer::new(100.millis().after_now()).map(|()| { |
| client.send(&mut SEND_DATA, SEND_ORDINAL).expect("failed to send msg"); |
| }); |
| |
| executor.run_singlethreaded(receiver.join(sender)); |
| } |
| |
| #[test] |
| fn client_with_response() { |
| const EXPECTED: &[u8] = &[ |
| 1, 0, 0, 0, 0, 0, 0, 0, // 32 bit tx_id followed by 32 bits of padding |
| 0, 0, 0, 0, // 32 bits for flags |
| 42, 0, 0, 0, // 32 bit ordinal |
| 55, // 8 bit data |
| 0, 0, 0, 0, 0, 0, 0, // 7 bytes of padding after our 1 byte of data |
| ]; |
| |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| let (client_end, server_end) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| |
| let server = fasync::Channel::from_channel(server_end).unwrap(); |
| let mut buffer = zx::MessageBuf::new(); |
| let receiver = async move { |
| await!(server.recv_msg(&mut buffer)).expect("failed to recv msg"); |
| assert_eq!(EXPECTED, buffer.bytes()); |
| let id = 1; // internally, the first slot in a slab returns a `0`. We then add one |
| // since FIDL txids start with `1`. |
| |
| let (bytes, handles) = (&mut vec![], &mut vec![]); |
| let header = TransactionHeader { tx_id: id, flags: 0, ordinal: 42 }; |
| encode_transaction(header, bytes, handles); |
| server.write(bytes, handles).expect("Server channel write failed"); |
| }; |
| |
| // add a timeout to receiver so if test is broken it doesn't take forever |
| let receiver = receiver |
| .on_timeout(300.millis().after_now(), || panic!("did not receiver message in time!")); |
| |
| let sender = client |
| .send_query::<u8, u8>(&mut 55, 42) |
| .map_ok(|x| assert_eq!(x, 55)) |
| .map_err(|e| io::Error::new(io::ErrorKind::Other, &*format!("fidl error: {:?}", e))); |
| |
| // add a timeout to receiver so if test is broken it doesn't take forever |
| let sender = sender |
| .on_timeout(300.millis().after_now(), || panic!("did not receive response in time!")); |
| |
| executor.run_singlethreaded(receiver.join(sender)); |
| } |
| |
| #[test] |
| #[should_panic] |
| fn event_cant_be_taken_twice() { |
| let _exec = fasync::Executor::new().unwrap(); |
| let (client_end, _) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| let _foo = client.take_event_receiver(); |
| client.take_event_receiver(); |
| } |
| |
| #[test] |
| fn event_can_be_taken() { |
| let _exec = fasync::Executor::new().unwrap(); |
| let (client_end, _) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| client.take_event_receiver(); |
| } |
| |
| #[test] |
| fn event_received() { |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| let (client_end, server_end) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| |
| // Send the event from the server |
| let server = fasync::Channel::from_channel(server_end).unwrap(); |
| let (bytes, handles) = (&mut vec![], &mut vec![]); |
| let header = TransactionHeader { tx_id: 0, flags: 0, ordinal: 5 }; |
| encode_transaction(header, bytes, handles); |
| server.write(bytes, handles).expect("Server channel write failed"); |
| drop(server); |
| |
| let recv = client |
| .take_event_receiver() |
| .into_future() |
| .then(|(x, stream)| { |
| let x = x.expect("should contain one element"); |
| let x = x.expect("fidl error"); |
| let x: i32 = decode_transaction_body(x).expect("failed to decode event"); |
| assert_eq!(x, 55); |
| stream.into_future() |
| }) |
| .map(|(x, _stream)| assert!(x.is_none(), "should have emptied")); |
| |
| // add a timeout to receiver so if test is broken it doesn't take forever |
| let recv = |
| recv.on_timeout(300.millis().after_now(), || panic!("did not receive event in time!")); |
| |
| executor.run_singlethreaded(recv); |
| } |
| |
| #[test] |
| fn client_always_wakes_pending_futures() { |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| let (client_end, server_end) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| |
| let mut event_receiver = client.take_event_receiver(); |
| |
| // first poll on a response |
| let (response_waker, response_waker_count) = wake_counter::new_count_waker(); |
| let mut response_txid = Txid(0); |
| let mut response_future = client.send_raw_query(|tx_id, bytes, handles| { |
| response_txid = tx_id; |
| let header = |
| TransactionHeader { tx_id: response_txid.as_raw_id(), flags: 0, ordinal: 42 }; |
| encode_transaction(header, bytes, handles); |
| Ok(()) |
| }); |
| assert!(response_future.poll_unpin(&response_waker).is_pending()); |
| |
| // then, poll on an event |
| let (event_waker, event_waker_count) = wake_counter::new_count_waker(); |
| assert!(event_receiver.poll_next_unpin(&event_waker).is_pending()); |
| |
| // at this point, nothing should have been woken |
| assert_eq!(response_waker_count.get(), 0); |
| assert_eq!(event_waker_count.get(), 0); |
| |
| // next, simulate an event coming in |
| send_transaction(TransactionHeader { tx_id: 0, flags: 0, ordinal: 5 }, &server_end); |
| |
| // get event loop to deliver readiness notifications to channels |
| let _ = executor.run_until_stalled(&mut future::empty::<()>()); |
| |
| // either response_waker or event_waker should be woken |
| assert_eq!(response_waker_count.get() + event_waker_count.get(), 1); |
| let last_response_waker_count = response_waker_count.get(); |
| |
| // we'll pretend event_waker was woken, and have that poll out the event |
| assert!(event_receiver.poll_next_unpin(&event_waker).is_ready()); |
| |
| // next, simulate a response coming in |
| send_transaction( |
| TransactionHeader { tx_id: response_txid.as_raw_id(), flags: 0, ordinal: 42 }, |
| &server_end, |
| ); |
| |
| // get event loop to deliver readiness notifications to channels |
| let _ = executor.run_until_stalled(&mut future::empty::<()>()); |
| |
| // response waker should have been woken again |
| assert_eq!(response_waker_count.get(), last_response_waker_count + 1); |
| } |
| |
| #[test] |
| fn client_allows_take_event_stream_even_if_event_delivered() { |
| let mut _executor = fasync::Executor::new().unwrap(); |
| |
| let (client_end, server_end) = zx::Channel::create().unwrap(); |
| let client_end = fasync::Channel::from_channel(client_end).unwrap(); |
| let client = Client::new(client_end); |
| |
| // first simulate an event coming in, even though nothing has polled |
| send_transaction(TransactionHeader { tx_id: 0, flags: 0, ordinal: 5 }, &server_end); |
| |
| // next, poll on a response |
| let (response_waker, _response_waker_count) = wake_counter::new_count_waker(); |
| let mut response_future = client.send_query::<u8, u8>(&mut 55, 42); |
| assert!(response_future.poll_unpin(&response_waker).is_pending()); |
| |
| // then, make sure we can still take the event receiver without panicking |
| let mut _event_receiver = client.take_event_receiver(); |
| } |
| } |