blob: be2f62889a97cbc4633666bb883c3e64a6777cd1 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
range::{ContentRange, Range},
repository::{Error, RepoProvider, RepoStorage, Resource},
util::file_stream,
},
anyhow::{anyhow, Context as _, Result},
camino::{Utf8Component, Utf8Path, Utf8PathBuf},
delivery_blob::DeliveryBlobType,
fuchsia_async as fasync,
fuchsia_merkle::Hash,
futures::{future::BoxFuture, AsyncRead, FutureExt as _},
std::{
collections::BTreeSet,
fs::{self, DirBuilder},
io::{Seek as _, SeekFrom},
os::unix::fs::MetadataExt,
time::SystemTime,
},
tempfile::{NamedTempFile, TempPath},
tracing::warn,
tuf::{
metadata::{MetadataPath, MetadataVersion, TargetPath},
pouf::Pouf1,
repository::{
FileSystemRepository as TufFileSystemRepository,
FileSystemRepositoryBuilder as TufFileSystemRepositoryBuilder,
RepositoryProvider as TufRepositoryProvider, RepositoryStorage as TufRepositoryStorage,
},
},
};
#[cfg(not(target_os = "fuchsia"))]
use {
crate::repository::RepositorySpec,
futures::{stream::BoxStream, Stream, StreamExt as _},
notify::{recommended_watcher, RecursiveMode, Watcher as _},
std::{
ffi::OsStr,
pin::Pin,
task::{Context, Poll},
},
};
/// Describes how package blobs should be copied into the repository.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyMode {
/// Copy package blobs into the repository. This will skip copying the blob if it already exists
/// in the repository.
///
/// This will create a Copy-on-Write (reflink) on file systems that support it.
#[default]
Copy,
/// Copy package blobs into the repository. This will overwrite a blob if it already exists in
/// the repository.
///
/// This will create a Copy-on-Write (reflink) on file systems that support it.
CopyOverwrite,
/// Create hard links from the package blobs into the repository.
HardLink,
}
/// A builder to create a repository contained on the local file system.
pub struct FileSystemRepositoryBuilder {
metadata_repo_path: Utf8PathBuf,
blob_repo_path: Utf8PathBuf,
copy_mode: CopyMode,
aliases: BTreeSet<String>,
delivery_blob_type: DeliveryBlobType,
}
impl FileSystemRepositoryBuilder {
/// Creates a [FileSystemRepositoryBuilder] where the TUF metadata is stored in
/// `metadata_repo_path`, and the blobs are stored in `blob_repo_path`.
pub fn new(metadata_repo_path: Utf8PathBuf, blob_repo_path: Utf8PathBuf) -> Self {
FileSystemRepositoryBuilder {
metadata_repo_path,
blob_repo_path,
copy_mode: CopyMode::Copy,
aliases: BTreeSet::new(),
delivery_blob_type: DeliveryBlobType::Type1,
}
}
/// Select which [CopyMode] to use when copying files into the repository.
pub fn copy_mode(mut self, copy_mode: CopyMode) -> Self {
self.copy_mode = copy_mode;
self
}
/// alias this repository to this name when this repository is registered on a target.
pub fn alias(mut self, alias: String) -> Self {
self.aliases.insert(alias);
self
}
/// alias this repository to these names when this repository is registered on a target.
pub fn aliases(mut self, aliases: impl IntoIterator<Item = String>) -> Self {
for alias in aliases {
self = self.alias(alias);
}
self
}
/// Set the type of delivery blob to generate when copying blobs into the repository.
pub fn delivery_blob_type(mut self, delivery_blob_type: DeliveryBlobType) -> Self {
self.delivery_blob_type = delivery_blob_type;
self
}
/// Set the path to the blob repo.
pub fn blob_repo_path(mut self, blob_repo_path: Utf8PathBuf) -> Self {
self.blob_repo_path = blob_repo_path;
self
}
/// Build a [FileSystemRepository].
pub fn build(self) -> FileSystemRepository {
FileSystemRepository {
metadata_repo_path: self.metadata_repo_path.clone(),
blob_repo_path: self.blob_repo_path,
copy_mode: self.copy_mode,
aliases: self.aliases,
delivery_blob_type: self.delivery_blob_type,
tuf_repo: TufFileSystemRepositoryBuilder::new(self.metadata_repo_path)
.targets_prefix("targets")
.build(),
}
}
}
/// Serve a repository from the file system.
#[derive(Debug)]
pub struct FileSystemRepository {
metadata_repo_path: Utf8PathBuf,
blob_repo_path: Utf8PathBuf,
copy_mode: CopyMode,
aliases: BTreeSet<String>,
delivery_blob_type: DeliveryBlobType,
tuf_repo: TufFileSystemRepository<Pouf1>,
}
impl FileSystemRepository {
/// Construct a [FileSystemRepositoryBuilder].
pub fn builder(
metadata_repo_path: Utf8PathBuf,
blob_repo_path: Utf8PathBuf,
) -> FileSystemRepositoryBuilder {
FileSystemRepositoryBuilder::new(metadata_repo_path, blob_repo_path)
}
/// Construct a [FileSystemRepository].
pub fn new(metadata_repo_path: Utf8PathBuf, blob_repo_path: Utf8PathBuf) -> Self {
Self::builder(metadata_repo_path, blob_repo_path).build()
}
pub fn blob_repo_path(&self) -> &Utf8PathBuf {
&self.blob_repo_path
}
fn fetch<'a>(
&'a self,
repo_path: &Utf8Path,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
let file_path = sanitize_path(repo_path, resource_path);
async move {
let file_path = file_path?;
let mut file = std::fs::File::open(&file_path).map_err(|err| {
if err.kind() == std::io::ErrorKind::NotFound {
Error::NotFound
} else {
Error::Io(err)
}
})?;
let total_len = file.metadata().map_err(Error::Io)?.len();
let content_range = match range {
Range::Full => ContentRange::Full { complete_len: total_len },
Range::Inclusive { first_byte_pos, last_byte_pos } => {
if first_byte_pos > last_byte_pos
|| first_byte_pos >= total_len
|| last_byte_pos >= total_len
{
return Err(Error::RangeNotSatisfiable);
}
file.seek(SeekFrom::Start(first_byte_pos)).map_err(Error::Io)?;
ContentRange::Inclusive {
first_byte_pos,
last_byte_pos,
complete_len: total_len,
}
}
Range::From { first_byte_pos } => {
if first_byte_pos >= total_len {
return Err(Error::RangeNotSatisfiable);
}
file.seek(SeekFrom::Start(first_byte_pos)).map_err(Error::Io)?;
ContentRange::Inclusive {
first_byte_pos,
last_byte_pos: total_len - 1,
complete_len: total_len,
}
}
Range::Suffix { len } => {
if len > total_len {
return Err(Error::RangeNotSatisfiable);
}
let start = total_len - len;
file.seek(SeekFrom::Start(start)).map_err(Error::Io)?;
ContentRange::Inclusive {
first_byte_pos: start,
last_byte_pos: total_len - 1,
complete_len: total_len,
}
}
};
let content_len = content_range.content_len();
Ok(Resource {
content_range,
stream: Box::pin(file_stream(content_len, file, Some(file_path))),
})
}
.boxed()
}
}
impl RepoProvider for FileSystemRepository {
#[cfg(not(target_os = "fuchsia"))]
fn spec(&self) -> RepositorySpec {
RepositorySpec::FileSystem {
metadata_repo_path: self.metadata_repo_path.clone(),
blob_repo_path: self.blob_repo_path.clone(),
aliases: self.aliases.clone(),
}
}
fn aliases(&self) -> &BTreeSet<String> {
&self.aliases
}
fn fetch_metadata_range<'a>(
&'a self,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
self.fetch(&self.metadata_repo_path, resource_path, range)
}
fn fetch_blob_range<'a>(
&'a self,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
self.fetch(&self.blob_repo_path, resource_path, range)
}
#[cfg(not(target_os = "fuchsia"))]
fn supports_watch(&self) -> bool {
true
}
#[cfg(not(target_os = "fuchsia"))]
fn watch(&self) -> Result<BoxStream<'static, ()>> {
// Since all we are doing is signaling that the timestamp file is changed, it's it's fine
// if the channel is full, since that just means we haven't consumed our notice yet.
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let mut watcher = recommended_watcher(move |event: notify::Result<notify::Event>| {
let event = match event {
Ok(event) => event,
Err(err) => {
warn!("error receving notify event: {}", err);
return;
}
};
// Send an event if any applied to timestamp.json.
let timestamp_name = OsStr::new("timestamp.json");
if event.paths.iter().any(|p| p.file_name() == Some(timestamp_name)) {
if let Err(e) = sender.try_send(()) {
if e.is_full() {
// It's okay to ignore a full channel, since that just means that the other
// side of the channel still has an outstanding notice, which should be the
// same effect if we re-sent the event.
} else if !e.is_disconnected() {
warn!("Error sending event: {:?}", e);
}
}
}
})?;
// Watch the repo path instead of directly watching timestamp.json to avoid
// https://github.com/notify-rs/notify/issues/165.
watcher.watch(self.metadata_repo_path.as_std_path(), RecursiveMode::NonRecursive)?;
Ok(WatchStream { _watcher: watcher, receiver }.boxed())
}
fn blob_modification_time<'a>(
&'a self,
path: &str,
) -> BoxFuture<'a, Result<Option<SystemTime>>> {
let file_path = sanitize_path(&self.blob_repo_path, path);
async move {
let file_path = file_path?;
Ok(Some(fs::metadata(&file_path)?.modified()?))
}
.boxed()
}
fn blob_type(&self) -> DeliveryBlobType {
self.delivery_blob_type
}
}
impl TufRepositoryProvider<Pouf1> for FileSystemRepository {
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
self.tuf_repo.fetch_metadata(meta_path, version)
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
self.tuf_repo.fetch_target(target_path)
}
}
impl TufRepositoryStorage<Pouf1> for FileSystemRepository {
fn store_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
) -> BoxFuture<'a, tuf::Result<()>> {
self.tuf_repo.store_metadata(meta_path, version, metadata)
}
fn store_target<'a>(
&'a self,
target_path: &TargetPath,
target: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
) -> BoxFuture<'a, tuf::Result<()>> {
self.tuf_repo.store_target(target_path, target)
}
}
impl RepoStorage for FileSystemRepository {
fn store_blob<'a>(
&'a self,
hash: &Hash,
len: u64,
src: &Utf8Path,
) -> BoxFuture<'a, Result<()>> {
let src = src.to_path_buf();
let hash_str = hash.to_string();
let hash = *hash;
async move {
let dst = sanitize_path(&self.blob_repo_path, &hash_str)?;
let src_metadata = fs::metadata(&src)?;
let dst_metadata = match fs::metadata(&dst) {
Ok(metadata) => Some(metadata),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => return Err(anyhow!(e)),
};
let dst_is_hardlink = if let Some(dst_metadata) = &dst_metadata {
dst_metadata.nlink() > 1
} else {
false
};
if src_metadata.len() != len {
return Err(anyhow!(BlobSizeMismatchError {
hash,
path: src.clone(),
manifest_size: len,
file_size: src_metadata.len(),
}));
}
let dst_len = dst_metadata.as_ref().map(|m| m.len());
let dst_exists = dst_metadata.is_some();
let dst_dirty = !dst_exists || dst_len != Some(len);
match self.copy_mode {
CopyMode::Copy => {
if dst_dirty || dst_is_hardlink {
copy_blob(&src, &dst).await?
}
}
CopyMode::CopyOverwrite => copy_blob(&src, &dst).await?,
CopyMode::HardLink => {
let is_hardlink = if let Some(dst_metadata) = &dst_metadata {
src_metadata.dev() == dst_metadata.dev()
&& src_metadata.ino() == dst_metadata.ino()
} else {
false
};
if is_hardlink {
// No work to do if src and dest are already hardlinks.
} else {
match fs::hard_link(&src, &dst) {
Ok(()) => {
// FIXME(b/271694204): Workaround an unknown issue where hardlinks
// aren't readable immediately after creation in some environments.
if fs::metadata(&dst).is_err() {
warn!("Hardlink at {dst:?} not yet readable");
fuchsia_async::Timer::new(std::time::Duration::from_secs(1)).await;
if fs::metadata(&dst).is_err() {
warn!("Hardlink at {dst:?} still not readable, falling back to copy");
copy_blob(&src, &dst).await?
}
}
}
Err(_) if dst_dirty => copy_blob(&src, &dst).await?,
Err(_) => {
// The dest file exists and has the right size,
// but we failed to make it a hardlink.
}
}
}
}
}
let dst = sanitize_path(
&self.blob_repo_path,
&format!("{}/{hash_str}", u32::from(self.delivery_blob_type)),
)?;
if self.copy_mode == CopyMode::CopyOverwrite || !path_exists(&dst).await? {
generate_delivery_blob(&src, &dst, self.delivery_blob_type).await?;
}
Ok(())
}
.boxed()
}
}
async fn path_exists(path: &Utf8Path) -> std::io::Result<bool> {
match fs::File::open(&path) {
Ok(_) => Ok(true),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(err) => Err(err),
}
}
async fn create_temp_file(path: &Utf8Path) -> Result<TempPath> {
let temp_file = if let Some(parent) = path.parent() {
DirBuilder::new().recursive(true).create(parent)?;
NamedTempFile::new_in(parent)?
} else {
NamedTempFile::new_in(".")?
};
Ok(temp_file.into_temp_path())
}
// Set the blob at `path` to be read-only.
async fn set_blob_read_only(path: &Utf8Path) -> Result<()> {
let file = fs::File::open(path)?;
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
file.set_permissions(permissions)?;
Ok(())
}
// Performs a Copy-on-Write (reflink) of the file at `src_path` to `dst_path`.
#[cfg(target_os = "linux")]
async fn reflink(src_path: &Utf8Path, dst_path: &Utf8Path) -> Result<(), std::io::Error> {
use std::os::fd::AsRawFd;
let src = fs::File::open(src_path)?;
let dst = fs::File::create(dst_path)?;
// Safe because this is a synchronous syscall and the raw fds don't outlive the call.
let res = unsafe { libc::ioctl(dst.as_raw_fd(), libc::FICLONE, src.as_raw_fd()) };
match res {
-1 => {
let err = std::io::Error::last_os_error();
drop(dst);
let _ = fs::remove_file(dst_path);
match err.raw_os_error().unwrap() {
// The filesystem does not support reflinks.
libc::EOPNOTSUPP |
// src_path and dst_path are different filesystems.
libc::EXDEV |
// An invalid ioctl number was specified in an ioctl system call.
libc::ENOTTY => {
Err(std::io::Error::new(std::io::ErrorKind::Unsupported, err))
}
_ => Err(err),
}
}
_ => Ok(()),
}
}
#[cfg(not(target_os = "linux"))]
async fn reflink(_src_path: &Utf8Path, _dst_path: &Utf8Path) -> Result<(), std::io::Error> {
use libc as _;
Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
}
async fn copy_blob(src: &Utf8Path, dst: &Utf8Path) -> Result<()> {
let temp_path = create_temp_file(dst).await?;
match reflink(src, (*temp_path).try_into()?).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::Unsupported => {
let src = src.to_owned();
let temp_path = temp_path.to_path_buf();
fasync::unblock(move || fs::copy(src, &temp_path)).await?;
}
Err(e) => return Err(anyhow!(e)),
}
temp_path.persist(dst)?;
set_blob_read_only(dst).await
}
pub(crate) async fn generate_delivery_blob(
src: &Utf8Path,
dst: &Utf8Path,
blob_type: DeliveryBlobType,
) -> Result<()> {
let src_blob = fs::read(src).with_context(|| format!("reading {src}"))?;
let temp_path = create_temp_file(dst).await?;
let file = std::fs::File::create(&temp_path)?;
fasync::unblock(move || {
delivery_blob::generate_to(blob_type, &src_blob, std::io::BufWriter::new(file))
})
.await
.context("generate delivery blob")?;
temp_path.persist(dst)?;
set_blob_read_only(dst).await
}
#[cfg(not(target_os = "fuchsia"))]
#[pin_project::pin_project]
struct WatchStream {
_watcher: notify::RecommendedWatcher,
#[pin]
receiver: futures::channel::mpsc::Receiver<()>,
}
#[cfg(not(target_os = "fuchsia"))]
impl Stream for WatchStream {
type Item = ();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().receiver.poll_next(cx)
}
}
/// Make sure the resource is inside the repo_path.
fn sanitize_path(repo_path: &Utf8Path, resource_path: &str) -> Result<Utf8PathBuf, Error> {
let resource_path = Utf8Path::new(resource_path);
let mut parts = vec![];
for component in resource_path.components() {
match component {
Utf8Component::Normal(part) => {
parts.push(part);
}
_ => {
warn!("invalid resource_path: {}", resource_path);
return Err(Error::InvalidPath(resource_path.into()));
}
}
}
let path = parts.into_iter().collect::<Utf8PathBuf>();
Ok(repo_path.join(path))
}
#[derive(Debug, thiserror::Error)]
#[error(
"blob {hash} at {path:?} is {file_size} bytes in size, \
but the package manifest indicates it should be {manifest_size} bytes in size"
)]
struct BlobSizeMismatchError {
hash: Hash,
path: Utf8PathBuf,
manifest_size: u64,
file_size: u64,
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
repository::repo_tests::{self, TestEnv as _},
util::CHUNK_SIZE,
},
assert_matches::assert_matches,
fuchsia_async as fasync,
futures::{FutureExt, StreamExt},
std::{fs::File, io::Write as _, time::Duration},
};
struct TestEnv {
_tmp: tempfile::TempDir,
metadata_path: Utf8PathBuf,
blob_path: Utf8PathBuf,
repo: FileSystemRepository,
}
impl TestEnv {
fn new() -> Self {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_path = dir.join("metadata");
let blob_path = dir.join("blobs");
std::fs::create_dir(&metadata_path).unwrap();
std::fs::create_dir(&blob_path).unwrap();
Self {
_tmp: tmp,
metadata_path: metadata_path.clone(),
blob_path: blob_path.clone(),
repo: FileSystemRepository::new(metadata_path, blob_path),
}
}
}
#[async_trait::async_trait]
impl repo_tests::TestEnv for TestEnv {
fn supports_range(&self) -> bool {
true
}
fn write_metadata(&self, path: &str, bytes: &[u8]) {
let file_path = self.metadata_path.join(path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
fn write_blob(&self, path: &str, bytes: &[u8]) {
let file_path = self.blob_path.join(path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
fn repo(&self) -> &dyn RepoProvider {
&self.repo
}
}
repo_tests::repo_test_suite! {
env = TestEnv::new();
chunk_size = CHUNK_SIZE;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_blob_modification_time() {
let env = TestEnv::new();
let f = File::create(env.blob_path.join("empty-blob")).unwrap();
let blob_mtime = f.metadata().unwrap().modified().unwrap();
drop(f);
assert_matches!(
env.repo.blob_modification_time("empty-blob").await,
Ok(Some(t)) if t == blob_mtime
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_reject_invalid_paths() {
let env = TestEnv::new();
env.write_metadata("empty", b"");
assert_matches!(repo_tests::read_metadata(&env, "empty", Range::Full).await, Ok(body) if body == b"");
assert_matches!(repo_tests::read_metadata(&env, "subdir/../empty", Range::Full).await,
Err(Error::InvalidPath(path)) if path == Utf8Path::new("subdir/../empty")
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_watch() {
let env = TestEnv::new();
// We support watch.
assert!(env.repo.supports_watch());
let mut watch_stream = env.repo.watch().unwrap().fuse();
// Try to read from the stream. This should not return anything since we haven't created a
// file yet.
futures::select! {
_ = watch_stream.next() => panic!("should not have received an event"),
_ = fasync::Timer::new(Duration::from_millis(10)).fuse() => (),
};
// Next, write to the file and make sure we observe an event.
env.write_metadata("timestamp.json", br#"{"version":1}"#);
futures::select! {
result = watch_stream.next() => {
assert_eq!(result, Some(()));
},
_ = fasync::Timer::new(Duration::from_secs(10)).fuse() => {
panic!("wrote to timestamp.json, but did not get an event");
},
};
// Write to the file again and make sure we receive another event.
env.write_metadata("timestamp.json", br#"{"version":2}"#);
futures::select! {
result = watch_stream.next() => {
assert_eq!(result, Some(()));
},
_ = fasync::Timer::new(Duration::from_secs(10)).fuse() => {
panic!("wrote to timestamp.json, but did not get an event");
},
};
// FIXME(https://github.com/notify-rs/notify/pull/337): On OSX, notify uses a
// crossbeam-channel in `Drop` to shut down the interior thread. Unfortunately this can
// trip over an issue where OSX will tear down the thread local storage before shutting
// down the thread, which can trigger a panic. To avoid this issue, sleep a little bit
// after shutting down our stream.
drop(watch_stream);
fasync::Timer::new(Duration::from_millis(100)).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_verifies_src_length() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::Copy)
.build();
// Store the blob.
let contents = b"hello world";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
let err = repo.store_blob(&hash, contents.len() as u64 + 1, &path).await.unwrap_err();
assert_matches!(err.downcast_ref::<BlobSizeMismatchError>(), Some(_));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_copy_detects_length_mismatch() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::Copy)
.build();
// Store the blob.
let contents = b"hello world";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
// Next, overwrite a blob that already exists.
let contents2 = b"another hello world";
let path2 = dir.join("my-blob2");
std::fs::write(&path2, contents2).unwrap();
assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
// Make sure we get the new contents back.
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents2[..]);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_copy_skips_present_blobs_of_correct_length() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::Copy)
.build();
// Store the blob.
let contents = b"hello world.";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
// Next, we won't overwrite a blob that already exists.
let contents2 = b"Hello World!";
let path2 = dir.join("my-blob2");
std::fs::write(&path2, contents2).unwrap();
assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
// Make sure we get the original contents back.
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_copy_breaks_hardlinks() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo =
FileSystemRepository::builder(metadata_repo_path.clone(), blob_repo_path.clone())
.copy_mode(CopyMode::HardLink)
.build();
// Store the blob.
let contents = b"hello world.";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
assert_eq!(std::fs::metadata(&blob_path).unwrap().nlink(), 2);
// Switch to Copy mode.
drop(repo);
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::Copy)
.build();
// Store the blob again
let contents2 = b"Hello World!";
let path2 = dir.join("my-blob2");
std::fs::write(&path2, contents2).unwrap();
assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents2[..]);
assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
assert_eq!(std::fs::metadata(&blob_path).unwrap().nlink(), 1);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_copy_overwrite() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::CopyOverwrite)
.build();
// Store the blob.
let contents = b"hello world";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
// Next, overwrite a blob that already exists.
let contents2 = b"another blob";
let path2 = dir.join("my-blob2");
std::fs::write(&path2, contents2).unwrap();
assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
// Make sure we get the new contents back.
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents2[..]);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_blob_hard_link() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.copy_mode(CopyMode::HardLink)
.build();
// Store the blob.
let contents = b"hello world";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read it back.
let blob_path = blob_repo_path.join(hash.to_string());
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(&actual, &contents[..]);
#[cfg(target_family = "unix")]
async fn check_links(blob_path: &Utf8Path) {
use std::os::unix::fs::MetadataExt as _;
assert_eq!(std::fs::metadata(blob_path).unwrap().nlink(), 2);
}
#[cfg(not(target_family = "unix"))]
async fn check_links(_blob_path: &Utf8Path) {}
// Make sure the hard link count was incremented.
check_links(&blob_path).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_store_delivery_blob() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
let metadata_repo_path = dir.join("metadata");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
.delivery_blob_type(DeliveryBlobType::Type1)
.build();
// Store the blob.
let contents = b"hello world";
let path = dir.join("my-blob");
std::fs::write(&path, contents).unwrap();
let hash = fuchsia_merkle::from_slice(contents).root();
assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
// Make sure we can read the delivery blob.
let blob_path = blob_repo_path.join("1").join(hash.to_string());
let delivery_blob = std::fs::read(&blob_path).unwrap();
assert!(!delivery_blob.is_empty());
assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
// Next, we won't overwrite a blob that already exists.
let contents2 = b"another blob";
let path2 = dir.join("my-blob2");
std::fs::write(&path2, contents2).unwrap();
assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
// Make sure we get the original contents back.
let actual = std::fs::read(&blob_path).unwrap();
assert_eq!(delivery_blob, actual);
}
}