// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::repository::{get_tuf_client, Repository},
anyhow::{anyhow, Context, Result},
async_lock::Mutex,
errors::ffx_bail,
fuchsia_merkle::{Hash, MerkleTreeBuilder},
fuchsia_pkg::MetaContents,
futures::{
future::BoxFuture,
io::Cursor,
stream::{self, StreamExt},
AsyncRead, AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _, TryStreamExt as _,
},
serde_json::Value,
std::{
fs::File,
path::{Path, PathBuf},
sync::Arc,
},
tempfile::NamedTempFile,
tuf::{
interchange::DataInterchange,
metadata::{MetadataPath, MetadataVersion, TargetDescription, TargetPath},
repository::{FileSystemRepository, RepositoryProvider, RepositoryStorage},
Error,
},
};
/// Download the metadata and all package blobs from a repository.
///
/// `repo`: Download the metadata and blobs from this repository.
/// `metadata_dir`: Write repository metadata to this directory.
/// `blobs_dir`: Write the package blobs into this directory.
/// `concurrency`: Maximum number of blobs to download at the same time.
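///
/// # Examples
///
/// A minimal usage sketch (not compiled as a doctest), assuming `repo` is an
/// already-constructed [Repository]:
///
/// ```ignore
/// resolve_repository(&repo, "/tmp/metadata", "/tmp/blobs", 5).await?;
/// ```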
pub async fn resolve_repository(
repo: &Repository,
metadata_dir: impl AsRef<Path>,
blobs_dir: impl AsRef<Path>,
concurrency: usize,
) -> Result<()> {
let metadata_dir = metadata_dir.as_ref();
let blobs_dir = blobs_dir.as_ref();
let upstream_repo = repo.get_tuf_repo()?;
// Cache the TUF metadata from the upstream repository into a temporary directory.
let mut local_repo =
FileSystemRepository::builder(&metadata_dir).targets_prefix("targets").build()?;
let mut batch_repo = local_repo.batch_update();
let trusted_targets = {
let cache_repo = CacheRepository::new(upstream_repo, &mut batch_repo);
// Fetch the metadata; the TUF client verifies it as part of the update.
let mut tuf_client = get_tuf_client(cache_repo).await?;
tuf_client.update().await?;
tuf_client.database().trusted_targets().cloned()
};
// Commit the cached metadata now that the TUF client has verified it.
batch_repo.commit().await?;
// Download all the packages, if the repository has any targets.
if let Some(trusted_targets) = trusted_targets {
for desc in trusted_targets.targets().values() {
let merkle = merkle_from_description(desc)?;
fetch_package(repo, merkle, &blobs_dir, concurrency).await?;
}
}
Ok(())
}
/// Download a package from a repository and write the blobs to a directory.
///
/// `repo`: Download the package from this repository.
/// `package_path`: Path to the package in the repository.
/// `output_blobs_dir`: Write the package blobs into this directory.
/// `concurrency`: Maximum number of blobs to download at the same time.
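///
/// # Examples
///
/// A minimal usage sketch (not compiled as a doctest), assuming `repo` is an
/// already-constructed [Repository]:
///
/// ```ignore
/// let meta_far_hash = resolve_package(&repo, "package1", "/tmp/blobs", 5).await?;
/// ```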
pub async fn resolve_package(
repo: &Repository,
package_path: &str,
output_blobs_dir: impl AsRef<Path>,
concurrency: usize,
) -> Result<Hash> {
let output_blobs_dir = output_blobs_dir.as_ref();
let desc = repo
.get_target_description(package_path)
.await?
.context("missing target description here")?;
let meta_far_hash = merkle_from_description(&desc)?;
fetch_package(repo, meta_far_hash, &output_blobs_dir, concurrency).await
}
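/// Download a single package, identified by the hash of its meta.far, and write
/// its blobs to a directory.
///
/// `repo`: Download the package from this repository.
/// `meta_far_hash`: Merkle root of the package's meta.far.
/// `output_blobs_dir`: Write the package blobs into this directory.
/// `concurrency`: Maximum number of blobs to download at the same time.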
pub async fn fetch_package(
repo: &Repository,
meta_far_hash: Hash,
output_blobs_dir: impl AsRef<Path>,
concurrency: usize,
) -> Result<Hash> {
let output_blobs_dir = output_blobs_dir.as_ref();
if !output_blobs_dir.exists() {
async_fs::create_dir_all(output_blobs_dir).await?;
}
if output_blobs_dir.is_file() {
ffx_bail!("Download path points at a file: {}", output_blobs_dir.display());
}
// First, download the meta.far.
let meta_far_path =
download_blob_to_destination(&repo, &output_blobs_dir, &meta_far_hash).await?;
let mut archive = File::open(&meta_far_path)?;
let mut meta_far = fuchsia_archive::Reader::new(&mut archive)?;
let meta_contents = meta_far.read_file("meta/contents")?;
let meta_contents = MetaContents::deserialize(meta_contents.as_slice())?.into_contents();
// Download all the blobs.
// FIXME(http://fxbug.dev/97192): Consider replacing this with work_queue to allow the caller to
// globally control the concurrency.
let mut tasks = stream::iter(
meta_contents
.values()
.map(|hash| download_blob_to_destination(&repo, &output_blobs_dir, hash)),
)
.buffer_unordered(concurrency);
// Wait until all the package blobs have finished downloading.
while tasks.try_next().await?.is_some() {}
Ok(meta_far_hash)
}
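/// A write-through cache for TUF repositories.
///
/// Fetches are forwarded to the `upstream` [RepositoryProvider], and every
/// metadata or target file that is successfully fetched is also written to the
/// local `cache` [RepositoryStorage] before being returned to the caller.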
struct CacheRepository<U, C> {
upstream: U,
cache: Arc<Mutex<C>>,
}
impl<U, C> CacheRepository<U, C> {
/// Construct a [CacheRepository].
pub(crate) fn new(upstream: U, cache: C) -> Self {
Self { upstream, cache: Arc::new(Mutex::new(cache)) }
}
}
impl<D, U, C> RepositoryProvider<D> for CacheRepository<U, C>
where
D: DataInterchange + Sync,
U: RepositoryProvider<D> + Send + Sync,
C: RepositoryStorage<D> + Send + Sync,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>, Error>> {
let meta_path = meta_path.clone();
let metadata_fut = self.upstream.fetch_metadata(&meta_path, version.clone());
let cache = Arc::clone(&self.cache);
async move {
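// Buffer the upstream metadata so it can be written to the cache and
// also handed back to the TUF client for verification.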
let mut metadata = metadata_fut.await?;
let mut bytes = vec![];
metadata.read_to_end(&mut bytes).await?;
let mut cache = cache.lock().await;
cache.store_metadata(&meta_path, version, &mut Cursor::new(&bytes)).await?;
let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(Cursor::new(bytes));
Ok(reader)
}
.boxed()
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>, Error>> {
let target_path = target_path.clone();
let target_fut = self.upstream.fetch_target(&target_path);
let cache = Arc::clone(&self.cache);
async move {
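// Buffer the upstream target so it can be written to the cache and
// also handed back to the caller.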
let mut target = target_fut.await?;
let mut bytes = vec![];
target.read_to_end(&mut bytes).await?;
let mut cache = cache.lock().await;
cache.store_target(&target_path, &mut Cursor::new(&bytes)).await?;
let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(Cursor::new(bytes));
Ok(reader)
}
.boxed()
}
}
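/// Extract the package merkle root from a target description's custom metadata.
///
/// Fuchsia package targets carry the hash of the package's meta.far in a
/// custom `"merkle"` field, which is expected to be a hex-encoded string:
///
/// ```json
/// { "merkle": "<64 hex characters>" }
/// ```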
fn merkle_from_description(desc: &TargetDescription) -> Result<Hash> {
let merkle = desc.custom().get("merkle").context("missing merkle")?;
if let Value::String(hash) = merkle {
Ok(hash.parse()?)
} else {
ffx_bail!("Merkle field is not a String. {:#?}", desc)
}
}
/// Download a blob from the repository and save it to the given directory.
///
/// `repo`: A [Repository] instance.
/// `dir`: Local directory in which to save the downloaded blob.
/// `blob`: Hash of the blob to download.
async fn download_blob_to_destination(
repo: &Repository,
dir: &Path,
blob: &Hash,
) -> Result<PathBuf> {
let blob_str = blob.to_string();
let path = dir.join(&blob_str);
// If the local path already exists, check whether it has the correct merkle. If so, exit early.
match async_fs::File::open(&path).await {
Ok(mut file) => {
let hash = fuchsia_merkle::from_async_read(&mut file).await?.root();
if blob == &hash {
return Ok(path);
}
}
Err(err) => {
if err.kind() != std::io::ErrorKind::NotFound {
return Err(err.into());
}
}
};
// Otherwise download the blob into a temporary file, and validate that it has the right
// hash.
let mut resource =
repo.fetch_blob(&blob_str).await.with_context(|| format!("fetching {}", blob))?;
let (file, temp_path) = NamedTempFile::new_in(dir)?.into_parts();
let mut file = async_fs::File::from(file);
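// Hash the blob as it streams in so it doesn't need to be re-read from
// disk afterwards.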
let mut merkle_builder = MerkleTreeBuilder::new();
while let Some(chunk) = resource.stream.try_next().await? {
merkle_builder.write(&chunk);
file.write_all(&chunk).await?;
}
let hash = merkle_builder.finish().root();
// Persist the blob only if the merkle matches what we expected; otherwise error out.
if blob == &hash {
// Flush the file to make sure all the bytes got written to disk.
file.flush().await?;
drop(file);
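// Atomically move the verified blob into place so partially written
// files never appear in the blobs directory.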
temp_path.persist(&path)?;
Ok(path)
} else {
Err(anyhow!("invalid merkle: expected {:?}, got {:?}", blob, hash))
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::test_utils::{make_file_system_repository, PKG1_BIN_HASH, PKG1_HASH, PKG1_LIB_HASH},
camino::Utf8Path,
pretty_assertions::assert_eq,
std::{collections::BTreeSet, fs::create_dir},
};
const DOWNLOAD_CONCURRENCY: usize = 5;
#[fuchsia_async::run_singlethreaded(test)]
async fn test_resolve_package() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
// Create the repository.
let src_repo_dir = dir.join("src");
let src_metadata_dir = src_repo_dir.join("metadata");
let src_blobs_dir = src_repo_dir.join("blobs");
let repo = make_file_system_repository("tuf", &src_metadata_dir, &src_blobs_dir).await;
// Store downloaded artifacts in this directory.
let result_dir = dir.join("results");
create_dir(&result_dir).unwrap();
// Download the package.
let meta_far_hash =
resolve_package(&repo, "package1", &result_dir, DOWNLOAD_CONCURRENCY).await.unwrap();
// Make sure we downloaded the right hash.
assert_eq!(meta_far_hash.to_string(), PKG1_HASH);
// Check that all the files got downloaded correctly.
let result_paths = std::fs::read_dir(&result_dir)
.unwrap()
.map(|e| e.unwrap().path())
.collect::<BTreeSet<_>>();
assert_eq!(
result_paths,
BTreeSet::from([
result_dir.join(PKG1_HASH).into_std_path_buf(),
result_dir.join(PKG1_BIN_HASH).into_std_path_buf(),
result_dir.join(PKG1_LIB_HASH).into_std_path_buf(),
])
);
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_resolve_repository() {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
// First construct a repository that we'll download.
let src_repo_dir = dir.join("src");
let src_metadata_dir = src_repo_dir.join("metadata");
let src_blobs_dir = src_repo_dir.join("blobs");
let repo = make_file_system_repository("tuf", &src_metadata_dir, &src_blobs_dir).await;
// Download the repository.
let dst_repo_dir = dir.join("dst");
let dst_metadata_dir = dst_repo_dir.join("metadata");
let dst_blobs_dir = dst_repo_dir.join("blobs");
resolve_repository(&repo, &dst_metadata_dir, &dst_blobs_dir, DOWNLOAD_CONCURRENCY)
.await
.unwrap();
// Make sure that we downloaded all the metadata. We don't compare the directories
// directly, because make_file_system_repository also produces target files and
// unversioned metadata.
for name in ["1.root.json", "1.targets.json", "1.snapshot.json", "timestamp.json"] {
let src_path = src_metadata_dir.join(name);
let dst_path = dst_metadata_dir.join(name);
let src = std::fs::read_to_string(&src_path).unwrap();
let dst = std::fs::read_to_string(&dst_path).unwrap();
assert_eq!(src, dst);
}
// Make sure all the blobs were fetched.
for entry in std::fs::read_dir(&src_blobs_dir).unwrap() {
let entry = entry.unwrap();
let src_path = entry.path();
let blob = src_path.strip_prefix(&src_blobs_dir).unwrap();
let dst_path = dst_blobs_dir.as_std_path().join(blob);
let src = std::fs::read(&src_path).unwrap();
let dst = std::fs::read(&dst_path).unwrap();
assert_eq!(src, dst);
}
}
}