blob: 367b36ade41c980d35f254631ab334d47cf11655 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::private::Sealed;
use crate::stash_logger::StashInspectLogger;
use crate::storage_factory::{DefaultLoader, NoneT};
use crate::UpdateState;
use anyhow::{format_err, Context, Error};
use fidl_fuchsia_stash::{StoreAccessorProxy, Value};
use fuchsia_async::{Duration, Task, Time, Timer};
use futures::channel::mpsc::UnboundedSender;
use futures::future::OptionFuture;
use futures::lock::{Mutex, MutexGuard};
use futures::{FutureExt, StreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
/// Prefix prepended to every storage key before it is written to stash (see `prefixed`).
const SETTINGS_PREFIX: &str = "settings";

/// Minimum amount of time between Flush calls to Stash. The Flush call triggers
/// file I/O which is slow. If we call flush too often, we can overwhelm Stash, which eventually
/// causes the kernel to crash our service due to filling up the channel.
const MIN_FLUSH_INTERVAL: Duration = Duration::from_millis(500);
/// Stores device level settings in persistent storage.
/// User level settings should not use this.
pub struct DeviceStorage {
    /// Map of [`DeviceStorageCompatible`] keys to their typed storage.
    typed_storage_map: HashMap<&'static str, TypedStorage>,

    /// Map of keys to their type-erased default-value loaders. Only populated for keys that
    /// were registered with a loader (see [`DefaultDispatcher`]).
    typed_loader_map: HashMap<&'static str, Box<TypeErasedLoader>>,

    /// If true, reads will be returned from the data in memory rather than reading from storage.
    caching_enabled: bool,

    /// If true, writes to the underlying storage will only occur at most every
    /// [`MIN_FLUSH_INTERVAL`].
    debounce_writes: bool,

    /// Handle used to write stash failures to inspect.
    inspect_handle: Arc<Mutex<StashInspectLogger>>,
}
/// A wrapper for managing all communication and caching for one particular type of data being
/// stored. The actual types are erased. Each `TypedStorage` has its own stash proxy and its own
/// background flush task (spawned in [`DeviceStorage::with_stash_proxy`]).
struct TypedStorage {
    /// Sender to communicate with task loop that handles flushes.
    flush_sender: UnboundedSender<()>,

    /// Cached storage managed through interior mutability.
    cached_storage: Mutex<CachedStorage>,
}
/// `CachedStorage` abstracts over a cached value that's read from and written
/// to some backing store.
struct CachedStorage {
    /// Cache for the most recently read or written value. `None` until the first read or write
    /// for this key.
    current_data: Option<Box<TypeErasedData>>,

    /// Stash connection for this particular type's stash storage.
    stash_proxy: StoreAccessorProxy,
}
/// Structs that can be stored in device storage should derive the Serialize, Deserialize, and
/// Clone traits, as well as provide constants.
/// KEY should be unique to the struct, usually the name of the struct itself.
/// The associated `Loader` determines the value returned when nothing has yet been stored
/// (or the stored value fails to deserialize); see [`DefaultDispatcher`].
///
/// Anything that implements this should not introduce breaking changes with the same key.
/// Clients that want to make a breaking change should create a new structure with a new key and
/// implement conversion/cleanup logic. Adding optional fields to a struct is not breaking, but
/// removing fields, renaming fields, or adding non-optional fields are.
///
/// The [`Storage`] trait has [`Send`] and [`Sync`] requirements, so they have to be carried here
/// as well. This was not necessary before because rust could determine the additional trait
/// requirements at compile-time just for when the [`Storage`] trait was used. We don't get that
/// benefit anymore once we hide the type.
///
/// [`Storage`]: super::setting_handler::persist::Storage
pub trait DeviceStorageCompatible:
    Serialize + DeserializeOwned + Clone + PartialEq + Any + Send + Sync
{
    /// Source of the default value used when stash has no entry for [`Self::KEY`] or the
    /// stored entry cannot be deserialized.
    type Loader: DefaultDispatcher<Self>;

    /// Deserializes `value` into `Self`. Implementors may override this to fall back to older
    /// serialized formats for migration purposes.
    fn try_deserialize_from(value: &str) -> Result<Self, Error> {
        Self::extract(value)
    }

    /// Deserializes `value` as JSON, converting serde errors into [`anyhow::Error`].
    fn extract(value: &str) -> Result<Self, Error> {
        serde_json::from_str(value).map_err(|e| format_err!("could not deserialize: {e:?}"))
    }

    /// Serializes `self` to a JSON string. Panics on failure, which would indicate a bug in
    /// the type's `Serialize` implementation.
    fn serialize_to(&self) -> String {
        serde_json::to_string(self).expect("value should serialize")
    }

    /// Unique storage key for this type.
    const KEY: &'static str;
}
/// This trait represents types that can be converted into a storable type. It's also important
/// that the type it is transformed into can also be converted back into this type. This reverse
/// conversion is used to populate the fields of the original type with the stored values plus
/// defaulting the other fields that, e.g. might later be populated from hardware APIs.
///
/// # Example
/// ```
/// // Struct used in controllers.
/// struct SomeSettingInfo {
///     storable_field: u8,
///     hardware_backed_field: String,
/// }
///
/// // Struct only used for storage.
/// #[derive(Serialize, Deserialize, PartialEq, Clone, Default)]
/// struct StorableSomeSettingInfo {
///     storable_field: u8,
/// }
///
/// // Impl compatible for the storable type.
/// impl DeviceStorageCompatible for StorableSomeSettingInfo {
///     type Loader = NoneT;
///     const KEY: &'static str = "some_setting_info";
/// }
///
/// // Impl convertible for controller type.
/// impl DeviceStorageConvertible for SomeSettingInfo {
///     type Storable = StorableSomeSettingInfo;
///     fn get_storable(&self) -> Cow<'_, Self::Storable> {
///         Cow::Owned(StorableSomeSettingInfo {
///             storable_field: self.storable_field,
///         })
///     }
/// }
///
/// // This impl helps us convert from the storable version to the
/// // controller version of the struct. Hardware fields should be backed
/// // by default or usable values.
/// impl From<StorableSomeSettingInfo> for SomeSettingInfo {
///     fn from(storable: StorableSomeSettingInfo) -> SomeSettingInfo {
///         SomeSettingInfo {
///             storable_field: storable.storable_field,
///             hardware_backed_field: String::new(),
///         }
///     }
/// }
/// ```
pub trait DeviceStorageConvertible: Sized {
    /// The type that will be used for storing the data.
    type Storable: DeviceStorageCompatible + Into<Self>;

    /// Convert `self` into its storable version.
    // The reason we don't take ownership here is that the setting handler uses the original value
    // to send a message on the message hub for when the change is written. Serializing also only
    // borrows the data and doesn't need to own it. When `Storable` is `Self`, we only need to keep
    // the borrow on self, but when the types differ, then we need to own the newly constructed
    // type.
    fn get_storable(&self) -> Cow<'_, Self::Storable>;
}
// Any type that is storage compatible is also storage convertible (it can convert to itself!).
impl<T> DeviceStorageConvertible for T
where
    T: DeviceStorageCompatible,
{
    type Storable = T;

    // No conversion needed when the storable type is `Self`; just borrow.
    fn get_storable(&self) -> Cow<'_, Self::Storable> {
        Cow::Borrowed(self)
    }
}
/// Serializes a type-erased cached value to its string form; used by `inner_write` to compare
/// the cached value against a newly written one.
type MappingFn = Box<dyn FnOnce(&(dyn Any + Send + Sync)) -> String + Send>;
/// Type-erased stored value (the concrete type implements [`DeviceStorageCompatible`]).
type TypeErasedData = dyn Any + Send + Sync + 'static;
/// Type-erased default-value loader (the concrete type implements [`DefaultLoader`]).
type TypeErasedLoader = dyn Any + Send + Sync + 'static;
impl DeviceStorage {
    /// Construct a device storage from the iteratable item, which will produce the keys for
    /// storage, and from a generator that will produce a stash proxy given a particular key.
    ///
    /// For each key this also spawns a detached background task that debounces flush requests:
    /// flushes are performed at most once per [`MIN_FLUSH_INTERVAL`].
    pub fn with_stash_proxy<I, G>(
        iter: I,
        stash_generator: G,
        inspect_handle: Arc<Mutex<StashInspectLogger>>,
    ) -> Self
    where
        I: IntoIterator<Item = (&'static str, Option<Box<TypeErasedLoader>>)>,
        G: Fn() -> StoreAccessorProxy,
    {
        let mut typed_loader_map = HashMap::new();
        let typed_storage_map = iter
            .into_iter()
            .map({
                // Clone the Arc so the map closure can hand a copy to each spawned task.
                let inspect_handle = Arc::clone(&inspect_handle);
                let typed_loader_map = &mut typed_loader_map;
                move |(key, loader)| {
                    // Register the default-value loader for this key, if one was supplied.
                    if let Some(loader) = loader {
                        let _ = typed_loader_map.insert(key, loader);
                    }

                    // Generate a separate stash proxy for each key.
                    let (flush_sender, flush_receiver) = futures::channel::mpsc::unbounded::<()>();
                    let stash_proxy = stash_generator();

                    let storage = TypedStorage {
                        flush_sender,
                        cached_storage: Mutex::new(CachedStorage {
                            current_data: None,
                            stash_proxy: stash_proxy.clone(),
                        }),
                    };

                    let inspect_handle = Arc::clone(&inspect_handle);
                    // Each key has an independent flush queue.
                    Task::spawn(async move {
                        let mut next_allowed_flush = Time::now();
                        let mut next_flush_timer = OptionFuture::from(None).fuse();
                        let flush_requested = flush_receiver.fuse();
                        futures::pin_mut!(flush_requested);
                        loop {
                            futures::select! {
                                // A flush was requested; arm (or re-arm) a timer that fires at
                                // the earliest time the debounce interval allows.
                                () = flush_requested.select_next_some() => {
                                    next_flush_timer = OptionFuture::from(Some(Timer::new(
                                        next_allowed_flush
                                    )))
                                    .fuse();
                                },
                                // The debounce timer fired; perform the flush and push the next
                                // allowed flush time forward by MIN_FLUSH_INTERVAL.
                                o = next_flush_timer => {
                                    if let Some(()) = o {
                                        DeviceStorage::stash_flush(
                                            &stash_proxy,
                                            Arc::clone(&inspect_handle),
                                            key.to_string()).await;
                                        next_allowed_flush = Time::now() + MIN_FLUSH_INTERVAL;
                                    }
                                }
                                // All senders were dropped; no more flushes can be requested.
                                complete => break,
                            }
                        }
                    })
                    .detach();

                    (key, storage)
                }
            })
            .collect();
        DeviceStorage {
            caching_enabled: true,
            debounce_writes: true,
            typed_storage_map,
            typed_loader_map,
            inspect_handle,
        }
    }

    /// Test-only. When disabled, every read bypasses the in-memory cache and hits stash.
    pub fn set_caching_enabled(&mut self, enabled: bool) {
        self.caching_enabled = enabled;
    }

    /// Test-only. When disabled, writes flush to stash immediately instead of being debounced.
    pub fn set_debounce_writes(&mut self, debounce: bool) {
        self.debounce_writes = debounce;
    }

    /// Triggers a flush on the given stash proxy.
    ///
    /// Both a transport-level FIDL error and a stash-reported flush error are recorded to
    /// inspect via `handle_flush_failure`.
    async fn stash_flush(
        stash_proxy: &StoreAccessorProxy,
        inspect_handle: Arc<Mutex<StashInspectLogger>>,
        setting_key: String,
    ) {
        let flush_result = stash_proxy.flush().await;
        match flush_result {
            // Stash responded but reported that the flush itself failed.
            Ok(Err(err)) => {
                Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
            }
            // The FIDL call itself failed.
            Err(err) => {
                Self::handle_flush_failure(inspect_handle, setting_key, format!("{:?}", err)).await;
            }
            _ => {}
        }
    }

    /// Logs a flush failure and records it to inspect under `setting_key`.
    async fn handle_flush_failure(
        inspect_handle: Arc<Mutex<StashInspectLogger>>,
        setting_key: String,
        err: String,
    ) {
        tracing::error!("Failed to flush to stash: {:?}", err);

        // Record the write failure to inspect.
        inspect_handle.lock().await.record_flush_failure(setting_key);
    }

    /// Writes the serialized `new_value` under `key`, updating the in-memory cache and
    /// scheduling a flush if the value actually changed.
    ///
    /// `data_as_any` is the type-erased typed value to store in the cache; `mapping_fn`
    /// serializes the currently cached value so it can be compared with `new_value`.
    ///
    /// Returns [`UpdateState::Updated`] when the value changed, [`UpdateState::Unchanged`]
    /// otherwise, or an error if `key` was never registered or the flush request failed to send.
    async fn inner_write(
        &self,
        key: &'static str,
        new_value: String,
        data_as_any: Box<TypeErasedData>,
        mapping_fn: MappingFn,
    ) -> Result<UpdateState, Error> {
        let typed_storage = self
            .typed_storage_map
            .get(key)
            .ok_or_else(|| format_err!("Invalid data keyed by {}", key))?;
        let mut cached_storage = typed_storage.cached_storage.lock().await;
        let mut maybe_init;
        let cached_value = {
            maybe_init = cached_storage
                .current_data
                .as_deref()
                // Get the data as a shared reference so we don't move out of the option.
                .map(mapping_fn);
            if maybe_init.is_none() {
                // Nothing cached yet: read the currently persisted value from stash so the
                // comparison below reflects what's actually on disk.
                let stash_key = prefixed(key);
                if let Some(stash_value) =
                    cached_storage.stash_proxy.get_value(&stash_key).await.unwrap_or_else(|_| {
                        panic!("failed to get value from stash for {stash_key:?}")
                    })
                {
                    if let Value::Stringval(string_value) = &*stash_value {
                        maybe_init = Some(string_value.clone());
                    } else {
                        panic!("Unexpected type for key found in stash");
                    }
                }
            }
            maybe_init.as_ref()
        };
        Ok(if cached_value != Some(&new_value) {
            let serialized = Value::Stringval(new_value);
            let key = prefixed(key);
            cached_storage.stash_proxy.set_value(&key, serialized)?;
            if !self.debounce_writes {
                // Not debouncing writes for testing, just flush immediately.
                DeviceStorage::stash_flush(
                    &cached_storage.stash_proxy,
                    Arc::clone(&self.inspect_handle),
                    key,
                )
                .await;
            } else {
                // Notify the per-key flush task; it debounces flushes to at most one per
                // MIN_FLUSH_INTERVAL.
                typed_storage.flush_sender.unbounded_send(()).with_context(|| {
                    format!("flush_sender failed to send flush message, associated key is {key}")
                })?;
            }
            cached_storage.current_data = Some(data_as_any);
            UpdateState::Updated
        } else {
            UpdateState::Unchanged
        })
    }

    /// Write `new_value` to storage. The write will be persisted to disk at a set interval.
    pub async fn write<T>(&self, new_value: &T) -> Result<UpdateState, Error>
    where
        T: DeviceStorageCompatible,
    {
        self.inner_write(
            T::KEY,
            new_value.serialize_to(),
            Box::new(new_value.clone()) as Box<TypeErasedData>,
            Box::new(|any: &(dyn Any + Send + Sync)| {
                // Attempt to downcast the `dyn Any` to its original type. If `T` was not its
                // original type, then we want to panic because there's a compile-time issue
                // with overlapping keys.
                let value = any.downcast_ref::<T>().expect(
                    "Type mismatch even though keys match. Two different\
                    types have the same key value",
                );
                value.serialize_to()
            }),
        )
        .await
    }

    /// Test-only method to write directly to stash without touching the cache. This is used for
    /// setting up data as if it existed on disk before the connection to stash was made.
    pub async fn write_str(&self, key: &'static str, value: String) -> Result<(), Error> {
        let typed_storage =
            self.typed_storage_map.get(key).expect("Did not request an initialized key");
        let cached_storage = typed_storage.cached_storage.lock().await;
        cached_storage.stash_proxy.set_value(&prefixed(key), Value::Stringval(value))?;
        typed_storage.flush_sender.unbounded_send(()).unwrap();
        Ok(())
    }

    /// Returns the locked cached storage for `key` along with a freshly read value when the
    /// cache could not be used: `Some(Some(v))` means `v` was read from stash, `Some(None)`
    /// means stash had no value, and `None` means the cached value is valid.
    async fn get_inner(
        &self,
        key: &'static str,
    ) -> (MutexGuard<'_, CachedStorage>, Option<Option<String>>) {
        let typed_storage = self
            .typed_storage_map
            .get(key)
            // TODO(https://fxbug.dev/42064613) Replace this with an error result.
            .unwrap_or_else(|| panic!("Invalid data keyed by {key}"));
        let cached_storage = typed_storage.cached_storage.lock().await;
        let new = if cached_storage.current_data.is_none() || !self.caching_enabled {
            let stash_key = prefixed(key);
            if let Some(stash_value) = cached_storage
                .stash_proxy
                .get_value(&stash_key)
                .await
                .unwrap_or_else(|_| panic!("failed to get value from stash for {stash_key:?}"))
            {
                if let Value::Stringval(string_value) = *stash_value {
                    Some(Some(string_value))
                } else {
                    panic!("Unexpected type for key found in stash");
                }
            } else {
                Some(None)
            }
        } else {
            None
        };
        (cached_storage, new)
    }

    /// Gets the latest value cached locally, or loads the value from storage.
    /// Doesn't support multiple concurrent callers of the same struct.
    ///
    /// If stash has no value, or the stored value fails to deserialize, the default obtained
    /// through `T::Loader` is cached and returned instead.
    pub async fn get<T>(&self) -> T
    where
        T: DeviceStorageCompatible,
    {
        let (mut cached_storage, update) = self.get_inner(T::KEY).await;
        if let Some(update) = update {
            // A fresh value (or its absence) was read from stash: deserialize it, falling back
            // to the loader-provided default on a missing or corrupt entry.
            cached_storage.current_data = Some(update.and_then(|string_value| {
                T::try_deserialize_from(&string_value).map(|val| Box::new(val) as Box<TypeErasedData>).map_err(|e| tracing::error!(
                    "Using default. Failed to deserialize type {}: {e:?}\nSource data: {string_value:?}",
                    T::KEY
                )).ok()
            }).unwrap_or_else(|| Box::new(<T::Loader as DefaultDispatcher<T>>::get_default(self)) as Box<TypeErasedData>));
        };
        cached_storage
            .current_data
            .as_ref()
            .expect("should always have a value")
            .downcast_ref::<T>()
            .expect(
                "Type mismatch even though keys match. Two different types have the same key\
                value",
            )
            .clone()
    }
}
/// Dispatches to the correct source for a default value of `T` when no valid value is stored:
/// either `T::default()` (via the [`NoneT`] impl) or a loader registered in
/// [`DeviceStorage::typed_loader_map`] (via the [`DefaultLoader`] impl).
pub trait DefaultDispatcher<T>: Sealed
where
    T: DeviceStorageCompatible,
{
    /// Returns the default value for `T`.
    fn get_default(_: &DeviceStorage) -> T;
}
// When no loader was registered (`Loader = NoneT`), fall back to the type's `Default` impl.
impl<T> DefaultDispatcher<T> for NoneT
where
    T: DeviceStorageCompatible<Loader = Self> + Default,
{
    fn get_default(_: &DeviceStorage) -> T {
        T::default()
    }
}
// When a `DefaultLoader` is the declared loader, look it up in the storage's loader map and
// ask it for the default value. Both a missing entry and a type mismatch indicate a setup bug,
// so they panic.
impl<T, L> DefaultDispatcher<T> for L
where
    T: DeviceStorageCompatible<Loader = L>,
    L: DefaultLoader<Result = T> + 'static,
{
    fn get_default(storage: &DeviceStorage) -> T {
        let erased_loader = storage
            .typed_loader_map
            .get(T::KEY)
            .unwrap_or_else(|| panic!("Missing loader for {}", T::KEY));
        erased_loader
            .downcast_ref::<T::Loader>()
            .unwrap_or_else(|| panic!("Mismatch key and loader for key {}", T::KEY))
            .default_value()
    }
}
fn prefixed(input_string: &str) -> String {
format!("{SETTINGS_PREFIX}_{input_string}")
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_data_tree;
    use fidl_fuchsia_stash::{
        FlushError, StoreAccessorMarker, StoreAccessorRequest, StoreAccessorRequestStream,
    };
    use fuchsia_async as fasync;
    use fuchsia_async::TestExecutor;
    use fuchsia_inspect::component;
    use futures::prelude::*;
    use serde::{Deserialize, Serialize};
    use std::task::Poll;

    // Distinct test payload values written to and read back from storage.
    const VALUE0: i32 = 3;
    const VALUE1: i32 = 33;
    const VALUE2: i32 = 128;

    #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
    struct TestStruct {
        value: i32,
    }

    // The stash key for TestStruct, i.e. `prefixed(TestStruct::KEY)`.
    const STORE_KEY: &str = "settings_testkey";

    impl DeviceStorageCompatible for TestStruct {
        type Loader = NoneT;
        const KEY: &'static str = "testkey";
    }

    impl Default for TestStruct {
        fn default() -> Self {
            TestStruct { value: VALUE0 }
        }
    }

    /// Advances `future` until `executor` finishes. Panics if the end result was a stall.
    #[track_caller]
    fn advance_executor<F>(executor: &mut TestExecutor, future: &mut F)
    where
        F: Future + Unpin,
    {
        assert!(executor.run_until_stalled(future).is_ready(), "TestExecutor stalled!");
    }

    /// Verifies that a SetValue call was sent to stash with the given value.
    async fn verify_stash_set(stash_stream: &mut StoreAccessorRequestStream, expected_value: i32) {
        match stash_stream.next().await.unwrap() {
            Ok(StoreAccessorRequest::SetValue { key, val, control_handle: _ }) => {
                assert_eq!(key, STORE_KEY);
                if let Value::Stringval(string_value) = val {
                    let input_value = TestStruct::try_deserialize_from(&string_value)
                        .expect("deserialization should succeed");
                    assert_eq!(input_value.value, expected_value);
                } else {
                    panic!("Unexpected type for key found in stash");
                }
            }
            request => panic!("Unexpected request: {request:?}"),
        }
    }

    /// Verifies that a GetValue call was sent to stash for `STORE_KEY`, and responds to it with
    /// the given response string.
    async fn validate_stash_get_and_respond(
        stash_stream: &mut StoreAccessorRequestStream,
        response: String,
    ) {
        match stash_stream.next().await.unwrap() {
            Ok(StoreAccessorRequest::GetValue { key, responder }) => {
                assert_eq!(key, STORE_KEY);
                responder.send(Some(Value::Stringval(response))).expect("unable to send response");
            }
            request => panic!("Unexpected request: {request:?}"),
        }
    }

    /// Verifies that a Flush call was sent to stash.
    async fn verify_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
        match stash_stream.next().await.unwrap() {
            Ok(StoreAccessorRequest::Flush { responder }) => {
                let _ = responder.send(Ok(()));
            } // expected
            request => panic!("Unexpected request: {request:?}"),
        }
    }

    /// Verifies that a Flush call was sent to stash, and send back a failure.
    async fn fail_stash_flush(stash_stream: &mut StoreAccessorRequestStream) {
        match stash_stream.next().await.unwrap() {
            Ok(StoreAccessorRequest::Flush { responder }) => {
                let _ = responder.send(Err(FlushError::CommitFailed));
            } // expected
            request => panic!("Unexpected request: {request:?}"),
        }
    }

    // A value stored in stash should be returned by get().
    #[fuchsia::test(allow_stalls = false)]
    async fn test_get() {
        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();
        fasync::Task::spawn(async move {
            let value_to_get = TestStruct { value: VALUE1 };
            #[allow(clippy::single_match)]
            while let Some(req) = stash_stream.try_next().await.unwrap() {
                #[allow(unreachable_patterns)]
                match req {
                    StoreAccessorRequest::GetValue { key, responder } => {
                        assert_eq!(key, STORE_KEY);
                        let response = Value::Stringval(value_to_get.serialize_to());
                        responder.send(Some(response)).unwrap();
                    }
                    _ => {}
                }
            }
        })
        .detach();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );
        let result = storage.get::<TestStruct>().await;

        assert_eq!(result.value, VALUE1);
    }

    // When stash has no value, get() should fall back to the type's default.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_get_default() {
        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();
        fasync::Task::spawn(async move {
            #[allow(clippy::single_match)]
            while let Some(req) = stash_stream.try_next().await.unwrap() {
                #[allow(unreachable_patterns)]
                match req {
                    StoreAccessorRequest::GetValue { key: _, responder } => {
                        responder.send(None).unwrap();
                    }
                    _ => {}
                }
            }
        })
        .detach();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );
        let result = storage.get::<TestStruct>().await;

        assert_eq!(result.value, VALUE0);
    }

    // For an invalid stash value, the get() method should return the default value.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_invalid_stash() {
        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();
        fasync::Task::spawn(async move {
            #[allow(clippy::single_match)]
            while let Some(req) = stash_stream.try_next().await.unwrap() {
                #[allow(unreachable_patterns)]
                match req {
                    StoreAccessorRequest::GetValue { key: _, responder } => {
                        let response = Value::Stringval("invalid value".to_string());
                        responder.send(Some(response)).unwrap();
                    }
                    _ => {}
                }
            }
        })
        .detach();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        let result = storage.get::<TestStruct>().await;

        assert_eq!(result.value, VALUE0);
    }

    // Verifies that stash flush failures are written to inspect.
    #[fuchsia::test]
    fn test_flush_fail_writes_to_inspect() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();

        let inspector = component::inspector();
        let logger_handle = Arc::new(Mutex::new(StashInspectLogger::new(inspector.root())));
        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            logger_handle,
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = fail_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);

        // Queue up a second write to guarantee that CachedStorage has written the failure to
        // inspect.
        {
            let value_to_write = TestStruct { value: VALUE1 };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Run all background tasks until stalled.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        assert_data_tree!(inspector, root: {
            stash_failures: {
                testkey: {
                    count: 1u64,
                }
            }
        });
    }

    // Test that an initial write to DeviceStorage causes a SetValue and Flush to Stash
    // without any wait.
    #[fuchsia::test]
    fn test_first_write_flushes_immediately() {
        let written_value = VALUE2;
        let mut executor = TestExecutor::new_with_fake_time();

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        // Write to device storage.
        let value_to_write = TestStruct { value: written_value };
        let write_future = storage.write(&value_to_write);
        futures::pin_mut!(write_future);

        // Initial cache check is done if no read was ever performed.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

        {
            let respond_future = validate_stash_get_and_respond(
                &mut stash_stream,
                serde_json::to_string(&TestStruct::default()).unwrap(),
            );
            futures::pin_mut!(respond_future);
            advance_executor(&mut executor, &mut respond_future);
        }

        // Write request finishes immediately.
        assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Ready(Ok(_)));

        // Set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, written_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start listening for the flush request.
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // Flush is received without a wait. Due to the way time works with executors, if there was
        // a delay, the test would stall since time never advances.
        advance_executor(&mut executor, &mut flush_future);
    }

    // A second compatible type used to validate cross-type key checks.
    #[derive(Default, Copy, Clone, PartialEq, Serialize, Deserialize)]
    struct WrongStruct;

    impl DeviceStorageCompatible for WrongStruct {
        type Loader = NoneT;
        const KEY: &'static str = "WRONG_STRUCT";
    }

    // Test that writing with a type whose key was never registered returns an error rather
    // than silently changing the stored type.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_write_with_mismatch_type_returns_error() {
        let (stash_proxy, mut stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();

        let spawned = fasync::Task::spawn(async move {
            while let Some(request) = stream.next().await {
                match request {
                    Ok(StoreAccessorRequest::GetValue { key, responder }) => {
                        assert_eq!(key, STORE_KEY);
                        let _ = responder.send(Some(Value::Stringval(
                            serde_json::to_string(&TestStruct { value: VALUE2 }).unwrap(),
                        )));
                    }
                    Ok(StoreAccessorRequest::SetValue { key, .. }) => {
                        assert_eq!(key, STORE_KEY);
                    }
                    _ => panic!("Unexpected request {request:?}"),
                }
            }
        });

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        // Write successfully to storage once.
        let result = storage.write(&TestStruct { value: VALUE2 }).await;
        assert!(result.is_ok());

        // Write to device storage again with a different type to validate that the type can't
        // be changed.
        let result = storage.write(&WrongStruct).await;
        assert_matches!(result, Err(e) if e.to_string() == "Invalid data keyed by WRONG_STRUCT");

        drop(storage);

        spawned.await;
    }

    // Test that multiple writes to DeviceStorage will cause a SetValue each time, but will only
    // Flush to Stash at an interval.
    #[fuchsia::test]
    fn test_multiple_write_debounce() {
        // Custom executor for this test so that we can advance the clock arbitrarily and verify the
        // state of the executor at any given point.
        let mut executor = TestExecutor::new_with_fake_time();
        let start_time = Time::from_nanos(0);
        executor.set_fake_time(start_time);

        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(TestStruct::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );

        let first_value = VALUE1;
        let second_value = VALUE2;

        // First write finishes immediately.
        {
            let value_to_write = TestStruct { value: first_value };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);

            // Initial cache check is done if no read was ever performed.
            assert_matches!(executor.run_until_stalled(&mut write_future), Poll::Pending);

            {
                let respond_future = validate_stash_get_and_respond(
                    &mut stash_stream,
                    serde_json::to_string(&TestStruct::default()).unwrap(),
                );
                futures::pin_mut!(respond_future);
                advance_executor(&mut executor, &mut respond_future);
            }

            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // First set request is received immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, first_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // First flush request is received.
        {
            let flush_future = verify_stash_flush(&mut stash_stream);
            futures::pin_mut!(flush_future);
            advance_executor(&mut executor, &mut flush_future);
        }

        // Now we repeat the process with a second write request, which will need to advance the
        // fake time due to the timer.

        // Second write finishes immediately.
        {
            let value_to_write = TestStruct { value: second_value };
            let write_future = storage.write(&value_to_write);
            futures::pin_mut!(write_future);
            assert_matches!(
                executor.run_until_stalled(&mut write_future),
                Poll::Ready(Result::Ok(_))
            );
        }

        // Second set request finishes immediately on write.
        {
            let set_value_future = verify_stash_set(&mut stash_stream, second_value);
            futures::pin_mut!(set_value_future);
            advance_executor(&mut executor, &mut set_value_future);
        }

        // Start waiting for flush request.
        let flush_future = verify_stash_flush(&mut stash_stream);
        futures::pin_mut!(flush_future);

        // TextExecutor stalls due to waiting on timer to finish.
        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

        // Advance time to 1ms before the flush triggers.
        executor.set_fake_time(start_time + (MIN_FLUSH_INTERVAL - Duration::from_millis(1)));

        // TextExecutor is still waiting on the time to finish.
        assert_matches!(executor.run_until_stalled(&mut flush_future), Poll::Pending);

        // Advance time so that the flush will trigger.
        executor.set_fake_time(start_time + MIN_FLUSH_INTERVAL);

        // Stash receives a flush request after one timer cycle and the future terminates.
        advance_executor(&mut executor, &mut flush_future);
    }

    // This mod includes structs to only be used by
    // test_device_compatible_migration tests.
    mod test_device_compatible_migration {
        use super::*;
        use serde::{Deserialize, Serialize};

        pub(crate) const DEFAULT_V1_VALUE: i32 = 1;
        pub(crate) const DEFAULT_CURRENT_VALUE: i32 = 2;
        pub(crate) const DEFAULT_CURRENT_VALUE_2: i32 = 3;

        // The "old" serialized format that Current knows how to migrate from.
        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
        pub(crate) struct V1 {
            pub value: i32,
        }

        impl DeviceStorageCompatible for V1 {
            type Loader = NoneT;
            const KEY: &'static str = "testkey";
        }

        impl Default for V1 {
            fn default() -> Self {
                Self { value: DEFAULT_V1_VALUE }
            }
        }

        // The "new" format; its `try_deserialize_from` falls back to V1 for migration.
        #[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
        pub(crate) struct Current {
            pub value: i32,
            pub value_2: i32,
        }

        impl From<V1> for Current {
            fn from(v1: V1) -> Self {
                Current { value: v1.value, value_2: DEFAULT_CURRENT_VALUE_2 }
            }
        }

        impl DeviceStorageCompatible for Current {
            type Loader = NoneT;
            const KEY: &'static str = "testkey2";

            fn try_deserialize_from(value: &str) -> Result<Self, Error> {
                Self::extract(value).or_else(|_| V1::extract(value).map(Self::from))
            }
        }

        impl Default for Current {
            fn default() -> Self {
                Self { value: DEFAULT_CURRENT_VALUE, value_2: DEFAULT_CURRENT_VALUE_2 }
            }
        }
    }

    #[fuchsia::test]
    fn test_device_compatible_custom_migration() {
        // Create an initial struct based on the first version.
        let initial = test_device_compatible_migration::V1::default();
        // Serialize.
        let initial_serialized = initial.serialize_to();

        // Deserialize using the second version.
        let current =
            test_device_compatible_migration::Current::try_deserialize_from(&initial_serialized)
                .expect("deserialization should succeed");
        // Assert values carried over from first version and defaults are used for rest.
        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_V1_VALUE);
        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
    }

    // Corrupt (unparseable) stash data should fall back to the type's default on get().
    #[fuchsia::test(allow_stalls = false)]
    async fn test_corrupt_get_returns_default() {
        let (stash_proxy, mut stash_stream) =
            fidl::endpoints::create_proxy_and_stream::<StoreAccessorMarker>().unwrap();
        fasync::Task::spawn(async move {
            #[allow(clippy::single_match)]
            while let Some(req) = stash_stream.try_next().await.unwrap() {
                #[allow(unreachable_patterns)]
                match req {
                    StoreAccessorRequest::GetValue { key, responder } => {
                        assert_eq!(
                            key,
                            format!("settings_{}", test_device_compatible_migration::Current::KEY)
                        );
                        let response = Value::Stringval("bad json".to_string());
                        responder.send(Some(response)).unwrap();
                    }
                    _ => {}
                }
            }
        })
        .detach();

        let storage = DeviceStorage::with_stash_proxy(
            vec![(test_device_compatible_migration::Current::KEY, None)],
            move || stash_proxy.clone(),
            Arc::new(Mutex::new(StashInspectLogger::new(component::inspector().root()))),
        );
        let current = storage.get::<test_device_compatible_migration::Current>().await;

        assert_eq!(current.value, test_device_compatible_migration::DEFAULT_CURRENT_VALUE);
        assert_eq!(current.value_2, test_device_compatible_migration::DEFAULT_CURRENT_VALUE_2);
    }
}