| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::mm::memory::MemoryObject; |
| use crate::mm::{CompareExchangeResult, ProtectionFlags}; |
| use crate::task::{CurrentTask, EventHandler, SignalHandler, SignalHandlerInner, Task, Waiter}; |
| use futures::channel::oneshot; |
| use starnix_sync::{InterruptibleEvent, LockBefore, Locked, OrderedMutex, TerminalLock, Unlocked}; |
| use starnix_types::futex_address::FutexAddress; |
| use starnix_uapi::errors::Errno; |
| use starnix_uapi::user_address::UserAddress; |
| use starnix_uapi::{FUTEX_BITSET_MATCH_ANY, FUTEX_TID_MASK, FUTEX_WAITERS, errno, error}; |
| use std::collections::hash_map::Entry; |
| use std::collections::{HashMap, VecDeque}; |
| use std::hash::Hash; |
| use std::sync::{Arc, Weak}; |
| |
/// A table of futexes.
///
/// Each 32-bit aligned address in an address space can potentially have an associated futex that
/// userspace can wait upon. This table is a sparse representation: a wait queue only exists for
/// a key while waiters are registered on it. Queues are created on demand and dropped again
/// once they become empty.
pub struct FutexTable<Key: FutexKey> {
    /// The waiter queues for every futex key, guarded by a single lock.
    ///
    /// The maps inside are populated on-demand when futexes are used.
    state: OrderedMutex<FutexTableState<Key>, TerminalLock>,
}
| |
| impl<Key: FutexKey> Default for FutexTable<Key> { |
| fn default() -> Self { |
| Self { state: OrderedMutex::new(FutexTableState::default()) } |
| } |
| } |
| |
| impl<Key: FutexKey> FutexTable<Key> { |
| /// Wait on the futex at the given address given a boot deadline. |
| /// |
| /// See FUTEX_WAIT when passed a deadline in CLOCK_REALTIME. |
| pub fn wait_boot( |
| &self, |
| locked: &mut Locked<Unlocked>, |
| current_task: &CurrentTask, |
| addr: UserAddress, |
| value: u32, |
| mask: u32, |
| deadline: zx::BootInstant, |
| timer_slack: zx::BootDuration, |
| ) -> Result<(), Errno> { |
| let addr = FutexAddress::try_from(addr)?; |
| let mut state = self.state.lock(locked); |
| // As the state is locked, no wake can happen before the waiter is registered. |
| // If the addr is remapped, we will read stale data, but we will not miss a futex wake. |
| // Acquire ordering to synchronize with userspace modifications to the value on other |
| // threads. |
| let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?; |
| if value != loaded_value { |
| return error!(EAGAIN); |
| } |
| |
| let key = Key::get(current_task, addr)?; |
| let waiter = Arc::new(Waiter::new()); |
| let timer = zx::BootTimer::create(); |
| let signal_handler = SignalHandler { |
| inner: SignalHandlerInner::None, |
| event_handler: EventHandler::None, |
| err_code: Some(errno!(ETIMEDOUT)), |
| }; |
| waiter |
| .wake_on_zircon_signals(&timer, zx::Signals::TIMER_SIGNALED, signal_handler) |
| .expect("wait can only fail in OOM conditions"); |
| timer |
| .set(deadline, timer_slack) |
| .expect("timer set cannot fail with valid handles and slack"); |
| state.get_waiters_or_default(key.clone()).add(FutexWaiter { |
| mask, |
| notifiable: FutexNotifiable::new_internal_boot(Arc::downgrade(&waiter)), |
| }); |
| std::mem::drop(state); |
| waiter.wait(locked, current_task).inspect_err(|_| { |
| // If wait returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly |
| // remove our waiter from the queue to prevent a memory leak. |
| // If it succeeded, the waker has already removed us from the queue. |
| self.state.lock(locked).remove_boot_waiter_from_queue(key, &waiter); |
| }) |
| } |
| |
| /// Wait on the futex at the given address. |
| /// |
| /// See FUTEX_WAIT. |
| pub fn wait<L>( |
| &self, |
| locked: &mut Locked<L>, |
| current_task: &CurrentTask, |
| addr: UserAddress, |
| value: u32, |
| mask: u32, |
| deadline: zx::MonotonicInstant, |
| ) -> Result<(), Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let addr = FutexAddress::try_from(addr)?; |
| let mut state = self.state.lock(locked); |
| // As the state is locked, no wake can happen before the waiter is registered. |
| // If the addr is remapped, we will read stale data, but we will not miss a futex wake. |
| // Acquire ordering to synchronize with userspace modifications to the value on other |
| // threads. |
| let loaded_value = current_task.mm()?.atomic_load_u32_acquire(addr)?; |
| if value != loaded_value { |
| return error!(EAGAIN); |
| } |
| |
| let key = Key::get(current_task, addr)?; |
| let event = InterruptibleEvent::new(); |
| let guard = event.begin_wait(); |
| state.get_waiters_or_default(key.clone()).add(FutexWaiter { |
| mask, |
| notifiable: FutexNotifiable::new_internal(Arc::downgrade(&event)), |
| }); |
| std::mem::drop(state); |
| |
| current_task.block_until(guard, deadline).inspect_err(|_| { |
| // If block_until returned an error (e.g., ETIMEDOUT, EINTR), we must explicitly |
| // remove our waiter from the queue to prevent a memory leak. |
| // If it succeeded, the waker has already removed us from the queue. |
| self.state.lock(locked).remove_waiter_from_queue(key, &event); |
| }) |
| } |
| |
| /// Wake the given number of waiters on futex at the given address. Returns the number of |
| /// waiters actually woken. |
| /// |
| /// See FUTEX_WAKE. |
| pub fn wake<L>( |
| &self, |
| locked: &mut Locked<L>, |
| task: &Task, |
| addr: UserAddress, |
| count: usize, |
| mask: u32, |
| ) -> Result<usize, Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let addr = FutexAddress::try_from(addr)?; |
| let key = Key::get(task, addr)?; |
| Ok(self.state.lock(locked).wake(key, count, mask)) |
| } |
| |
| /// Requeue the waiters to another address. |
| /// |
| /// See FUTEX_CMP_REQUEUE |
| pub fn requeue<L>( |
| &self, |
| locked: &mut Locked<L>, |
| current_task: &CurrentTask, |
| addr: UserAddress, |
| wake_count: usize, |
| requeue_count: usize, |
| new_addr: UserAddress, |
| expected_value: Option<u32>, |
| ) -> Result<usize, Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let addr = FutexAddress::try_from(addr)?; |
| let new_addr = FutexAddress::try_from(new_addr)?; |
| let key = Key::get(current_task, addr)?; |
| let new_key = Key::get(current_task, new_addr)?; |
| let mut state = self.state.lock(locked); |
| if let Some(expected) = expected_value { |
| // Use acquire ordering here to synchronize with mutex impls that store w/ release |
| // ordering. |
| let value = current_task.mm()?.atomic_load_u32_acquire(addr)?; |
| if value != expected { |
| return error!(EAGAIN); |
| } |
| } |
| |
| let woken; |
| let to_requeue; |
| match state.waiters.entry(key) { |
| Entry::Vacant(_) => return Ok(0), |
| Entry::Occupied(mut entry) => { |
| // Wake up at most `wake_count` waiters. |
| woken = entry.get_mut().notify(FUTEX_BITSET_MATCH_ANY, wake_count); |
| |
| // Dequeue up to `requeue_count` waiters to requeue below. |
| to_requeue = entry.get_mut().split_for_requeue(requeue_count); |
| |
| if entry.get().is_empty() { |
| entry.remove(); |
| } |
| } |
| } |
| |
| let requeued = to_requeue.0.len(); |
| if !to_requeue.is_empty() { |
| state.get_waiters_or_default(new_key).transfer(to_requeue); |
| } |
| |
| Ok(woken + requeued) |
| } |
| |
| /// Lock the futex at the given address. |
| /// |
| /// See FUTEX_LOCK_PI. |
| pub fn lock_pi<L>( |
| &self, |
| locked: &mut Locked<L>, |
| current_task: &CurrentTask, |
| addr: UserAddress, |
| deadline: zx::MonotonicInstant, |
| ) -> Result<(), Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let addr = FutexAddress::try_from(addr)?; |
| let mut state = self.state.lock(locked); |
| // As the state is locked, no unlock can happen before the waiter is registered. |
| // If the addr is remapped, we will read stale data, but we will not miss a futex unlock. |
| let key = Key::get(current_task, addr)?; |
| |
| let tid = current_task.get_tid() as u32; |
| let mm = current_task.mm()?; |
| |
| // Use a relaxed ordering because the compare/exchange below creates a synchronization |
| // point with userspace threads in the success case. No synchronization is required in |
| // failure cases. |
| let mut current_value = mm.atomic_load_u32_relaxed(addr)?; |
| let new_owner_tid = loop { |
| let new_owner_tid = current_value & FUTEX_TID_MASK; |
| if new_owner_tid == tid { |
| // From <https://man7.org/linux/man-pages/man2/futex.2.html>: |
| // |
| // EDEADLK |
| // (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI, |
| // FUTEX_CMP_REQUEUE_PI) The futex word at uaddr is already |
| // locked by the caller. |
| return error!(EDEADLOCK); |
| } |
| |
| if current_value == 0 { |
| // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops |
| // and with the release ordering on userspace unlock ops. |
| match mm.atomic_compare_exchange_weak_u32_acq_rel(addr, current_value, tid) { |
| CompareExchangeResult::Success => return Ok(()), |
| CompareExchangeResult::Stale { observed } => { |
| current_value = observed; |
| continue; |
| } |
| CompareExchangeResult::Error(e) => return Err(e), |
| } |
| } |
| |
| // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and |
| // with the release ordering on userspace unlock ops. |
| let target_value = current_value | FUTEX_WAITERS; |
| match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) { |
| CompareExchangeResult::Success => (), |
| CompareExchangeResult::Stale { observed } => { |
| current_value = observed; |
| continue; |
| } |
| CompareExchangeResult::Error(e) => return Err(e), |
| } |
| break new_owner_tid; |
| }; |
| |
| let event = InterruptibleEvent::new(); |
| let guard = event.begin_wait(); |
| let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event)); |
| state |
| .get_rt_mutex_waiters_or_default(key.clone()) |
| .push_back(RtMutexWaiter { tid, notifiable }); |
| std::mem::drop(state); |
| |
| // ESRCH (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI, |
| // FUTEX_CMP_REQUEUE_PI) The thread ID in the futex word at |
| // uaddr does not exist. |
| current_task |
| .get_task(new_owner_tid as i32) |
| .upgrade() |
| .and_then(|o| o.live().unwrap().thread.read().as_ref().map(Arc::clone)) |
| .map_or_else( |
| || error!(ESRCH), |
| |owner| current_task.block_with_owner_until(guard, &owner, deadline), |
| ) |
| .inspect_err(|_| { |
| // If block_with_owner_until returned an error (e.g., ETIMEDOUT), or if we |
| // failed to find the new owner (ESRCH), we must explicitly remove our waiter |
| // from the PI-mutex queue to prevent a memory leak. |
| self.state.lock(locked).remove_rt_mutex_waiter_from_queue(key, &event); |
| }) |
| } |
| |
| /// Unlock the futex at the given address. |
| /// |
| /// See FUTEX_UNLOCK_PI. |
| pub fn unlock_pi<L>( |
| &self, |
| locked: &mut Locked<L>, |
| current_task: &CurrentTask, |
| addr: UserAddress, |
| ) -> Result<(), Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let addr = FutexAddress::try_from(addr)?; |
| let mut state = self.state.lock(locked); |
| let tid = current_task.get_tid() as u32; |
| let mm = current_task.mm()?; |
| |
| let key = Key::get(current_task, addr)?; |
| |
| // Use a relaxed ordering because the compare/exchange below creates a synchronization |
| // point with userspace threads in the success case. No synchronization is required in |
| // failure cases. |
| let current_value = mm.atomic_load_u32_relaxed(addr)?; |
| if current_value & FUTEX_TID_MASK != tid { |
| // From <https://man7.org/linux/man-pages/man2/futex.2.html>: |
| // |
| // EPERM (FUTEX_UNLOCK_PI) The caller does not own the lock |
| // represented by the futex word. |
| return error!(EPERM); |
| } |
| |
| loop { |
| let maybe_waiter = state.pop_rt_mutex_waiter(key.clone()); |
| let target_value = if let Some(waiter) = &maybe_waiter { waiter.tid } else { 0 }; |
| |
| // Use acq/rel ordering to synchronize with acquire ordering on userspace lock ops and |
| // with the release ordering on userspace unlock ops. |
| match mm.atomic_compare_exchange_u32_acq_rel(addr, current_value, target_value) { |
| CompareExchangeResult::Success => (), |
| // From <https://man7.org/linux/man-pages/man2/futex.2.html>: |
| // |
| // EINVAL (FUTEX_LOCK_PI, FUTEX_LOCK_PI2, FUTEX_TRYLOCK_PI, |
| // FUTEX_UNLOCK_PI) The kernel detected an inconsistency |
| // between the user-space state at uaddr and the kernel |
| // state. This indicates either state corruption or that the |
| // kernel found a waiter on uaddr which is waiting via |
| // FUTEX_WAIT or FUTEX_WAIT_BITSET. |
| CompareExchangeResult::Stale { .. } => return error!(EINVAL), |
| // From <https://man7.org/linux/man-pages/man2/futex.2.html>: |
| // |
| // EACCES No read access to the memory of a futex word. |
| CompareExchangeResult::Error(_) => return error!(EACCES), |
| } |
| |
| let Some(mut waiter) = maybe_waiter else { |
| // We can stop trying to notify a thread if there are no more waiters. |
| break; |
| }; |
| |
| if waiter.notifiable.notify() { |
| break; |
| } |
| |
| // If we couldn't notify the waiter, then we need to pull the next thread off the |
| // waiter list. |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| impl FutexTable<SharedFutexKey> { |
| /// Wait on the futex at the given offset in the memory. |
| /// |
| /// Returns a receiver that will be signaled when the futex is woken, and an |
| /// `Arc<()>` token that must be kept alive by the caller for the duration of the |
| /// wait. If the caller drops the token (e.g., if the external client |
| /// disconnects), the waiter is marked as stale and will be garbage-collected by the |
| /// next futex operation on this table. |
| /// |
| /// See FUTEX_WAIT. |
| pub fn external_wait<L>( |
| &self, |
| locked: &mut Locked<L>, |
| memory: MemoryObject, |
| offset: u64, |
| value: u32, |
| mask: u32, |
| ) -> Result<(Arc<()>, oneshot::Receiver<()>), Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| let key = SharedFutexKey::new(&memory, offset); |
| let mut state = self.state.lock(locked); |
| // As the state is locked, no wake can happen before the waiter is registered. |
| Self::external_check_futex_value(&memory, offset, value)?; |
| |
| let token = Arc::new(()); |
| let (sender, receiver) = oneshot::channel::<()>(); |
| state.get_waiters_or_default(key).add(FutexWaiter { |
| mask, |
| notifiable: FutexNotifiable::new_external(Arc::downgrade(&token), sender), |
| }); |
| Ok((token, receiver)) |
| } |
| |
| /// Wake the given number of waiters on futex at the given offset in the memory. Returns the |
| /// number of waiters actually woken. |
| /// |
| /// See FUTEX_WAKE. |
| pub fn external_wake<L>( |
| &self, |
| locked: &mut Locked<L>, |
| memory: MemoryObject, |
| offset: u64, |
| count: usize, |
| mask: u32, |
| ) -> Result<usize, Errno> |
| where |
| L: LockBefore<TerminalLock>, |
| { |
| Ok(self.state.lock(locked).wake(SharedFutexKey::new(&memory, offset), count, mask)) |
| } |
| |
| fn external_check_futex_value( |
| memory: &MemoryObject, |
| offset: u64, |
| value: u32, |
| ) -> Result<(), Errno> { |
| let loaded_value = { |
| // TODO: This read should be atomic. |
| let mut buf = [0u8; 4]; |
| memory.read(&mut buf, offset).map_err(|_| errno!(EINVAL))?; |
| u32::from_ne_bytes(buf) |
| }; |
| if loaded_value != value { |
| return error!(EAGAIN); |
| } |
| Ok(()) |
| } |
| } |
| |
/// A key identifying one futex inside a [`FutexTable`].
pub trait FutexKey: Sized + Ord + Hash + Clone {
    /// Builds the key for the futex word at `addr` as seen by `task`.
    fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno>;
    /// Returns the table that holds the futexes of this key type for `task`.
    fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno>;
}
| |
/// Key for a private futex: the futex is identified by its address alone.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct PrivateFutexKey {
    /// The address of the futex word.
    addr: FutexAddress,
}
| |
| impl FutexKey for PrivateFutexKey { |
| fn get(_task: &Task, addr: FutexAddress) -> Result<Self, Errno> { |
| Ok(PrivateFutexKey { addr }) |
| } |
| |
| fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> { |
| Ok(task.mm()?.futex.clone()) |
| } |
| } |
| |
/// Key for a shared futex: the futex is identified by the memory object backing it and the
/// offset of the futex word inside that object.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct SharedFutexKey {
    // No chance of collisions since koids are never reused:
    // https://fuchsia.dev/fuchsia-src/concepts/kernel/concepts#kernel_object_ids
    koid: zx::Koid,
    /// Offset of the futex word inside the memory object.
    offset: u64,
}
| |
| impl FutexKey for SharedFutexKey { |
| fn get(task: &Task, addr: FutexAddress) -> Result<Self, Errno> { |
| let (memory, offset) = task.mm()?.get_mapping_memory(addr.into(), ProtectionFlags::READ)?; |
| Ok(SharedFutexKey::new(&memory, offset)) |
| } |
| |
| fn get_table_from_task(task: &Task) -> Result<Arc<FutexTable<Self>>, Errno> { |
| Ok(task.kernel().shared_futexes.clone()) |
| } |
| } |
| |
| impl SharedFutexKey { |
| fn new(memory: &MemoryObject, offset: u64) -> Self { |
| Self { koid: memory.get_koid(), offset } |
| } |
| } |
| |
/// The lock-protected contents of a [`FutexTable`].
struct FutexTableState<Key: FutexKey> {
    /// Queues of regular futex waiters, per key.
    waiters: HashMap<Key, FutexWaiters>,
    /// Queues of PI-mutex waiters, per key.
    rt_mutex_waiters: HashMap<Key, VecDeque<RtMutexWaiter>>,
}
| |
| impl<Key: FutexKey> Default for FutexTableState<Key> { |
| fn default() -> Self { |
| Self { waiters: Default::default(), rt_mutex_waiters: Default::default() } |
| } |
| } |
| |
| impl<Key: FutexKey> FutexTableState<Key> { |
| /// Returns the FutexWaiters for a given address, creating an empty one if none is registered. |
| fn get_waiters_or_default(&mut self, key: Key) -> &mut FutexWaiters { |
| self.waiters.entry(key).or_default() |
| } |
| |
| fn wake(&mut self, key: Key, count: usize, mask: u32) -> usize { |
| let entry = self.waiters.entry(key); |
| match entry { |
| Entry::Vacant(_) => 0, |
| Entry::Occupied(mut entry) => { |
| let count = entry.get_mut().notify(mask, count); |
| if entry.get().is_empty() { |
| entry.remove(); |
| } |
| count |
| } |
| } |
| } |
| |
| /// Returns the RT-Mutex waiters queue for a given address, creating an empty queue if none is |
| /// registered. |
| fn get_rt_mutex_waiters_or_default(&mut self, key: Key) -> &mut VecDeque<RtMutexWaiter> { |
| self.rt_mutex_waiters.entry(key).or_default() |
| } |
| |
| /// Pop the next RT-Mutex for the given address. |
| fn pop_rt_mutex_waiter(&mut self, key: Key) -> Option<RtMutexWaiter> { |
| let entry = self.rt_mutex_waiters.entry(key); |
| match entry { |
| Entry::Vacant(_) => None, |
| Entry::Occupied(mut entry) => { |
| let mut waiter = entry.get_mut().pop_front(); |
| // Clean up the hash map entry if the queue is empty. We do this |
| // regardless of whether `pop_front` returned a waiter or `None`, |
| // effectively garbage collecting erroneously empty map entries. |
| if entry.get().is_empty() { |
| entry.remove(); |
| } else if let Some(waiter) = &mut waiter { |
| waiter.tid |= FUTEX_WAITERS; |
| } |
| waiter |
| } |
| } |
| } |
| |
| /// Removes a standard `FUTEX_WAIT` waiter from the queue. |
| /// |
| /// This uses a two-step approach: |
| /// 1. O(1) Fast path: Check the `key` where the waiter originally went to sleep. |
| /// 2. O(N) Fallback: If not found (e.g. moved via `FUTEX_REQUEUE`), scan all futexes. |
| fn remove_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) { |
| if let Entry::Occupied(mut entry) = self.waiters.entry(key) { |
| if entry.get_mut().remove_waiter(event) { |
| if entry.get().is_empty() { |
| entry.remove(); |
| } |
| return; |
| } |
| } |
| |
| let mut key_to_remove = None; |
| for (key, waiters) in self.waiters.iter_mut() { |
| if waiters.remove_waiter(event) { |
| if waiters.is_empty() { |
| key_to_remove = Some(key.clone()); |
| } |
| break; |
| } |
| } |
| if let Some(key) = key_to_remove { |
| self.waiters.remove(&key); |
| } |
| } |
| |
| /// Removes a `FUTEX_WAIT_BITSET` waiter (with `FUTEX_CLOCK_REALTIME`). |
| /// |
| /// Like `remove_waiter_from_queue`, it tries the fast O(1) lookup on the original `key` first, |
| /// and falls back to an O(N) scan across all queues in case of a requeue. |
| fn remove_boot_waiter_from_queue(&mut self, key: Key, waiter: &Arc<Waiter>) { |
| if let Entry::Occupied(mut entry) = self.waiters.entry(key) { |
| if entry.get_mut().remove_boot_waiter(waiter) { |
| if entry.get().is_empty() { |
| entry.remove(); |
| } |
| return; |
| } |
| } |
| |
| let mut key_to_remove = None; |
| for (key, waiters) in self.waiters.iter_mut() { |
| if waiters.remove_boot_waiter(waiter) { |
| if waiters.is_empty() { |
| key_to_remove = Some(key.clone()); |
| } |
| break; |
| } |
| } |
| if let Some(key) = key_to_remove { |
| self.waiters.remove(&key); |
| } |
| } |
| |
| /// Removes a PI-mutex (`FUTEX_LOCK_PI`) waiter. |
| /// |
| /// Operates on the separate `rt_mutex_waiters` map using the same two-step |
| /// O(1)/O(N) algorithm as the other removal methods to handle edge cases where |
| /// PI-mutexes might be requeued (e.g. if `FUTEX_CMP_REQUEUE_PI` is used). |
| fn remove_rt_mutex_waiter_from_queue(&mut self, key: Key, event: &Arc<InterruptibleEvent>) { |
| let predicate = |
| |w: &RtMutexWaiter| !w.notifiable.matches_event(event) && !w.notifiable.is_stale(); |
| |
| if let Entry::Occupied(mut entry) = self.rt_mutex_waiters.entry(key) { |
| let len_before = entry.get().len(); |
| entry.get_mut().retain(predicate); |
| if entry.get().len() < len_before { |
| if entry.get().is_empty() { |
| entry.remove(); |
| } |
| return; |
| } |
| } |
| |
| let mut key_to_remove = None; |
| for (key, waiters) in self.rt_mutex_waiters.iter_mut() { |
| let len_before = waiters.len(); |
| waiters.retain(predicate); |
| if waiters.len() < len_before { |
| if waiters.is_empty() { |
| key_to_remove = Some(key.clone()); |
| } |
| break; |
| } |
| } |
| if let Some(key) = key_to_remove { |
| self.rt_mutex_waiters.remove(&key); |
| } |
| } |
| } |
| |
/// Abstraction over a process waiting on a Futex that can be notified.
enum FutexNotifiable {
    /// An internal process waiting on a Futex.
    Internal(Weak<InterruptibleEvent>),
    /// An internal process waiting on a Futex with a boot deadline.
    InternalBoot(Weak<Waiter>),
    /// An external process waiting on a Futex. The `Weak<()>` tracks the token held by the
    /// external caller; the waiter is considered stale once that token is dropped.
    // The sender needs to be an option so that one can send the notification while only holding a
    // mut reference on the ExternalWaiter.
    External(Weak<()>, Option<oneshot::Sender<()>>),
}
| |
| impl FutexNotifiable { |
| fn new_internal(event: Weak<InterruptibleEvent>) -> Self { |
| Self::Internal(event) |
| } |
| |
| fn new_internal_boot(waiter: Weak<Waiter>) -> Self { |
| Self::InternalBoot(waiter) |
| } |
| |
| fn new_external(token: Weak<()>, sender: oneshot::Sender<()>) -> Self { |
| Self::External(token, Some(sender)) |
| } |
| |
| /// Tries to notify the process. Returns `true` is the process have been notified. Returns |
| /// `false` otherwise. This means the process is stale and will never be available again. |
| fn notify(&mut self) -> bool { |
| match self { |
| Self::Internal(event) => { |
| if let Some(event) = event.upgrade() { |
| event.notify(); |
| true |
| } else { |
| false |
| } |
| } |
| Self::InternalBoot(waiter) => { |
| if let Some(waiter) = waiter.upgrade() { |
| waiter.notify(); |
| true |
| } else { |
| false |
| } |
| } |
| Self::External(_, sender) => { |
| if let Some(sender) = sender.take() { |
| sender.send(()).is_ok() |
| } else { |
| false |
| } |
| } |
| } |
| } |
| |
| fn matches_event(&self, event: &Arc<InterruptibleEvent>) -> bool { |
| match self { |
| Self::Internal(weak) => { |
| if let Some(strong) = weak.upgrade() { |
| Arc::ptr_eq(&strong, event) |
| } else { |
| false |
| } |
| } |
| _ => false, |
| } |
| } |
| |
| fn matches_waiter(&self, waiter: &Arc<Waiter>) -> bool { |
| match self { |
| Self::InternalBoot(weak) => { |
| if let Some(strong) = weak.upgrade() { |
| Arc::ptr_eq(&strong, waiter) |
| } else { |
| false |
| } |
| } |
| _ => false, |
| } |
| } |
| |
| fn is_stale(&self) -> bool { |
| match self { |
| Self::Internal(weak) => weak.strong_count() == 0, |
| Self::External(weak, _) => weak.strong_count() == 0, |
| Self::InternalBoot(weak) => weak.strong_count() == 0, |
| } |
| } |
| } |
| |
/// A waiter registered on a futex.
struct FutexWaiter {
    /// Bitset of the waiter; a wake only reaches it when the wake mask shares a bit with it.
    mask: u32,
    /// How to notify the waiting party.
    notifiable: FutexNotifiable,
}
| |
| #[derive(Default)] |
| struct FutexWaiters(VecDeque<FutexWaiter>); |
| |
| impl FutexWaiters { |
| fn add(&mut self, waiter: FutexWaiter) { |
| self.0.push_back(waiter); |
| } |
| |
| fn notify(&mut self, mask: u32, count: usize) -> usize { |
| let mut woken = 0; |
| self.0.retain_mut(|waiter| { |
| if woken == count || waiter.mask & mask == 0 { |
| return true; |
| } |
| // The send will fail if the receiver is gone, which means nothing was actualling |
| // waiting on the futex. |
| if waiter.notifiable.notify() { |
| woken += 1; |
| } |
| false |
| }); |
| woken |
| } |
| |
| fn transfer(&mut self, mut other: Self) { |
| self.0.append(&mut other.0); |
| } |
| |
| fn is_empty(&self) -> bool { |
| self.0.is_empty() |
| } |
| |
| fn remove_waiter(&mut self, event: &Arc<InterruptibleEvent>) -> bool { |
| let initial_len = self.0.len(); |
| self.0.retain(|w| !w.notifiable.matches_event(event) && !w.notifiable.is_stale()); |
| self.0.len() < initial_len |
| } |
| |
| fn remove_boot_waiter(&mut self, waiter: &Arc<Waiter>) -> bool { |
| let initial_len = self.0.len(); |
| self.0.retain(|w| !w.notifiable.matches_waiter(waiter) && !w.notifiable.is_stale()); |
| self.0.len() < initial_len |
| } |
| |
| fn split_for_requeue(&mut self, count: usize) -> Self { |
| let count = std::cmp::min(count, self.0.len()); |
| let tail = self.0.split_off(count); |
| let head = std::mem::replace(&mut self.0, tail); |
| FutexWaiters(head) |
| } |
| } |
| |
/// A thread waiting to acquire a PI futex.
struct RtMutexWaiter {
    /// The tid, possibly with the FUTEX_WAITERS bit set.
    tid: u32,

    /// How to notify the waiting thread.
    notifiable: FutexNotifiable,
}
| |
#[cfg(test)]
mod tests {
    use super::*;
    use starnix_sync::InterruptibleEvent;
    use starnix_uapi::restricted_aspace::RESTRICTED_ASPACE_BASE;
    use starnix_uapi::user_address::UserAddress;

    /// Builds a private futex key for the given raw user address.
    fn key_at(raw_addr: u64) -> PrivateFutexKey {
        PrivateFutexKey { addr: FutexAddress::try_from(UserAddress::from(raw_addr)).unwrap() }
    }

    /// A waiter is found and removed through the key it was registered on.
    #[fuchsia::test]
    fn test_remove_waiter_simple() {
        let mut state = FutexTableState::<PrivateFutexKey>::default();
        let key = key_at((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let event = Arc::new(InterruptibleEvent::new());

        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
        state.get_waiters_or_default(key.clone()).add(FutexWaiter { mask: u32::MAX, notifiable });
        assert_eq!(state.waiters.len(), 1);

        state.remove_waiter_from_queue(key, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// A waiter queued on another key than the one given is still found and removed.
    #[fuchsia::test]
    fn test_remove_waiter_requeued() {
        let mut state = FutexTableState::<PrivateFutexKey>::default();
        let original_key = key_at((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let requeued_key = key_at((RESTRICTED_ASPACE_BASE + 0x2000) as u64);
        let event = Arc::new(InterruptibleEvent::new());

        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
        state
            .get_waiters_or_default(requeued_key.clone())
            .add(FutexWaiter { mask: u32::MAX, notifiable });
        assert_eq!(state.waiters.len(), 1);

        state.remove_waiter_from_queue(original_key, &event);
        assert_eq!(state.waiters.len(), 0);
    }

    /// A PI-mutex waiter is removed and its now-empty queue is dropped from the map.
    #[fuchsia::test]
    fn test_remove_rt_mutex_waiter() {
        let mut state = FutexTableState::<PrivateFutexKey>::default();
        let key = key_at((RESTRICTED_ASPACE_BASE + 0x1000) as u64);
        let event = Arc::new(InterruptibleEvent::new());

        let notifiable = FutexNotifiable::new_internal(Arc::downgrade(&event));
        state
            .get_rt_mutex_waiters_or_default(key.clone())
            .push_back(RtMutexWaiter { tid: 1, notifiable });
        assert_eq!(state.rt_mutex_waiters.len(), 1);

        state.remove_rt_mutex_waiter_from_queue(key, &event);
        assert_eq!(state.rt_mutex_waiters.len(), 0);
    }

    /// Splitting takes waiters from the front of the queue and keeps their order.
    #[fuchsia::test]
    fn test_split_for_requeue_fairness() {
        let mut waiters = FutexWaiters::default();
        let e1 = Arc::new(InterruptibleEvent::new());
        let e2 = Arc::new(InterruptibleEvent::new());
        let e3 = Arc::new(InterruptibleEvent::new());

        waiters.add(FutexWaiter {
            mask: 1,
            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e1)),
        });
        waiters.add(FutexWaiter {
            mask: 2,
            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e2)),
        });
        waiters.add(FutexWaiter {
            mask: 3,
            notifiable: FutexNotifiable::new_internal(Arc::downgrade(&e3)),
        });

        let split = waiters.split_for_requeue(2);

        assert_eq!(split.0.len(), 2);
        assert_eq!(split.0[0].mask, 1);
        assert_eq!(split.0[1].mask, 2);

        assert_eq!(waiters.0.len(), 1);
        assert_eq!(waiters.0[0].mask, 3);
    }

    /// An external waiter whose token was dropped is pruned by an unrelated removal.
    #[fuchsia::test]
    fn test_stale_external_waiter_cleanup() {
        let mut state = FutexTableState::<PrivateFutexKey>::default();
        let key = key_at((RESTRICTED_ASPACE_BASE + 0x1000) as u64);

        {
            let token = Arc::new(());
            let (sender, _receiver) = oneshot::channel::<()>();
            let notifiable = FutexNotifiable::new_external(Arc::downgrade(&token), sender);
            state
                .get_waiters_or_default(key.clone())
                .add(FutexWaiter { mask: u32::MAX, notifiable });
        } // token is dropped here, so the waiter becomes stale

        assert_eq!(state.waiters.len(), 1);

        // Trigger a cleanup with a placeholder event
        let dummy_event = Arc::new(InterruptibleEvent::new());
        state.remove_waiter_from_queue(key, &dummy_event);

        assert_eq!(state.waiters.len(), 0, "Stale external waiter should be removed");
    }
}