blob: c7d076224edfb28ddd4cfacd428c7dd55d9fef5b [file] [log] [blame]
use {
crate::{
interchange::DataInterchange,
metadata::{Metadata, MetadataPath, MetadataVersion, RawSignedMetadata, TargetPath},
repository::{RepositoryProvider, RepositoryStorage},
Result,
},
futures_io::AsyncRead,
futures_util::{
future::{BoxFuture, FutureExt},
io::{AsyncReadExt, Cursor},
},
std::sync::{Arc, Mutex},
};
#[derive(Debug, PartialEq)]
pub(crate) enum Track {
Store {
path: MetadataPath,
version: MetadataVersion,
metadata: String,
},
FetchFound {
path: MetadataPath,
version: MetadataVersion,
metadata: String,
},
FetchErr(MetadataPath, MetadataVersion),
}
impl Track {
pub(crate) fn store<T>(meta_path: &MetadataPath, version: MetadataVersion, metadata: T) -> Self
where
T: Into<Vec<u8>>,
{
Track::Store {
path: meta_path.clone(),
version,
metadata: String::from_utf8(metadata.into()).unwrap(),
}
}
pub(crate) fn store_meta<M, D>(
version: MetadataVersion,
metadata: &RawSignedMetadata<D, M>,
) -> Self
where
M: Metadata,
D: DataInterchange,
{
Self::store(&M::ROLE.into(), version, metadata.as_bytes())
}
pub(crate) fn fetch_found<T>(
meta_path: &MetadataPath,
version: MetadataVersion,
metadata: T,
) -> Self
where
T: Into<Vec<u8>>,
{
Track::FetchFound {
path: meta_path.clone(),
version,
metadata: String::from_utf8(metadata.into()).unwrap(),
}
}
pub(crate) fn fetch_meta_found<M, D>(
version: MetadataVersion,
metadata: &RawSignedMetadata<D, M>,
) -> Self
where
M: Metadata,
D: DataInterchange,
{
Track::fetch_found(&M::ROLE.into(), version, metadata.as_bytes())
}
}
/// Helper Repository wrapper that tracks all the metadata fetches and stores for testing purposes.
pub(crate) struct TrackRepository<R> {
repo: R,
tracks: Arc<Mutex<Vec<Track>>>,
}
impl<R> TrackRepository<R> {
pub(crate) fn new(repo: R) -> Self {
Self {
repo,
tracks: Arc::new(Mutex::new(vec![])),
}
}
pub(crate) fn take_tracks(&self) -> Vec<Track> {
self.tracks.lock().unwrap().drain(..).collect()
}
pub(crate) fn as_inner_mut(&mut self) -> &mut R {
&mut self.repo
}
}
impl<D, R> RepositoryStorage<D> for TrackRepository<R>
where
R: RepositoryStorage<D> + Sync + Send,
D: DataInterchange + Sync,
{
fn store_metadata<'a>(
&'a mut self,
meta_path: &MetadataPath,
version: MetadataVersion,
metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
) -> BoxFuture<'a, Result<()>> {
let meta_path = meta_path.clone();
async move {
let mut buf = Vec::new();
metadata.read_to_end(&mut buf).await?;
let () = self
.repo
.store_metadata(&meta_path, version, &mut buf.as_slice())
.await?;
self.tracks
.lock()
.unwrap()
.push(Track::store(&meta_path, version, buf));
Ok(())
}
.boxed()
}
fn store_target<'a>(
&'a mut self,
target_path: &TargetPath,
target: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
) -> BoxFuture<'a, Result<()>> {
self.repo.store_target(target_path, target)
}
}
impl<D, R> RepositoryProvider<D> for TrackRepository<R>
where
D: DataInterchange + Sync,
R: RepositoryProvider<D> + Sync,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
let meta_path = meta_path.clone();
async move {
let fut = self.repo.fetch_metadata(&meta_path, version);
match fut.await {
Ok(mut rdr) => {
let mut buf = Vec::new();
rdr.read_to_end(&mut buf).await?;
self.tracks.lock().unwrap().push(Track::fetch_found(
&meta_path,
version,
buf.clone(),
));
let rdr: Box<dyn AsyncRead + Send + Unpin> = Box::new(Cursor::new(buf));
Ok(rdr)
}
Err(err) => {
self.tracks
.lock()
.unwrap()
.push(Track::FetchErr(meta_path, version));
Err(err)
}
}
}
.boxed()
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
self.repo.fetch_target(target_path)
}
}