blob: 938cbb8fc182bc80a291710f536683689288f5a3 [file] [log] [blame]
//! Repository implementation backed by memory
use futures_io::AsyncRead;
use futures_util::future::{BoxFuture, FutureExt};
use futures_util::io::{AsyncReadExt, Cursor};
use std::collections::HashMap;
use std::marker::PhantomData;
use crate::error::Error;
use crate::interchange::DataInterchange;
use crate::metadata::{MetadataPath, MetadataVersion, TargetPath};
use crate::repository::{RepositoryProvider, RepositoryStorage};
use crate::Result;
/// An ephemeral repository contained solely in memory.
/// An ephemeral repository contained solely in memory.
#[derive(Debug, Default)]
pub struct EphemeralRepository<D> {
    /// Stored metadata blobs, keyed by (path, version) so multiple versions of
    /// the same role can coexist.
    metadata: HashMap<(MetadataPath, MetadataVersion), Box<[u8]>>,
    /// Stored target file contents, keyed by target path.
    targets: HashMap<TargetPath, Box<[u8]>>,
    /// Ties the repository to interchange format `D` without storing a value.
    _interchange: PhantomData<D>,
}
impl<D> EphemeralRepository<D>
where
D: DataInterchange,
{
/// Create a new ephemeral repository.
pub fn new() -> Self {
Self {
metadata: HashMap::new(),
targets: HashMap::new(),
_interchange: PhantomData,
}
}
/// Returns a [EphemeralBatchUpdate] for manipulating this repository. This allows callers to
/// stage a number of mutations, and optionally atomically write them all at once.
pub fn batch_update(&mut self) -> EphemeralBatchUpdate<'_, D> {
EphemeralBatchUpdate {
parent_repo: self,
staging_repo: EphemeralRepository::new(),
}
}
#[cfg(test)]
pub(crate) fn metadata(&self) -> &HashMap<(MetadataPath, MetadataVersion), Box<[u8]>> {
&self.metadata
}
}
impl<D> RepositoryProvider<D> for EphemeralRepository<D>
where
D: DataInterchange + Sync,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
let bytes = match self.metadata.get(&(meta_path.clone(), version)) {
Some(bytes) => Ok(bytes),
None => Err(Error::NotFound),
};
bytes_to_reader(bytes).boxed()
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
let bytes = match self.targets.get(target_path) {
Some(bytes) => Ok(bytes),
None => Err(Error::NotFound),
};
bytes_to_reader(bytes).boxed()
}
}
impl<D> RepositoryStorage<D> for EphemeralRepository<D>
where
    D: DataInterchange + Sync,
{
    /// Read `metadata` to completion and store it under `(meta_path, version)`,
    /// replacing any previous entry for that key.
    fn store_metadata<'a>(
        &'a mut self,
        meta_path: &MetadataPath,
        version: MetadataVersion,
        metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, Result<()>> {
        // Clone the key up front so the future owns it and doesn't borrow `meta_path`.
        let meta_path = meta_path.clone();
        let self_metadata = &mut self.metadata;
        async move {
            let mut buf = Vec::new();
            metadata.read_to_end(&mut buf).await?;
            // `into_boxed_slice` already drops any excess capacity, so an
            // explicit `shrink_to_fit` beforehand is redundant.
            self_metadata.insert((meta_path, version), buf.into_boxed_slice());
            Ok(())
        }
        .boxed()
    }

    /// Read `read` to completion and store it under `target_path`, replacing
    /// any previous entry for that path.
    fn store_target<'a>(
        &'a mut self,
        target_path: &TargetPath,
        read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, Result<()>> {
        let target_path = target_path.clone();
        let self_targets = &mut self.targets;
        async move {
            let mut buf = Vec::new();
            read.read_to_end(&mut buf).await?;
            // `target_path` is already an owned clone; move it into the map
            // instead of cloning a second time.
            self_targets.insert(target_path, buf.into_boxed_slice());
            Ok(())
        }
        .boxed()
    }
}
/// [EphemeralBatchUpdate] is a special repository that is designed to write the metadata and
/// targets to an [EphemeralRepository] in a single batch.
///
/// Note: `EphemeralBatchUpdate::commit()` must be called in order to write the metadata and
/// targets to the [EphemeralRepository]. Otherwise any queued changes will be lost on drop.
#[derive(Debug)]
pub struct EphemeralBatchUpdate<'a, D> {
    /// Repository the staged writes are flushed into on `commit()`.
    parent_repo: &'a mut EphemeralRepository<D>,
    /// Private scratch repository holding the staged, not-yet-committed writes.
    staging_repo: EphemeralRepository<D>,
}
impl<'a, D> EphemeralBatchUpdate<'a, D>
where
    D: DataInterchange + Sync,
{
    /// Write all the metadata and targets in the [EphemeralBatchUpdate] to the source
    /// [EphemeralRepository] in a single batch operation.
    ///
    /// Staged entries overwrite any parent entries with the same key.
    pub fn commit(self) {
        // `extend` accepts any `IntoIterator`, so the explicit `.into_iter()`
        // calls were redundant.
        self.parent_repo.metadata.extend(self.staging_repo.metadata);
        self.parent_repo.targets.extend(self.staging_repo.targets);
    }
}
impl<D> RepositoryProvider<D> for EphemeralBatchUpdate<'_, D>
where
D: DataInterchange + Sync,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
let key = (meta_path.clone(), version);
let bytes = if let Some(bytes) = self.staging_repo.metadata.get(&key) {
Ok(bytes)
} else {
self.parent_repo.metadata.get(&key).ok_or(Error::NotFound)
};
bytes_to_reader(bytes).boxed()
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
let bytes = if let Some(bytes) = self.staging_repo.targets.get(target_path) {
Ok(bytes)
} else {
self.parent_repo
.targets
.get(target_path)
.ok_or(Error::NotFound)
};
bytes_to_reader(bytes).boxed()
}
}
impl<D> RepositoryStorage<D> for EphemeralBatchUpdate<'_, D>
where
    D: DataInterchange + Sync,
{
    /// Stage metadata in the private staging repository; it is not visible in
    /// the parent repository until `commit()` is called.
    fn store_metadata<'a>(
        &'a mut self,
        meta_path: &MetadataPath,
        version: MetadataVersion,
        metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, Result<()>> {
        self.staging_repo
            .store_metadata(meta_path, version, metadata)
    }

    /// Stage a target in the private staging repository; it is not visible in
    /// the parent repository until `commit()` is called.
    fn store_target<'a>(
        &'a mut self,
        target_path: &TargetPath,
        read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, Result<()>> {
        self.staging_repo.store_target(target_path, read)
    }
}
/// Converts a lookup result into a boxed async reader over the stored bytes,
/// propagating a lookup failure unchanged.
///
/// The `&Box<[u8]>` parameter mirrors what `HashMap::get` yields on the
/// callers' maps, hence the silenced `borrowed_box` lint.
#[allow(clippy::borrowed_box)]
async fn bytes_to_reader(
    bytes: Result<&'_ Box<[u8]>>,
) -> Result<Box<dyn AsyncRead + Send + Unpin + '_>> {
    // `?` propagates the NotFound error; the Box coerces to the trait object
    // via the function's return type.
    let stored = bytes?;
    Ok(Box::new(Cursor::new(stored)))
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::interchange::Json;
    use crate::repository::{fetch_metadata_to_string, fetch_target_to_string};
    use assert_matches::assert_matches;
    use futures_executor::block_on;

    /// Storing and fetching targets round-trips the bytes, and overwriting a
    /// target replaces the stored bytes (this layer does no integrity checks).
    #[test]
    fn ephemeral_repo_targets() {
        block_on(async {
            let mut repo = EphemeralRepository::<Json>::new();

            // Fetching a target that was never stored yields `NotFound`.
            let path = TargetPath::new("batty").unwrap();
            if let Err(err) = repo.fetch_target(&path).await {
                assert_matches!(err, Error::NotFound);
            } else {
                panic!("expected fetch_target to fail");
            }

            // Store a target, then read it back and compare the bytes.
            let data: &[u8] = b"like tears in the rain";
            let path = TargetPath::new("batty").unwrap();
            repo.store_target(&path, &mut &*data).await.unwrap();
            let mut read = repo.fetch_target(&path).await.unwrap();
            let mut buf = Vec::new();
            read.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf.as_slice(), data);
            // Drop the reader so its borrow of `repo` ends before the next store.
            drop(read);

            // RepositoryProvider implementations do not guarantee data is not corrupt.
            let bad_data: &[u8] = b"you're in a desert";
            repo.store_target(&path, &mut &*bad_data).await.unwrap();
            let mut read = repo.fetch_target(&path).await.unwrap();
            buf.clear();
            read.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf.as_slice(), bad_data);
        })
    }

    /// A batch update stages writes privately, shadows committed entries when
    /// read through the batch, discards staged data on drop, and flushes it to
    /// the parent repository only on `commit()`.
    #[test]
    fn ephemeral_repo_batch_update() {
        block_on(async {
            let mut repo = EphemeralRepository::<Json>::new();
            let meta_path = MetadataPath::new("meta").unwrap();
            let meta_version = MetadataVersion::None;
            let target_path = TargetPath::new("target").unwrap();

            // First, write some stuff to the repository.
            let committed_meta = "committed meta";
            let committed_target = "committed target";
            repo.store_metadata(&meta_path, meta_version, &mut committed_meta.as_bytes())
                .await
                .unwrap();
            repo.store_target(&target_path, &mut committed_target.as_bytes())
                .await
                .unwrap();

            let mut batch = repo.batch_update();

            // Make sure we can read back the committed stuff.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                committed_target,
            );

            // Next, stage some stuff in the batch_update.
            let staged_meta = "staged meta";
            let staged_target = "staged target";
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();

            // Make sure it got staged.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                staged_target,
            );

            // Next, drop the batch_update. We shouldn't have written the data back to the
            // repository.
            drop(batch);
            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                committed_target,
            );

            // Do the batch_update again, but this time write the data.
            let mut batch = repo.batch_update();
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();
            batch.commit();

            // Make sure the new data got to the repository.
            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                staged_target,
            );
        })
    }
}