| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use fuchsia_zircon as zx; |
| use itertools::Itertools; |
| use std::collections::hash_map::Entry; |
| use std::collections::{HashMap, VecDeque}; |
| use std::sync::{Arc, Weak}; |
| |
| use crate::arch::uapi::epoll_event; |
| use crate::fs::buffers::{InputBuffer, OutputBuffer}; |
| use crate::fs::*; |
| use crate::lock::RwLock; |
| use crate::logging::*; |
| use crate::task::*; |
| use crate::types::*; |
| |
| /// Maximum depth of epoll instances monitoring one another. |
| /// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html |
| const MAX_NESTED_DEPTH: u32 = 5; |
| |
| /// WaitObject represents a FileHandle that is being waited upon. |
| /// The `data` field is a user defined quantity passed in |
| /// via `sys_epoll_ctl`. Typically C programs could use this |
| /// to store a pointer to the data that needs to be processed |
| /// after an event. |
| struct WaitObject { |
| target: Weak<FileObject>, |
| events: FdEvents, |
| data: u64, |
| wait_canceler: Option<WaitCanceler>, |
| } |
| |
| impl WaitObject { |
| // TODO(fxbug.dev/64296) we should not report an error if the file was closed while it was |
| // registered for epoll(). Either the file needs to be removed from our lists when it is closed, |
| // we need to ignore/remove WaitObjects when the file is gone, or (more likely) both because of |
| // race conditions removing the file object. |
| fn target(&self) -> Result<FileHandle, Errno> { |
| self.target.upgrade().ok_or_else(|| errno!(EBADF)) |
| } |
| } |
| |
| /// EpollKey acts as an key to a map of WaitObject. |
| /// In reality it is a pointer to a FileHandle object. |
| type EpollKey = usize; |
| |
| fn as_epoll_key(file: &FileHandle) -> EpollKey { |
| Arc::as_ptr(file) as EpollKey |
| } |
| |
| /// ReadyObject represents an event on a waited upon object. |
| #[derive(Clone, Debug)] |
| struct ReadyObject { |
| key: EpollKey, |
| observed: FdEvents, |
| } |
| |
| /// EpollFileObject represents the FileObject used to |
| /// implement epoll_create1/epoll_ctl/epoll_pwait. |
| pub struct EpollFileObject { |
| waiter: Waiter, |
| /// Mutable state of this epoll object. |
| state: Arc<RwLock<EpollState>>, |
| } |
| |
| struct EpollState { |
| /// Any file tracked by this epoll instance |
| /// will exist as a key in `wait_objects`. |
| wait_objects: HashMap<EpollKey, WaitObject>, |
| /// trigger_list is a FIFO of events that have |
| /// happened, but have not yet been processed. |
| trigger_list: VecDeque<ReadyObject>, |
| /// rearm_list is the list of event that need to |
| /// be waited upon prior to actually waiting in |
| /// EpollFileObject::wait. They cannot be re-armed |
| /// before that, because, if the client process has |
| /// not cleared the wait condition, they would just |
| /// be immediately triggered. |
| rearm_list: Vec<ReadyObject>, |
| /// A list of waiters waiting for events from this |
| /// epoll instance. |
| waiters: WaitQueue, |
| } |
| |
| impl EpollFileObject { |
| /// Allocate a new, empty epoll object. |
| pub fn new_file(current_task: &CurrentTask) -> FileHandle { |
| Anon::new_file( |
| current_task, |
| Box::new(EpollFileObject { |
| waiter: Waiter::new(), |
| state: Arc::new(RwLock::new(EpollState { |
| wait_objects: HashMap::default(), |
| trigger_list: VecDeque::new(), |
| rearm_list: Vec::new(), |
| waiters: WaitQueue::default(), |
| })), |
| }), |
| OpenFlags::RDWR, |
| ) |
| } |
| |
| fn new_wait_handler(&self, key: EpollKey) -> EventHandler { |
| let state = self.state.clone(); |
| Box::new(move |observed: FdEvents| { |
| state.write().trigger_list.push_back(ReadyObject { key, observed }) |
| }) |
| } |
| |
| fn wait_on_file( |
| &self, |
| current_task: &CurrentTask, |
| key: EpollKey, |
| wait_object: &mut WaitObject, |
| ) -> Result<(), Errno> { |
| let target = wait_object.target()?; |
| |
| // First start the wait. If an event happens after this, we'll get it. |
| self.wait_on_file_edge_triggered(current_task, key, wait_object)?; |
| |
| // Now check the events. If an event happened before this, we'll detect it here. There's |
| // now no race window where an event would be missed. |
| // |
| // That said, if an event happens between the wait and the query_events, we'll get two |
| // notifications. We handle this by deduping on the epoll_wait end. |
| let events = target.query_events(current_task); |
| if !(events & wait_object.events).is_empty() { |
| self.waiter.wake_immediately(events.bits(), self.new_wait_handler(key)); |
| wait_object |
| .wait_canceler |
| .as_ref() |
| .expect("canceler must have been set by `wait_on_file_edge_triggered`") |
| .cancel(); |
| } |
| Ok(()) |
| } |
| |
| fn wait_on_file_edge_triggered( |
| &self, |
| current_task: &CurrentTask, |
| key: EpollKey, |
| wait_object: &mut WaitObject, |
| ) -> Result<(), Errno> { |
| wait_object.wait_canceler = wait_object.target()?.wait_async( |
| current_task, |
| &self.waiter, |
| wait_object.events, |
| self.new_wait_handler(key), |
| ); |
| if wait_object.wait_canceler.is_none() { |
| return error!(EPERM); |
| } |
| Ok(()) |
| } |
| |
| /// Checks if this EpollFileObject monitors the `epoll_file_object` at `epoll_file_handle`. |
| #[allow(clippy::only_used_in_recursion)] |
| fn monitors( |
| &self, |
| epoll_file_handle: &FileHandle, |
| epoll_file_object: &EpollFileObject, |
| depth_left: u32, |
| ) -> Result<bool, Errno> { |
| if depth_left == 0 { |
| return Ok(true); |
| } |
| |
| let state = self.state.read(); |
| for nested_object in state.wait_objects.values() { |
| match nested_object.target()?.downcast_file::<EpollFileObject>() { |
| None => continue, |
| Some(target) => { |
| if target.monitors(epoll_file_handle, epoll_file_object, depth_left - 1)? |
| || Arc::ptr_eq(&nested_object.target()?, epoll_file_handle) |
| { |
| return Ok(true); |
| } |
| } |
| } |
| } |
| |
| Ok(false) |
| } |
| |
| /// Asynchronously wait on certain events happening on a FileHandle. |
| pub fn add( |
| &self, |
| current_task: &CurrentTask, |
| file: &FileHandle, |
| epoll_file_handle: &FileHandle, |
| mut epoll_event: epoll_event, |
| ) -> Result<(), Errno> { |
| epoll_event.events |= FdEvents::POLLHUP.bits(); |
| epoll_event.events |= FdEvents::POLLERR.bits(); |
| |
| // Check if adding this file would cause a cycle at a max depth of 5. |
| if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() { |
| if epoll_to_add.monitors(epoll_file_handle, self, MAX_NESTED_DEPTH)? { |
| return error!(ELOOP); |
| } |
| } |
| |
| let mut state = self.state.write(); |
| let key = as_epoll_key(file); |
| match state.wait_objects.entry(key) { |
| Entry::Occupied(_) => error!(EEXIST), |
| Entry::Vacant(entry) => { |
| let wait_object = entry.insert(WaitObject { |
| target: Arc::downgrade(file), |
| events: FdEvents::from_bits_truncate(epoll_event.events), |
| data: epoll_event.data, |
| wait_canceler: None, |
| }); |
| self.wait_on_file(current_task, key, wait_object) |
| } |
| } |
| } |
| |
| /// Modify the events we are looking for on a Filehandle. |
| pub fn modify( |
| &self, |
| current_task: &CurrentTask, |
| file: &FileHandle, |
| mut epoll_event: epoll_event, |
| ) -> Result<(), Errno> { |
| epoll_event.events |= FdEvents::POLLHUP.bits(); |
| epoll_event.events |= FdEvents::POLLERR.bits(); |
| |
| let mut state = self.state.write(); |
| let key = as_epoll_key(file); |
| state.rearm_list.retain(|x| x.key != key); |
| match state.wait_objects.entry(key) { |
| Entry::Occupied(mut entry) => { |
| let wait_object = entry.get_mut(); |
| if let Some(wait_canceler) = wait_object.wait_canceler.take() { |
| wait_canceler.cancel(); |
| } |
| wait_object.events = FdEvents::from_bits_truncate(epoll_event.events); |
| self.wait_on_file(current_task, key, wait_object) |
| } |
| Entry::Vacant(_) => error!(ENOENT), |
| } |
| } |
| |
| /// Cancel an asynchronous wait on an object. Events triggered before |
| /// calling this will still be delivered. |
| pub fn delete(&self, file: &FileHandle) -> Result<(), Errno> { |
| let mut state = self.state.write(); |
| let key = as_epoll_key(file); |
| if let Some(mut wait_object) = state.wait_objects.remove(&key) { |
| if let Some(wait_canceler) = wait_object.wait_canceler.take() { |
| wait_canceler.cancel(); |
| } |
| state.rearm_list.retain(|x| x.key != key); |
| Ok(()) |
| } else { |
| error!(ENOENT) |
| } |
| } |
| |
| /// Stores events from the Epoll's trigger list to the parameter `pending_list`. This does not |
| /// actually invoke the waiter which is how items are added to the trigger list. The caller |
| /// will have to do that before calling if needed. |
| /// |
| /// If an event in the trigger list is stale, the event will be re-added to the waiter. |
| /// |
| /// Returns true if any events were added. False means there was nothing in the trigger list. |
| fn process_triggered_events( |
| &self, |
| current_task: &CurrentTask, |
| pending_list: &mut Vec<ReadyObject>, |
| max_events: usize, |
| ) { |
| let mut state = self.state.write(); |
| while pending_list.len() < max_events && !state.trigger_list.is_empty() { |
| if let Some(pending) = state.trigger_list.pop_front() { |
| if let Some(wait) = state.wait_objects.get_mut(&pending.key) { |
| // The weak pointer to the FileObject target can be gone if the file was closed |
| // out from under us. If this happens it is not an error: ignore it and |
| // continue. |
| if let Some(target) = wait.target.upgrade() { |
| let observed = target.query_events(current_task); |
| let ready = ReadyObject { key: pending.key, observed }; |
| if observed.intersects(wait.events) { |
| pending_list.push(ready); |
| } else { |
| // Another thread already handled this event, wait for another one. |
| // Files can be legitimately closed out from under us so bad file |
| // descriptors are not an error. We do not currently expect any other |
| // errors and it's not clear how to handle them, so warn for now. |
| match self.wait_on_file(current_task, pending.key, wait) { |
| Err(err) if err == EBADF => {} // File closed. |
| Err(err) => log_warn!("Unexpected wait result {:#?}", err), |
| _ => {} |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /// Waits until an event exists in `pending_list` or until `timeout` has |
| /// been reached. |
| fn wait_until_pending_event( |
| &self, |
| current_task: &CurrentTask, |
| pending_list: &mut Vec<ReadyObject>, |
| max_events: usize, |
| input_wait_deadline: zx::Time, |
| ) -> Result<(), Errno> { |
| // Avoid nonzero deadlines if there are already extracted events (see EINTR handling below). |
| debug_assert!(input_wait_deadline.into_nanos() == 0 || pending_list.is_empty()); |
| |
| let mut wait_deadline = input_wait_deadline; |
| |
| loop { |
| // The handlers in the waits cause items to be appended to trigger_list. See the closure |
| // in `wait_on_file` to see how this happens. |
| // |
| // This wait may return EINTR for nonzero timeouts which is not an error. We must be |
| // careful not to lose events if this happens. |
| // |
| // The first time through this loop we'll use the timeout passed into this function so |
| // can get EINTR. But since we haven't done anything or accumulated any results yet it's |
| // OK to immediately return and no information will be lost. |
| match self.waiter.wait_until(current_task, wait_deadline) { |
| Err(err) if err == ETIMEDOUT => break, |
| Err(err) if err == EINTR => { |
| // Terminating early will lose any events in the pending_list so that should |
| // only be for unrecoverable errors (not EINTR). The only time there should be a |
| // nonzero wait_deadline (and hence the ability to encounter EINTR) is when the |
| // pending list is empty. |
| debug_assert!( |
| pending_list.is_empty(), |
| "Got EINTR from wait of {}ns with {} items pending.", |
| wait_deadline.into_nanos(), |
| pending_list.len() |
| ); |
| return Err(err); |
| } |
| // TODO check if this is supposed to actually fail! |
| result => result?, |
| } |
| |
| self.process_triggered_events(current_task, pending_list, max_events); |
| |
| if pending_list.len() == max_events { |
| break; // No input events or output list full, nothing more we can do. |
| } |
| |
| if !pending_list.is_empty() { |
| // We now know we have at least one event to return. We shouldn't return |
| // immediately, in case there are more events available, but the next loop should |
| // wait with a 0 timeout to prevent further blocking. |
| wait_deadline = zx::Time::ZERO; |
| } |
| |
| // Loop back to check if there are more items in the Waiter's queue. Every wait_until() |
| // call will process a single event. In order to drain as many events as we can that |
| // are synchronously available, keep trying until it reports empty. |
| } |
| |
| Ok(()) |
| } |
| |
| /// Blocking wait on all waited upon events with a timeout. |
| pub fn wait( |
| &self, |
| current_task: &CurrentTask, |
| max_events: usize, |
| mut deadline: zx::Time, |
| ) -> Result<Vec<epoll_event>, Errno> { |
| // First we start waiting again on wait objects that have |
| // previously been triggered. |
| { |
| let mut state = self.state.write(); |
| let rearm_list = std::mem::take(&mut state.rearm_list); |
| for to_wait in rearm_list.iter() { |
| // TODO handle interrupts here |
| let w = state.wait_objects.get_mut(&to_wait.key).unwrap(); |
| self.wait_on_file(current_task, to_wait.key, w)?; |
| } |
| } |
| |
| // Process any events that are already available in the triggered queue. |
| // TODO(tbodt) fold this into the wait_until_pending_event loop |
| let mut pending_list = vec![]; |
| self.process_triggered_events(current_task, &mut pending_list, max_events); |
| if !pending_list.is_empty() { |
| // TODO(tbodt) delete this block |
| // If there are events synchronously available, don't actually wait for any more. |
| // We still need to call wait_until_pending_event() (this time with a 0 deadline) to |
| // process any events currently pending in the Waiter that haven't been added to our |
| // triggered queue yet. |
| deadline = zx::Time::ZERO; |
| } |
| |
| // Note: wait_until_pending_event() can be interrupted with EINTR. We must be careful not to |
| // lose state if that happens. The only state that can be lost are items already in the |
| // pending list, and the code above sets the deadline to 0 in that case which will remove |
| // the wait and avoid EINTR. |
| self.wait_until_pending_event(current_task, &mut pending_list, max_events, deadline)?; |
| |
| // Process the pending list and add processed ReadyObject |
| // entries to the rearm_list for the next wait. |
| let mut result = vec![]; |
| let mut state = self.state.write(); |
| for pending_event in pending_list.iter().unique_by(|e| e.key) { |
| // The wait could have been deleted by here, |
| // so ignore the None case. |
| if let Some(wait) = state.wait_objects.get_mut(&pending_event.key) { |
| let reported_events = pending_event.observed.bits() & wait.events.bits(); |
| result.push(epoll_event::new(reported_events, wait.data)); |
| |
| // Files marked with `EPOLLONESHOT` should only notify |
| // once and need to be rearmed manually with epoll_ctl_mod(). |
| if wait.events.bits() & EPOLLONESHOT != 0 { |
| continue; |
| } |
| if wait.events.bits() & EPOLLET != 0 { |
| // The file can be closed while registered for epoll which is not an error. |
| // We do not expect other errors from waiting. |
| match self.wait_on_file_edge_triggered(current_task, pending_event.key, wait) { |
| Err(err) if err == EBADF => {} // File closed, ignore. |
| Err(err) => log_warn!("Unexpected wait result {:#?}", err), |
| _ => {} |
| } |
| } else { |
| state.rearm_list.push(pending_event.clone()); |
| } |
| } |
| } |
| |
| // Notify waiters of unprocessed events. |
| if !state.trigger_list.is_empty() { |
| state.waiters.notify_events(FdEvents::POLLIN); |
| } |
| |
| Ok(result) |
| } |
| } |
| |
| impl FileOps for EpollFileObject { |
| fileops_impl_nonseekable!(); |
| |
| fn write( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| _data: &mut dyn InputBuffer, |
| ) -> Result<usize, Errno> { |
| error!(EINVAL) |
| } |
| |
| fn read( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| _data: &mut dyn OutputBuffer, |
| ) -> Result<usize, Errno> { |
| error!(EINVAL) |
| } |
| |
| fn wait_async( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| waiter: &Waiter, |
| events: FdEvents, |
| handler: EventHandler, |
| ) -> Option<WaitCanceler> { |
| Some(self.state.read().waiters.wait_async_mask(waiter, events.bits(), handler)) |
| } |
| |
| fn query_events(&self, _current_task: &CurrentTask) -> FdEvents { |
| let mut events = FdEvents::empty(); |
| if self.state.read().trigger_list.is_empty() { |
| events |= FdEvents::POLLIN; |
| } |
| events |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::fs::buffers::{VecInputBuffer, VecOutputBuffer}; |
| use crate::fs::fuchsia::create_fuchsia_pipe; |
| use crate::fs::pipe::new_pipe; |
| use crate::fs::socket::{SocketDomain, SocketType, UnixSocket}; |
| use crate::fs::FdEvents; |
| use fuchsia_zircon::HandleBased; |
| use std::sync::atomic::{AtomicU64, Ordering}; |
| use syncio::Zxio; |
| |
| use crate::testing::*; |
| |
| #[::fuchsia::test] |
| fn test_epoll_read_ready() { |
| static WRITE_COUNT: AtomicU64 = AtomicU64::new(0); |
| const EVENT_DATA: u64 = 42; |
| |
| let (kernel, _init_task) = create_kernel_and_task(); |
| let current_task = create_task(&kernel, "main-task"); |
| let writer_task = create_task(&kernel, "writer-task"); |
| |
| let (pipe_out, pipe_in) = new_pipe(¤t_task).unwrap(); |
| |
| let test_string = "hello starnix".to_string(); |
| let test_len = test_string.len(); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe_out, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| let test_string_copy = test_string.clone(); |
| let thread = std::thread::spawn(move || { |
| let bytes_written = pipe_in |
| .write(&writer_task, &mut VecInputBuffer::new(test_string_copy.as_bytes())) |
| .unwrap(); |
| assert_eq!(bytes_written, test_len); |
| WRITE_COUNT.fetch_add(bytes_written as u64, Ordering::Relaxed); |
| }); |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::INFINITE).unwrap(); |
| let _ = thread.join(); |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| |
| let mut buffer = VecOutputBuffer::new(test_len); |
| let bytes_read = pipe_out.read(¤t_task, &mut buffer).unwrap(); |
| assert_eq!(bytes_read as u64, WRITE_COUNT.load(Ordering::Relaxed)); |
| assert_eq!(bytes_read, test_len); |
| assert_eq!(buffer.data(), test_string.as_bytes()); |
| } |
| |
| #[::fuchsia::test] |
| fn test_epoll_ready_then_wait() { |
| const EVENT_DATA: u64 = 42; |
| |
| let (_kernel, current_task) = create_kernel_and_task(); |
| |
| let (pipe_out, pipe_in) = new_pipe(¤t_task).unwrap(); |
| |
| let test_string = "hello starnix".to_string(); |
| let test_bytes = test_string.as_bytes(); |
| let test_len = test_bytes.len(); |
| |
| assert_eq!( |
| pipe_in.write(¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(), |
| test_bytes.len() |
| ); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe_out, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::INFINITE).unwrap(); |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| |
| let mut buffer = VecOutputBuffer::new(test_len); |
| let bytes_read = pipe_out.read(¤t_task, &mut buffer).unwrap(); |
| assert_eq!(bytes_read, test_len); |
| assert_eq!(buffer.data(), test_bytes); |
| } |
| |
| #[::fuchsia::test] |
| fn test_epoll_ctl_cancel() { |
| for do_cancel in [true, false] { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let event = new_eventfd(¤t_task, 0, EventFdType::Counter, true); |
| let waiter = Waiter::new(); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &event, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| if do_cancel { |
| epoll_file.delete(&event).unwrap(); |
| } |
| |
| let callback_count = Arc::new(AtomicU64::new(0)); |
| let callback_count_clone = callback_count.clone(); |
| let handler = move |_observed: FdEvents| { |
| callback_count_clone.fetch_add(1, Ordering::Relaxed); |
| }; |
| let wait_canceler = event |
| .wait_async(¤t_task, &waiter, FdEvents::POLLIN, Box::new(handler)) |
| .expect("wait_async"); |
| if do_cancel { |
| wait_canceler.cancel(); |
| } |
| |
| let add_val = 1u64; |
| assert_eq!( |
| event |
| .write(¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes())) |
| .unwrap(), |
| std::mem::size_of::<u64>() |
| ); |
| |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap(); |
| |
| if do_cancel { |
| assert_eq!(0, events.len()); |
| } else { |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| } |
| } |
| } |
| |
| #[::fuchsia::test] |
| fn test_multiple_events() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let (client1, server1) = zx::Socket::create_stream(); |
| let (client2, server2) = zx::Socket::create_stream(); |
| let pipe1 = create_fuchsia_pipe(¤t_task, client1, OpenFlags::RDWR) |
| .expect("create_fuchsia_pipe"); |
| let pipe2 = create_fuchsia_pipe(¤t_task, client2, OpenFlags::RDWR) |
| .expect("create_fuchsia_pipe"); |
| let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create"); |
| let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create"); |
| |
| let poll = || { |
| let epoll_object = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe1, |
| &epoll_object, |
| epoll_event::new(FdEvents::POLLIN.bits(), 1), |
| ) |
| .expect("epoll_file.add"); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe2, |
| &epoll_object, |
| epoll_event::new(FdEvents::POLLIN.bits(), 2), |
| ) |
| .expect("epoll_file.add"); |
| epoll_file.wait(¤t_task, 2, zx::Time::ZERO).expect("wait") |
| }; |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| |
| assert_eq!(server1_zxio.write(&[0]).expect("write"), 1); |
| |
| let fds = poll(); |
| assert_eq!(fds.len(), 1); |
| assert_eq!(FdEvents::from_bits_truncate(fds[0].events), FdEvents::POLLIN); |
| let data = fds[0].data; |
| assert_eq!(data, 1); |
| assert_eq!(pipe1.read(¤t_task, &mut VecOutputBuffer::new(64)).expect("read"), 1); |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| |
| assert_eq!(server2_zxio.write(&[0]).expect("write"), 1); |
| |
| let fds = poll(); |
| assert_eq!(fds.len(), 1); |
| assert_eq!(FdEvents::from_bits_truncate(fds[0].events), FdEvents::POLLIN); |
| let data = fds[0].data; |
| assert_eq!(data, 2); |
| assert_eq!(pipe2.read(¤t_task, &mut VecOutputBuffer::new(64)).expect("read"), 1); |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| } |
| |
| #[::fuchsia::test] |
| fn test_cancel_after_notify() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let event = new_eventfd(¤t_task, 0, EventFdType::Counter, true); |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| |
| // Add a thing |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &event, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| // Make the thing send a notification, wait for it |
| let add_val = 1u64; |
| assert_eq!( |
| event.write(¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes())).unwrap(), |
| std::mem::size_of::<u64>() |
| ); |
| |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 1); |
| |
| // Remove the thing |
| epoll_file.delete(&event).unwrap(); |
| |
| // Wait for new notifications |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 0); |
| // That shouldn't crash |
| } |
| |
| #[::fuchsia::test] |
| fn test_add_then_modify() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let (socket1, _socket2) = UnixSocket::new_pair( |
| ¤t_task, |
| SocketDomain::Unix, |
| SocketType::Stream, |
| OpenFlags::RDWR, |
| ) |
| .expect("Failed to create socket pair."); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &socket1, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 0); |
| |
| let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT; |
| epoll_file |
| .modify(¤t_task, &socket1, epoll_event::new(read_write_event.bits(), EVENT_DATA)) |
| .unwrap(); |
| let triggered_events = epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap(); |
| assert_eq!(1, triggered_events.len()); |
| let event = &triggered_events[0]; |
| let events = event.events; |
| assert_eq!(events, FdEvents::POLLOUT.bits()); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| } |
| } |