| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use fuchsia_zircon as zx; |
| use itertools::Itertools; |
| use starnix_lock::Mutex; |
| use std::{ |
| collections::{hash_map::Entry, HashMap, VecDeque}, |
| sync::{Arc, Weak}, |
| }; |
| |
| use crate::{ |
| arch::uapi::epoll_event, |
| fs::{ |
| buffers::{InputBuffer, OutputBuffer}, |
| fileops_impl_nonseekable, Anon, FdEvents, FileHandle, FileObject, FileOps, |
| }, |
| logging::log_warn, |
| task::{ |
| CurrentTask, EnqueueEventHandler, EventHandler, ReadyItem, ReadyItemKey, WaitCanceler, |
| WaitQueue, Waiter, |
| }, |
| types::{errno, error, Errno, OpenFlags, EBADF, EINTR, EPOLLET, EPOLLONESHOT, ETIMEDOUT}, |
| }; |
| |
| /// Maximum depth of epoll instances monitoring one another. |
| /// From https://man7.org/linux/man-pages/man2/epoll_ctl.2.html |
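| /// For example, epoll A may watch epoll B, which in turn watches epoll C, and so |
| /// on. `check_monitors` walks this graph; `add` fails with EINVAL once the chain |
| /// would exceed this depth, or ELOOP if it would form a cycle. |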
| const MAX_NESTED_DEPTH: u32 = 5; |
| |
| /// A WaitObject represents a FileHandle that is being waited upon. |
| /// The `data` field is a user-defined quantity passed in |
| /// via `sys_epoll_ctl`. C programs typically use it to store |
| /// a pointer to the data that needs to be processed after an |
| /// event. |
| struct WaitObject { |
| target: Weak<FileObject>, |
| events: FdEvents, |
| data: u64, |
| wait_canceler: Option<WaitCanceler>, |
| } |
| |
| impl WaitObject { |
| // TODO(fxbug.dev/64296) we should not report an error if the file was closed while it was |
| // registered for epoll(). Either the file needs to be removed from our lists when it is closed, |
| // we need to ignore/remove WaitObjects when the file is gone, or (more likely) both because of |
| // race conditions removing the file object. |
| fn target(&self) -> Result<FileHandle, Errno> { |
| self.target.upgrade().ok_or_else(|| errno!(EBADF)) |
| } |
| } |
| |
| /// EpollKey acts as a key into the map of WaitObjects. |
| /// It is the address of the underlying FileObject. |
| type EpollKey = usize; |
| |
| fn as_epoll_key(file: &FileHandle) -> EpollKey { |
| Arc::as_ptr(file) as EpollKey |
| } |
| |
| /// EpollFileObject represents the FileObject used to |
| /// implement epoll_create1/epoll_ctl/epoll_pwait. |
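| /// |
| /// A minimal usage sketch, mirroring the tests at the bottom of this file (not |
| /// compiled as a doctest; `current_task`, `file`, `data`, and `deadline` are |
| /// assumed to come from the surrounding kernel context): |
| /// |
| /// ```ignore |
| /// let epoll_file_handle = EpollFileObject::new_file(current_task); |
| /// let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| /// epoll_file.add( |
| ///     current_task, |
| ///     &file, |
| ///     &epoll_file_handle, |
| ///     epoll_event::new(FdEvents::POLLIN.bits(), data), |
| /// )?; |
| /// let ready = epoll_file.wait(current_task, max_events, deadline)?; |
| /// ``` |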
| #[derive(Default)] |
| pub struct EpollFileObject { |
| waiter: Waiter, |
| /// Mutable state of this epoll object. |
| state: Mutex<EpollState>, |
| /// trigger_list is a FIFO of events that have |
| /// happened, but have not yet been processed. |
| trigger_list: Arc<Mutex<VecDeque<ReadyItem>>>, |
| } |
| |
| #[derive(Default)] |
| struct EpollState { |
| /// Any file tracked by this epoll instance |
| /// will exist as a key in `wait_objects`. |
| wait_objects: HashMap<ReadyItemKey, WaitObject>, |
| /// processing_list is a FIFO of events that are being |
| /// processed. |
| /// |
|     /// Objects from the `EpollFileObject`'s `trigger_list` are moved into this |
|     /// list so that we can handle triggered events without holding the trigger |
|     /// list's lock longer than we need to. This reduces contention with waited-on |
|     /// objects that try to notify this epoll object on subscribed events. |
| processing_list: VecDeque<ReadyItem>, |
|     /// rearm_list is the list of events that need to |
| /// be waited upon prior to actually waiting in |
| /// EpollFileObject::wait. They cannot be re-armed |
| /// before that, because, if the client process has |
| /// not cleared the wait condition, they would just |
| /// be immediately triggered. |
| rearm_list: Vec<ReadyItem>, |
| /// A list of waiters waiting for events from this |
| /// epoll instance. |
| waiters: WaitQueue, |
| } |
| |
| impl EpollFileObject { |
| /// Allocate a new, empty epoll object. |
| pub fn new_file(current_task: &CurrentTask) -> FileHandle { |
| let epoll = Box::new(EpollFileObject::default()); |
| |
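|         // Acquire both locks once, in the order used throughout this file |
|         // (`state` before `trigger_list`), so that debug builds can catch |
|         // lock-ordering violations early. |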
| #[cfg(any(test, debug_assertions))] |
| { |
| let _l1 = epoll.state.lock(); |
| let _l2 = epoll.trigger_list.lock(); |
| } |
| |
| Anon::new_file(current_task, epoll, OpenFlags::RDWR) |
| } |
| |
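|     /// Builds the handler that, when a waited-on file signals, enqueues a |
|     /// `ReadyItem` for `key` onto this epoll object's `trigger_list`. |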
| fn new_wait_handler(&self, key: ReadyItemKey) -> EventHandler { |
| EventHandler::Enqueue(EnqueueEventHandler { |
| key, |
| queue: self.trigger_list.clone(), |
| sought_events: FdEvents::all(), |
| mappings: Default::default(), |
| }) |
| } |
| |
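|     /// Starts a level-triggered wait on `wait_object`: it registers the |
|     /// asynchronous wait first, then queries the current events so that an |
|     /// event arriving in between is not missed. |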
| fn wait_on_file( |
| &self, |
| current_task: &CurrentTask, |
| key: ReadyItemKey, |
| wait_object: &mut WaitObject, |
| ) -> Result<(), Errno> { |
| let target = wait_object.target()?; |
| |
| // First start the wait. If an event happens after this, we'll get it. |
| self.wait_on_file_edge_triggered(current_task, key, wait_object)?; |
| |
| // Now check the events. If an event happened before this, we'll detect it here. There's |
| // now no race window where an event would be missed. |
| // |
| // That said, if an event happens between the wait and the query_events, we'll get two |
| // notifications. We handle this by deduping on the epoll_wait end. |
| let events = target.query_events(current_task)?; |
| if !(events & wait_object.events).is_empty() { |
| self.waiter.wake_immediately(events, self.new_wait_handler(key)); |
| if let Some(wait_canceler) = wait_object.wait_canceler.take() { |
| wait_canceler.cancel(); |
| } else { |
| log_warn!("wait canceler should have been set by `wait_on_file_edge_triggered`"); |
| } |
| } |
| Ok(()) |
| } |
| |
| fn wait_on_file_edge_triggered( |
| &self, |
| current_task: &CurrentTask, |
| key: ReadyItemKey, |
| wait_object: &mut WaitObject, |
| ) -> Result<(), Errno> { |
| wait_object.wait_canceler = wait_object.target()?.wait_async( |
| current_task, |
| &self.waiter, |
| wait_object.events, |
| self.new_wait_handler(key), |
| ); |
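|         // Per epoll_ctl(2), a target file that does not support being waited on |
|         // asynchronously (no canceler could be registered) is rejected with EPERM. |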
| if wait_object.wait_canceler.is_none() { |
| return error!(EPERM); |
| } |
| Ok(()) |
| } |
| |
|     /// Recursively checks whether this EpollFileObject monitors the epoll instance |
|     /// behind `epoll_file_handle`, following at most `depth_left` levels of nesting. |
| fn check_monitors(&self, epoll_file_handle: &FileHandle, depth_left: u32) -> Result<(), Errno> { |
| if depth_left == 0 { |
| return error!(EINVAL); |
| } |
| |
| let state = self.state.lock(); |
| for nested_object in state.wait_objects.values() { |
| match nested_object.target()?.downcast_file::<EpollFileObject>() { |
| None => continue, |
| Some(target) => { |
| if Arc::ptr_eq(&nested_object.target()?, epoll_file_handle) { |
| return error!(ELOOP); |
| } |
| target.check_monitors(epoll_file_handle, depth_left - 1)?; |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// Asynchronously wait on certain events happening on a FileHandle. |
| pub fn add( |
| &self, |
| current_task: &CurrentTask, |
| file: &FileHandle, |
| epoll_file_handle: &FileHandle, |
| mut epoll_event: epoll_event, |
| ) -> Result<(), Errno> { |
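|         // EPOLLHUP and EPOLLERR are always reported, whether or not they were |
|         // requested (see epoll_ctl(2)). |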
| epoll_event.events |= FdEvents::POLLHUP.bits(); |
| epoll_event.events |= FdEvents::POLLERR.bits(); |
| |
|         // Check whether adding this file would create a cycle or exceed the |
|         // maximum nesting depth. |
| if let Some(epoll_to_add) = file.downcast_file::<EpollFileObject>() { |
| // We need to check for `MAX_NESTED_DEPTH - 1` because adding `epoll_to_add` to self |
| // would result in a total depth of one more. |
| epoll_to_add.check_monitors(epoll_file_handle, MAX_NESTED_DEPTH - 1)?; |
| } |
| |
| let mut state = self.state.lock(); |
| let key = as_epoll_key(file).into(); |
| match state.wait_objects.entry(key) { |
| Entry::Occupied(_) => error!(EEXIST), |
| Entry::Vacant(entry) => { |
| let wait_object = entry.insert(WaitObject { |
| target: Arc::downgrade(file), |
| events: FdEvents::from_bits_truncate(epoll_event.events), |
| data: epoll_event.data, |
| wait_canceler: None, |
| }); |
| self.wait_on_file(current_task, key, wait_object) |
| } |
| } |
| } |
| |
|     /// Modify the events we are looking for on a FileHandle. |
| pub fn modify( |
| &self, |
| current_task: &CurrentTask, |
| file: &FileHandle, |
| mut epoll_event: epoll_event, |
| ) -> Result<(), Errno> { |
| epoll_event.events |= FdEvents::POLLHUP.bits(); |
| epoll_event.events |= FdEvents::POLLERR.bits(); |
| |
| let mut state = self.state.lock(); |
| let key = as_epoll_key(file).into(); |
| state.rearm_list.retain(|x| x.key != key); |
| match state.wait_objects.entry(key) { |
| Entry::Occupied(mut entry) => { |
| let wait_object = entry.get_mut(); |
| if let Some(wait_canceler) = wait_object.wait_canceler.take() { |
| wait_canceler.cancel(); |
| } |
| wait_object.events = FdEvents::from_bits_truncate(epoll_event.events); |
| self.wait_on_file(current_task, key, wait_object) |
| } |
| Entry::Vacant(_) => error!(ENOENT), |
| } |
| } |
| |
| /// Cancel an asynchronous wait on an object. Events triggered before |
| /// calling this will still be delivered. |
| pub fn delete(&self, file: &FileHandle) -> Result<(), Errno> { |
| let mut state = self.state.lock(); |
| let key = as_epoll_key(file).into(); |
| if let Some(mut wait_object) = state.wait_objects.remove(&key) { |
| if let Some(wait_canceler) = wait_object.wait_canceler.take() { |
| wait_canceler.cancel(); |
| } |
| state.rearm_list.retain(|x| x.key != key); |
| Ok(()) |
| } else { |
| error!(ENOENT) |
| } |
| } |
| |
|     /// Stores events from this epoll object's trigger list into `pending_list`, up to |
|     /// `max_events` entries. This does not actually invoke the waiter, which is how |
|     /// items are added to the trigger list; the caller must do that beforehand if needed. |
|     /// |
|     /// If an event in the trigger list is stale (its ready events no longer intersect |
|     /// the subscribed events), the wait is re-armed instead of being reported. |
| fn process_triggered_events( |
| &self, |
| current_task: &CurrentTask, |
| pending_list: &mut Vec<ReadyItem>, |
| max_events: usize, |
| ) -> Result<(), Errno> { |
| let mut state = self.state.lock(); |
| // Move all the elements from `self.trigger_list` to this intermediary |
| // queue that we handle events from. This reduces the time spent holding |
| // `self.trigger_list`'s lock which reduces contention with objects that |
| // this epoll object has subscribed for notifications from. |
| state.processing_list.append(&mut *self.trigger_list.lock()); |
| while pending_list.len() < max_events && !state.processing_list.is_empty() { |
| if let Some(pending) = state.processing_list.pop_front() { |
| if let Some(wait) = state.wait_objects.get_mut(&pending.key) { |
| // The weak pointer to the FileObject target can be gone if the file was closed |
| // out from under us. If this happens it is not an error: ignore it and |
| // continue. |
| if let Some(target) = wait.target.upgrade() { |
| let ready = ReadyItem { |
| key: pending.key, |
| events: target.query_events(current_task)?, |
| }; |
| if ready.events.intersects(wait.events) { |
| pending_list.push(ready); |
| } else { |
| // Another thread already handled this event, wait for another one. |
| // Files can be legitimately closed out from under us so bad file |
| // descriptors are not an error. |
| match self.wait_on_file(current_task, pending.key, wait) { |
| Err(err) if err == EBADF => {} // File closed. |
| Err(err) => return Err(err), |
| _ => {} |
| } |
| } |
| } |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Waits until an event exists in `pending_list` or until `timeout` has |
| /// been reached. |
| fn wait_until_pending_event( |
| &self, |
| current_task: &CurrentTask, |
| max_events: usize, |
| mut wait_deadline: zx::Time, |
| ) -> Result<Vec<ReadyItem>, Errno> { |
| let mut pending_list = Vec::new(); |
| |
| loop { |
| self.process_triggered_events(current_task, &mut pending_list, max_events)?; |
| |
| if pending_list.len() == max_events { |
|                 break; // The output list is full, nothing more we can do. |
| } |
| |
| if !pending_list.is_empty() { |
| // We now know we have at least one event to return. We shouldn't return |
| // immediately, in case there are more events available, but the next loop should |
| // wait with a 0 timeout to prevent further blocking. |
| wait_deadline = zx::Time::ZERO; |
| } |
| |
| // Loop back to check if there are more items in the Waiter's queue. Every wait_until() |
| // call will process a single event. In order to drain as many events as we can that |
| // are synchronously available, keep trying until it reports empty. |
| // |
|             // The handlers registered by the waits cause items to be appended to |
|             // trigger_list. See `new_wait_handler` for how this happens. |
| // |
| // This wait may return EINTR for nonzero timeouts which is not an error. We must be |
| // careful not to lose events if this happens. |
| // |
|             // The first time through this loop we'll use the deadline passed into this |
|             // function, so we can get EINTR. But since we haven't done anything or |
|             // accumulated any results yet, it's OK to return immediately and no |
|             // information will be lost. |
| match self.waiter.wait_until(current_task, wait_deadline) { |
| Err(err) if err == ETIMEDOUT => break, |
| Err(err) if err == EINTR => { |
| // Terminating early will lose any events in the pending_list so that should |
| // only be for unrecoverable errors (not EINTR). The only time there should be a |
| // nonzero wait_deadline (and hence the ability to encounter EINTR) is when the |
| // pending list is empty. |
| debug_assert!( |
| pending_list.is_empty(), |
| "Got EINTR from wait of {}ns with {} items pending.", |
| wait_deadline.into_nanos(), |
| pending_list.len() |
| ); |
| return Err(err); |
| } |
| // TODO check if this is supposed to actually fail! |
| result => result?, |
| } |
| } |
| |
| Ok(pending_list) |
| } |
| |
|     /// Blocking wait on all subscribed events, until at least one is ready or |
|     /// `deadline` is reached. Returns at most `max_events` events. |
| pub fn wait( |
| &self, |
| current_task: &CurrentTask, |
| max_events: usize, |
| deadline: zx::Time, |
| ) -> Result<Vec<epoll_event>, Errno> { |
| // First we start waiting again on wait objects that have |
| // previously been triggered. |
| { |
| let mut state = self.state.lock(); |
| let rearm_list = std::mem::take(&mut state.rearm_list); |
| for to_wait in rearm_list.iter() { |
| // TODO handle interrupts here |
| let w = state.wait_objects.get_mut(&to_wait.key).unwrap(); |
| self.wait_on_file(current_task, to_wait.key, w)?; |
| } |
| } |
| |
| let pending_list = self.wait_until_pending_event(current_task, max_events, deadline)?; |
| |
| // Process the pending list and add processed ReadyItem |
| // entries to the rearm_list for the next wait. |
| let mut result = vec![]; |
| let mut state = self.state.lock(); |
| for pending_event in pending_list.iter().unique_by(|e| e.key) { |
|             // The wait could have been deleted by now, |
| // so ignore the None case. |
| if let Some(wait) = state.wait_objects.get_mut(&pending_event.key) { |
| let reported_events = pending_event.events.bits() & wait.events.bits(); |
| result.push(epoll_event::new(reported_events, wait.data)); |
| |
|                 // Files marked with `EPOLLONESHOT` should only notify |
|                 // once and need to be rearmed manually with epoll_ctl(EPOLL_CTL_MOD). |
| if wait.events.bits() & EPOLLONESHOT != 0 { |
| continue; |
| } |
| if wait.events.bits() & EPOLLET != 0 { |
| // The file can be closed while registered for epoll which is not an error. |
| // We do not expect other errors from waiting. |
| match self.wait_on_file_edge_triggered(current_task, pending_event.key, wait) { |
| Err(err) if err == EBADF => {} // File closed, ignore. |
| Err(err) => log_warn!("Unexpected wait result {:#?}", err), |
| _ => {} |
| } |
| } else { |
| state.rearm_list.push(pending_event.clone()); |
| } |
| } |
| } |
| |
| // Notify waiters of unprocessed events. |
| if !state.processing_list.is_empty() || !self.trigger_list.lock().is_empty() { |
| state.waiters.notify_fd_events(FdEvents::POLLIN); |
| } |
| |
| Ok(result) |
| } |
| } |
| |
| impl FileOps for EpollFileObject { |
| fileops_impl_nonseekable!(); |
| |
| fn write( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| offset: usize, |
| _data: &mut dyn InputBuffer, |
| ) -> Result<usize, Errno> { |
| debug_assert!(offset == 0); |
| error!(EINVAL) |
| } |
| |
| fn read( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| offset: usize, |
| _data: &mut dyn OutputBuffer, |
| ) -> Result<usize, Errno> { |
| debug_assert!(offset == 0); |
| error!(EINVAL) |
| } |
| |
| fn wait_async( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| waiter: &Waiter, |
| events: FdEvents, |
| handler: EventHandler, |
| ) -> Option<WaitCanceler> { |
| Some(self.state.lock().waiters.wait_async_fd_events(waiter, events, handler)) |
| } |
| |
| fn query_events( |
| &self, |
| _file: &FileObject, |
| _current_task: &CurrentTask, |
| ) -> Result<FdEvents, Errno> { |
| let mut events = FdEvents::empty(); |
| let state = self.state.lock(); |
|         // The epoll file is readable when it has triggered events waiting to be processed. |
|         if !state.processing_list.is_empty() || !self.trigger_list.lock().is_empty() { |
| events |= FdEvents::POLLIN; |
| } |
| Ok(events) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::{epoll_event, EpollFileObject, EventHandler, OpenFlags}; |
| use crate::{ |
| fs::{ |
| buffers::{VecInputBuffer, VecOutputBuffer}, |
| eventfd::{new_eventfd, EventFdType}, |
| fuchsia::create_fuchsia_pipe, |
| pipe::new_pipe, |
| socket::{SocketDomain, SocketType, UnixSocket}, |
| FdEvents, |
| }, |
| task::Waiter, |
| testing::{create_kernel_and_task, create_task}, |
| }; |
| use fuchsia_zircon::{self as zx, HandleBased}; |
| use std::sync::atomic::{AtomicU64, Ordering}; |
| use syncio::Zxio; |
| |
| #[::fuchsia::test] |
| async fn test_epoll_read_ready() { |
| static WRITE_COUNT: AtomicU64 = AtomicU64::new(0); |
| const EVENT_DATA: u64 = 42; |
| |
| let (kernel, _init_task) = create_kernel_and_task(); |
| let current_task = create_task(&kernel, "main-task"); |
| let writer_task = create_task(&kernel, "writer-task"); |
| |
| let (pipe_out, pipe_in) = new_pipe(¤t_task).unwrap(); |
| |
| let test_string = "hello starnix".to_string(); |
| let test_len = test_string.len(); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe_out, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| let test_string_copy = test_string.clone(); |
| let thread = std::thread::spawn(move || { |
| let bytes_written = pipe_in |
| .write(&writer_task, &mut VecInputBuffer::new(test_string_copy.as_bytes())) |
| .unwrap(); |
| assert_eq!(bytes_written, test_len); |
| WRITE_COUNT.fetch_add(bytes_written as u64, Ordering::Relaxed); |
| }); |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::INFINITE).unwrap(); |
| let _ = thread.join(); |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| |
| let mut buffer = VecOutputBuffer::new(test_len); |
| let bytes_read = pipe_out.read(¤t_task, &mut buffer).unwrap(); |
| assert_eq!(bytes_read as u64, WRITE_COUNT.load(Ordering::Relaxed)); |
| assert_eq!(bytes_read, test_len); |
| assert_eq!(buffer.data(), test_string.as_bytes()); |
| } |
| |
| #[::fuchsia::test] |
| async fn test_epoll_ready_then_wait() { |
| const EVENT_DATA: u64 = 42; |
| |
| let (_kernel, current_task) = create_kernel_and_task(); |
| |
| let (pipe_out, pipe_in) = new_pipe(¤t_task).unwrap(); |
| |
| let test_string = "hello starnix".to_string(); |
| let test_bytes = test_string.as_bytes(); |
| let test_len = test_bytes.len(); |
| |
| assert_eq!( |
| pipe_in.write(¤t_task, &mut VecInputBuffer::new(test_bytes)).unwrap(), |
| test_bytes.len() |
| ); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe_out, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::INFINITE).unwrap(); |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| |
| let mut buffer = VecOutputBuffer::new(test_len); |
| let bytes_read = pipe_out.read(¤t_task, &mut buffer).unwrap(); |
| assert_eq!(bytes_read, test_len); |
| assert_eq!(buffer.data(), test_bytes); |
| } |
| |
| #[::fuchsia::test] |
| async fn test_epoll_ctl_cancel() { |
| for do_cancel in [true, false] { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let event = new_eventfd(¤t_task, 0, EventFdType::Counter, true); |
| let waiter = Waiter::new(); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &event, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| if do_cancel { |
| epoll_file.delete(&event).unwrap(); |
| } |
| |
| let wait_canceler = event |
| .wait_async(¤t_task, &waiter, FdEvents::POLLIN, EventHandler::None) |
| .expect("wait_async"); |
| if do_cancel { |
| wait_canceler.cancel(); |
| } |
| |
| let add_val = 1u64; |
| assert_eq!( |
| event |
| .write(¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes())) |
| .unwrap(), |
| std::mem::size_of::<u64>() |
| ); |
| |
| let events = epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap(); |
| |
| if do_cancel { |
| assert_eq!(0, events.len()); |
| } else { |
| assert_eq!(1, events.len()); |
| let event = &events[0]; |
| assert!(FdEvents::from_bits_truncate(event.events).contains(FdEvents::POLLIN)); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| } |
| } |
| } |
| |
| #[::fuchsia::test] |
| async fn test_multiple_events() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let (client1, server1) = zx::Socket::create_stream(); |
| let (client2, server2) = zx::Socket::create_stream(); |
| let pipe1 = create_fuchsia_pipe(¤t_task, client1, OpenFlags::RDWR) |
| .expect("create_fuchsia_pipe"); |
| let pipe2 = create_fuchsia_pipe(¤t_task, client2, OpenFlags::RDWR) |
| .expect("create_fuchsia_pipe"); |
| let server1_zxio = Zxio::create(server1.into_handle()).expect("Zxio::create"); |
| let server2_zxio = Zxio::create(server2.into_handle()).expect("Zxio::create"); |
| |
| let poll = || { |
| let epoll_object = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_object.downcast_file::<EpollFileObject>().unwrap(); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe1, |
| &epoll_object, |
| epoll_event::new(FdEvents::POLLIN.bits(), 1), |
| ) |
| .expect("epoll_file.add"); |
| epoll_file |
| .add( |
| ¤t_task, |
| &pipe2, |
| &epoll_object, |
| epoll_event::new(FdEvents::POLLIN.bits(), 2), |
| ) |
| .expect("epoll_file.add"); |
| epoll_file.wait(¤t_task, 2, zx::Time::ZERO).expect("wait") |
| }; |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| |
| assert_eq!(server1_zxio.write(&[0]).expect("write"), 1); |
| |
| let fds = poll(); |
| assert_eq!(fds.len(), 1); |
| assert_eq!(FdEvents::from_bits_truncate(fds[0].events), FdEvents::POLLIN); |
| let data = fds[0].data; |
| assert_eq!(data, 1); |
| assert_eq!(pipe1.read(¤t_task, &mut VecOutputBuffer::new(64)).expect("read"), 1); |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| |
| assert_eq!(server2_zxio.write(&[0]).expect("write"), 1); |
| |
| let fds = poll(); |
| assert_eq!(fds.len(), 1); |
| assert_eq!(FdEvents::from_bits_truncate(fds[0].events), FdEvents::POLLIN); |
| let data = fds[0].data; |
| assert_eq!(data, 2); |
| assert_eq!(pipe2.read(¤t_task, &mut VecOutputBuffer::new(64)).expect("read"), 1); |
| |
| let fds = poll(); |
| assert!(fds.is_empty()); |
| } |
| |
| #[::fuchsia::test] |
| async fn test_cancel_after_notify() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let event = new_eventfd(¤t_task, 0, EventFdType::Counter, true); |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| |
| // Add a thing |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &event, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| |
| // Make the thing send a notification, wait for it |
| let add_val = 1u64; |
| assert_eq!( |
| event.write(¤t_task, &mut VecInputBuffer::new(&add_val.to_ne_bytes())).unwrap(), |
| std::mem::size_of::<u64>() |
| ); |
| |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 1); |
| |
| // Remove the thing |
| epoll_file.delete(&event).unwrap(); |
| |
| // Wait for new notifications |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 0); |
| // That shouldn't crash |
| } |
| |
| #[::fuchsia::test] |
| async fn test_add_then_modify() { |
| let (_kernel, current_task) = create_kernel_and_task(); |
| let (socket1, _socket2) = UnixSocket::new_pair( |
| ¤t_task, |
| SocketDomain::Unix, |
| SocketType::Stream, |
| OpenFlags::RDWR, |
| ) |
| .expect("Failed to create socket pair."); |
| |
| let epoll_file_handle = EpollFileObject::new_file(¤t_task); |
| let epoll_file = epoll_file_handle.downcast_file::<EpollFileObject>().unwrap(); |
| |
| const EVENT_DATA: u64 = 42; |
| epoll_file |
| .add( |
| ¤t_task, |
| &socket1, |
| &epoll_file_handle, |
| epoll_event::new(FdEvents::POLLIN.bits(), EVENT_DATA), |
| ) |
| .unwrap(); |
| assert_eq!(epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap().len(), 0); |
| |
| let read_write_event = FdEvents::POLLIN | FdEvents::POLLOUT; |
| epoll_file |
| .modify(¤t_task, &socket1, epoll_event::new(read_write_event.bits(), EVENT_DATA)) |
| .unwrap(); |
| let triggered_events = epoll_file.wait(¤t_task, 10, zx::Time::ZERO).unwrap(); |
| assert_eq!(1, triggered_events.len()); |
| let event = &triggered_events[0]; |
| let events = event.events; |
| assert_eq!(events, FdEvents::POLLOUT.bits()); |
| let data = event.data; |
| assert_eq!(EVENT_DATA, data); |
| } |
| } |