| use crate::debug::TableEntry; |
| use crate::durability::Durability; |
| use crate::hash::FxIndexMap; |
| use crate::plumbing::CycleRecoveryStrategy; |
| use crate::plumbing::InputQueryStorageOps; |
| use crate::plumbing::QueryStorageMassOps; |
| use crate::plumbing::QueryStorageOps; |
| use crate::revision::Revision; |
| use crate::runtime::StampedValue; |
| use crate::Database; |
| use crate::Query; |
| use crate::Runtime; |
| use crate::{DatabaseKeyIndex, QueryDb}; |
| use indexmap::map::Entry; |
| use parking_lot::RwLock; |
| use std::iter; |
| use tracing::debug; |
| |
| /// Input queries store the result plus a list of the other queries |
| /// that they invoked. This means we can avoid recomputing them when |
| /// none of those inputs have changed. |
| pub struct InputStorage<Q> |
| where |
| Q: Query, |
| { |
| group_index: u16, |
| slots: RwLock<FxIndexMap<Q::Key, Slot<Q::Value>>>, |
| } |
| |
| struct Slot<V> { |
| key_index: u32, |
| stamped_value: RwLock<StampedValue<V>>, |
| } |
| |
| impl<Q> std::panic::RefUnwindSafe for InputStorage<Q> |
| where |
| Q: Query, |
| Q::Key: std::panic::RefUnwindSafe, |
| Q::Value: std::panic::RefUnwindSafe, |
| { |
| } |
| |
| impl<Q> QueryStorageOps<Q> for InputStorage<Q> |
| where |
| Q: Query, |
| { |
| const CYCLE_STRATEGY: crate::plumbing::CycleRecoveryStrategy = CycleRecoveryStrategy::Panic; |
| |
| fn new(group_index: u16) -> Self { |
| InputStorage { group_index, slots: Default::default() } |
| } |
| |
| fn fmt_index( |
| &self, |
| _db: &<Q as QueryDb<'_>>::DynDb, |
| index: u32, |
| fmt: &mut std::fmt::Formatter<'_>, |
| ) -> std::fmt::Result { |
| let slot_map = self.slots.read(); |
| let key = slot_map.get_index(index as usize).unwrap().0; |
| write!(fmt, "{}({:?})", Q::QUERY_NAME, key) |
| } |
| |
| fn maybe_changed_after( |
| &self, |
| db: &<Q as QueryDb<'_>>::DynDb, |
| index: u32, |
| revision: Revision, |
| ) -> bool { |
| debug_assert!(revision < db.salsa_runtime().current_revision()); |
| let slots = &self.slots.read(); |
| let Some((_, slot)) = slots.get_index(index as usize) else { |
| return true; |
| }; |
| |
| debug!("maybe_changed_after(slot={:?}, revision={:?})", Q::default(), revision,); |
| |
| let changed_at = slot.stamped_value.read().changed_at; |
| |
| debug!("maybe_changed_after: changed_at = {:?}", changed_at); |
| |
| changed_at > revision |
| } |
| |
| fn fetch(&self, db: &<Q as QueryDb<'_>>::DynDb, key: &Q::Key) -> Q::Value { |
| db.unwind_if_cancelled(); |
| |
| let slots = &self.slots.read(); |
| let slot = slots |
| .get(key) |
| .unwrap_or_else(|| panic!("no value set for {:?}({:?})", Q::default(), key)); |
| |
| let StampedValue { value, durability, changed_at } = slot.stamped_value.read().clone(); |
| |
| db.salsa_runtime().report_query_read_and_unwind_if_cycle_resulted( |
| DatabaseKeyIndex { |
| group_index: self.group_index, |
| query_index: Q::QUERY_INDEX, |
| key_index: slot.key_index, |
| }, |
| durability, |
| changed_at, |
| ); |
| |
| value |
| } |
| |
| fn durability(&self, _db: &<Q as QueryDb<'_>>::DynDb, key: &Q::Key) -> Durability { |
| match self.slots.read().get(key) { |
| Some(slot) => slot.stamped_value.read().durability, |
| None => panic!("no value set for {:?}({:?})", Q::default(), key), |
| } |
| } |
| |
| fn entries<C>(&self, _db: &<Q as QueryDb<'_>>::DynDb) -> C |
| where |
| C: std::iter::FromIterator<TableEntry<Q::Key, Q::Value>>, |
| { |
| let slots = self.slots.read(); |
| slots |
| .iter() |
| .map(|(key, slot)| { |
| TableEntry::new(key.clone(), Some(slot.stamped_value.read().value.clone())) |
| }) |
| .collect() |
| } |
| } |
| |
| impl<Q> QueryStorageMassOps for InputStorage<Q> |
| where |
| Q: Query, |
| { |
| fn purge(&self) { |
| *self.slots.write() = Default::default(); |
| } |
| } |
| |
| impl<Q> InputQueryStorageOps<Q> for InputStorage<Q> |
| where |
| Q: Query, |
| { |
| fn set(&self, runtime: &mut Runtime, key: &Q::Key, value: Q::Value, durability: Durability) { |
| tracing::debug!("{:?}({:?}) = {:?} ({:?})", Q::default(), key, value, durability); |
| |
| // The value is changing, so we need a new revision (*). We also |
| // need to update the 'last changed' revision by invoking |
| // `guard.mark_durability_as_changed`. |
| // |
| // CAREFUL: This will block until the global revision lock can |
| // be acquired. If there are still queries executing, they may |
| // need to read from this input. Therefore, we wait to acquire |
| // the lock on `map` until we also hold the global query write |
| // lock. |
| // |
| // (*) Technically, since you can't presently access an input |
| // for a non-existent key, and you can't enumerate the set of |
| // keys, we only need a new revision if the key used to |
| // exist. But we may add such methods in the future and this |
| // case doesn't generally seem worth optimizing for. |
| runtime.with_incremented_revision(|next_revision| { |
| let mut slots = self.slots.write(); |
| |
| // Do this *after* we acquire the lock, so that we are not |
| // racing with somebody else to modify this same cell. |
| // (Otherwise, someone else might write a *newer* revision |
| // into the same cell while we block on the lock.) |
| let stamped_value = StampedValue { value, durability, changed_at: next_revision }; |
| |
| match slots.entry(key.clone()) { |
| Entry::Occupied(entry) => { |
| let mut slot_stamped_value = entry.get().stamped_value.write(); |
| let old_durability = slot_stamped_value.durability; |
| *slot_stamped_value = stamped_value; |
| Some(old_durability) |
| } |
| |
| Entry::Vacant(entry) => { |
| let key_index = entry.index() as u32; |
| entry.insert(Slot { key_index, stamped_value: RwLock::new(stamped_value) }); |
| None |
| } |
| } |
| }); |
| } |
| } |
| |
| /// Same as `InputStorage`, but optimized for queries that take no inputs. |
| pub struct UnitInputStorage<Q> |
| where |
| Q: Query<Key = ()>, |
| { |
| slot: UnitSlot<Q::Value>, |
| } |
| |
| struct UnitSlot<V> { |
| database_key_index: DatabaseKeyIndex, |
| stamped_value: RwLock<Option<StampedValue<V>>>, |
| } |
| |
| impl<Q> std::panic::RefUnwindSafe for UnitInputStorage<Q> |
| where |
| Q: Query<Key = ()>, |
| Q::Key: std::panic::RefUnwindSafe, |
| Q::Value: std::panic::RefUnwindSafe, |
| { |
| } |
| |
| impl<Q> QueryStorageOps<Q> for UnitInputStorage<Q> |
| where |
| Q: Query<Key = ()>, |
| { |
| const CYCLE_STRATEGY: crate::plumbing::CycleRecoveryStrategy = CycleRecoveryStrategy::Panic; |
| |
| fn new(group_index: u16) -> Self { |
| let database_key_index = |
| DatabaseKeyIndex { group_index, query_index: Q::QUERY_INDEX, key_index: 0 }; |
| UnitInputStorage { slot: UnitSlot { database_key_index, stamped_value: RwLock::new(None) } } |
| } |
| |
| fn fmt_index( |
| &self, |
| _db: &<Q as QueryDb<'_>>::DynDb, |
| _index: u32, |
| fmt: &mut std::fmt::Formatter<'_>, |
| ) -> std::fmt::Result { |
| write!(fmt, "{}", Q::QUERY_NAME) |
| } |
| |
| fn maybe_changed_after( |
| &self, |
| db: &<Q as QueryDb<'_>>::DynDb, |
| _index: u32, |
| revision: Revision, |
| ) -> bool { |
| debug_assert!(revision < db.salsa_runtime().current_revision()); |
| |
| debug!("maybe_changed_after(slot={:?}, revision={:?})", Q::default(), revision,); |
| |
| let Some(value) = &*self.slot.stamped_value.read() else { |
| return true; |
| }; |
| let changed_at = value.changed_at; |
| |
| debug!("maybe_changed_after: changed_at = {:?}", changed_at); |
| |
| changed_at > revision |
| } |
| |
| fn fetch(&self, db: &<Q as QueryDb<'_>>::DynDb, &(): &Q::Key) -> Q::Value { |
| db.unwind_if_cancelled(); |
| |
| let StampedValue { value, durability, changed_at } = self |
| .slot |
| .stamped_value |
| .read() |
| .clone() |
| .unwrap_or_else(|| panic!("no value set for {:?}", Q::default())); |
| |
| db.salsa_runtime().report_query_read_and_unwind_if_cycle_resulted( |
| self.slot.database_key_index, |
| durability, |
| changed_at, |
| ); |
| |
| value |
| } |
| |
| fn durability(&self, _db: &<Q as QueryDb<'_>>::DynDb, &(): &Q::Key) -> Durability { |
| match &*self.slot.stamped_value.read() { |
| Some(stamped_value) => stamped_value.durability, |
| None => panic!("no value set for {:?}", Q::default(),), |
| } |
| } |
| |
| fn entries<C>(&self, _db: &<Q as QueryDb<'_>>::DynDb) -> C |
| where |
| C: std::iter::FromIterator<TableEntry<Q::Key, Q::Value>>, |
| { |
| iter::once(TableEntry::new( |
| (), |
| self.slot.stamped_value.read().as_ref().map(|it| it.value.clone()), |
| )) |
| .collect() |
| } |
| } |
| |
| impl<Q> QueryStorageMassOps for UnitInputStorage<Q> |
| where |
| Q: Query<Key = ()>, |
| { |
| fn purge(&self) { |
| *self.slot.stamped_value.write() = Default::default(); |
| } |
| } |
| |
| impl<Q> InputQueryStorageOps<Q> for UnitInputStorage<Q> |
| where |
| Q: Query<Key = ()>, |
| { |
| fn set(&self, runtime: &mut Runtime, (): &Q::Key, value: Q::Value, durability: Durability) { |
| tracing::debug!("{:?} = {:?} ({:?})", Q::default(), value, durability); |
| |
| // The value is changing, so we need a new revision (*). We also |
| // need to update the 'last changed' revision by invoking |
| // `guard.mark_durability_as_changed`. |
| // |
| // CAREFUL: This will block until the global revision lock can |
| // be acquired. If there are still queries executing, they may |
| // need to read from this input. Therefore, we wait to acquire |
| // the lock on `map` until we also hold the global query write |
| // lock. |
| // |
| // (*) Technically, since you can't presently access an input |
| // for a non-existent key, and you can't enumerate the set of |
| // keys, we only need a new revision if the key used to |
| // exist. But we may add such methods in the future and this |
| // case doesn't generally seem worth optimizing for. |
| runtime.with_incremented_revision(|next_revision| { |
| let mut stamped_value_slot = self.slot.stamped_value.write(); |
| |
| // Do this *after* we acquire the lock, so that we are not |
| // racing with somebody else to modify this same cell. |
| // (Otherwise, someone else might write a *newer* revision |
| // into the same cell while we block on the lock.) |
| let stamped_value = StampedValue { value, durability, changed_at: next_revision }; |
| |
| match &mut *stamped_value_slot { |
| Some(slot_stamped_value) => { |
| let old_durability = slot_stamped_value.durability; |
| *slot_stamped_value = stamped_value; |
| Some(old_durability) |
| } |
| |
| stamped_value_slot @ None => { |
| *stamped_value_slot = Some(stamped_value); |
| None |
| } |
| } |
| }); |
| } |
| } |
| |
| /// Check that `Slot<Q, MP>: Send + Sync` as long as |
| /// `DB::DatabaseData: Send + Sync`, which in turn implies that |
| /// `Q::Key: Send + Sync`, `Q::Value: Send + Sync`. |
| #[allow(dead_code)] |
| fn check_send_sync<Q>() |
| where |
| Q: Query, |
| Q::Key: Send + Sync, |
| Q::Value: Send + Sync, |
| { |
| fn is_send_sync<T: Send + Sync>() {} |
| is_send_sync::<Slot<Q::Value>>(); |
| is_send_sync::<UnitSlot<Q::Value>>(); |
| } |
| |
| /// Check that `Slot<Q, MP>: 'static` as long as |
| /// `DB::DatabaseData: 'static`, which in turn implies that |
| /// `Q::Key: 'static`, `Q::Value: 'static`. |
| #[allow(dead_code)] |
| fn check_static<Q>() |
| where |
| Q: Query + 'static, |
| Q::Key: 'static, |
| Q::Value: 'static, |
| { |
| fn is_static<T: 'static>() {} |
| is_static::<Slot<Q::Value>>(); |
| is_static::<UnitSlot<Q::Value>>(); |
| } |