blob: 1fc7d3cb90c38c874d2625a07bb5afe2b42e4799 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Tools to support working with Fuchsia package repositories hosted on Google Cloud storage.
//!
//! See
//! - [Package](https://fuchsia.dev/fuchsia-src/concepts/packages/package?hl=en)
//! - [TUF](https://theupdateframework.io/)
use {
crate::{
range::{ContentRange, Range},
repository::{Error, RepositoryBackend, RepositorySpec},
resource::Resource,
},
anyhow::{anyhow, Context as _},
futures::{future::BoxFuture, AsyncRead, FutureExt as _, TryStreamExt as _},
hyper::{header::CONTENT_LENGTH, Body, Response, StatusCode},
std::{fmt::Debug, io, time::SystemTime},
tuf::{
interchange::Json,
metadata::{MetadataPath, MetadataVersion, TargetPath},
repository::RepositoryProvider,
},
url::Url,
};
/// Helper trait that lets us mock gcs::client::Client for testing.
#[doc(hidden)]
#[async_trait::async_trait]
pub trait GcsClient {
async fn stream(&self, bucket: &str, object: &str) -> anyhow::Result<Response<Body>>;
}
#[async_trait::async_trait]
impl GcsClient for gcs::client::Client {
async fn stream(&self, bucket: &str, object: &str) -> anyhow::Result<Response<Body>> {
gcs::client::Client::stream(self, bucket, object).await
}
}
/// [GcsRepository] serves a package repository from a Google Cloud Storage bucket.
#[derive(Clone, Debug)]
pub struct GcsRepository<T = gcs::client::Client> {
client: T,
/// URL to the GCS bucket and object prefix that contains the metadata repository. This must
/// have the gs:// URL scheme. The constructor will make sure this has a trailing slash so it
/// is treated as a directory.
metadata_repo_url: Url,
/// URL to the GCS bucket and object prefix that contains the blobs repository. This must have
/// the gs:// URL scheme. The constructor will make sure this has a trailing slash so it is
/// treated as a directory.
blob_repo_url: Url,
}
impl<T> GcsRepository<T>
where
T: GcsClient + Debug,
{
pub fn new(
client: T,
mut metadata_repo_url: Url,
mut blob_repo_url: Url,
) -> Result<Self, anyhow::Error> {
if metadata_repo_url.scheme() != "gs" {
return Err(anyhow!("unsupported scheme {}", metadata_repo_url));
}
if blob_repo_url.scheme() != "gs" {
return Err(anyhow!("unsupported scheme {}", blob_repo_url));
}
// `URL.join` treats urls with a trailing slash as a directory, and without as a file.
// In the latter case, it will strip off the last segment before joining paths. Since the
// metadata and blob url are directories, make sure they have a trailing slash.
if !metadata_repo_url.path().ends_with('/') {
metadata_repo_url.set_path(&format!("{}/", metadata_repo_url.path()));
}
if !blob_repo_url.path().ends_with('/') {
blob_repo_url.set_path(&format!("{}/", blob_repo_url.path()));
}
Ok(Self { client, metadata_repo_url, blob_repo_url })
}
async fn get(&self, root: &Url, path: &str, range: Range) -> Result<Resource, Error> {
let url = root.join(path).map_err(|err| anyhow!(err))?;
let bucket = url.host_str().ok_or_else(|| anyhow!("url must include a bucket: {}", url))?;
let object = url.path();
// FIXME(http://fxbug.dev/98991): The gcs library does not yet support range requests, so
// always fetch the full range.
let resp = self.client.stream(bucket, object).await?;
let content_range = match resp.status() {
StatusCode::OK => {
// The package resolver currently requires a 'Content-Length' header, so error out
// if one wasn't provided.
let content_length = resp.headers().get(CONTENT_LENGTH).ok_or_else(|| {
Error::Other(anyhow!("response missing Content-Length header"))
})?;
ContentRange::from_http_content_length_header(content_length)
.context("parsing Content-Length header")?
}
StatusCode::NOT_FOUND => {
return Err(Error::NotFound);
}
StatusCode::RANGE_NOT_SATISFIABLE => {
return Err(Error::RangeNotSatisfiable);
}
status => {
if status.is_success() {
return Err(Error::Other(anyhow!("unexpected status code: {}", status)));
} else {
return Err(Error::Other(anyhow!("error downloading resource: {}", status)));
}
}
};
// Since we fetched the full length, validate that the range request is inbounds, or error
// out.
match range {
Range::Full => {}
Range::From { first_byte_pos } => {
if first_byte_pos >= content_range.total_len() {
return Err(Error::RangeNotSatisfiable);
}
}
Range::Inclusive { first_byte_pos, last_byte_pos } => {
if first_byte_pos > last_byte_pos
|| first_byte_pos >= content_range.total_len()
|| last_byte_pos >= content_range.total_len()
{
return Err(Error::RangeNotSatisfiable);
}
}
Range::Suffix { len } => {
if len > content_range.total_len() {
return Err(Error::RangeNotSatisfiable);
}
}
}
let body = resp.into_body();
Ok(Resource {
content_range,
stream: Box::pin(body.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
})
}
}
#[async_trait::async_trait]
impl<T> RepositoryBackend for GcsRepository<T>
where
T: GcsClient + Clone + Debug + Send + Sync + 'static,
{
fn spec(&self) -> RepositorySpec {
RepositorySpec::Gcs {
metadata_repo_url: self.metadata_repo_url.as_str().to_owned(),
blob_repo_url: self.blob_repo_url.as_str().to_owned(),
}
}
async fn fetch_metadata(&self, resource_path: &str, range: Range) -> Result<Resource, Error> {
self.get(&self.metadata_repo_url, &resource_path, range).await
}
async fn fetch_blob(&self, resource_path: &str, range: Range) -> Result<Resource, Error> {
self.get(&self.blob_repo_url, &resource_path, range).await
}
fn get_tuf_repo(
&self,
) -> Result<Box<(dyn RepositoryProvider<Json> + Send + Sync + 'static)>, Error> {
let repo = GcsRepository::clone(&self);
Ok(Box::new(TufGcsRepository { repo }))
}
async fn blob_len(&self, path: &str) -> Result<u64, anyhow::Error> {
// FIXME(http://fxbug.dev/98993): The gcs library does not expose a more efficient API for
// determining the blob size.
Ok(self.fetch_blob(path, Range::Full).await?.total_len())
}
async fn blob_modification_time(
&self,
_path: &str,
) -> Result<Option<SystemTime>, anyhow::Error> {
// FIXME(http://fxbug.dev/98993): The gcs library does not expose an API to determine the
// blob modification time.
Ok(None)
}
}
struct TufGcsRepository<T> {
repo: GcsRepository<T>,
}
impl<T> RepositoryProvider<Json> for TufGcsRepository<T>
where
T: GcsClient + Debug + Clone + Send + Sync + 'static,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>, tuf::Error>> {
let path = meta_path.components::<Json>(version).join("/");
let repo = self.repo.clone();
async move {
let resp = RepositoryBackend::fetch_metadata(&repo, &path, Range::Full).await.map_err(
|err| match err {
Error::Tuf(err) => err,
Error::NotFound => tuf::Error::NotFound,
err => tuf::Error::Opaque(err.to_string()),
},
)?;
let reader = resp
.stream
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.into_async_read();
let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(reader);
Ok(reader)
}
.boxed()
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>, tuf::Error>> {
let path = target_path.components().join("/");
let repo = self.repo.clone();
async move {
let resp = RepositoryBackend::fetch_metadata(&repo, &path, Range::Full).await.map_err(
|err| match err {
Error::Tuf(err) => err,
Error::NotFound => tuf::Error::NotFound,
err => tuf::Error::Opaque(err.to_string()),
},
)?;
let reader = resp
.stream
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.into_async_read();
let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(reader);
Ok(reader)
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
repository::{repo_tests, FileSystemRepository},
test_utils::make_repository,
},
assert_matches::assert_matches,
camino::{Utf8Path, Utf8PathBuf},
std::{fs::File, io::Write as _},
url::Url,
};
#[derive(Debug, Clone)]
struct MockGcsClient {
repo: FileSystemRepository,
}
#[async_trait::async_trait]
impl GcsClient for MockGcsClient {
async fn stream(&self, bucket: &str, mut object: &str) -> anyhow::Result<Response<Body>> {
// The gcs library allows for leading slashes, but FileSystemRepository does not, so
// remove it.
if let Some(o) = object.strip_prefix('/') {
object = o;
}
let res = match bucket {
"my-tuf-repo" => self.repo.fetch_metadata(object, Range::Full).await,
"my-blob-repo" => self.repo.fetch_blob(object, Range::Full).await,
_ => panic!("unknown bucket {:?}", bucket),
};
match res {
Ok(resource) => {
// We don't support range requests.
assert_eq!(resource.content_range.to_http_content_range_header(), None);
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Length", resource.content_len())
.body(Body::wrap_stream(resource.stream))
.unwrap())
}
Err(Error::NotFound) => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()),
res => panic!("unexpected result {:#?}", res),
}
}
}
struct TestEnv {
_tmp: tempfile::TempDir,
metadata_repo_path: Utf8PathBuf,
blob_repo_path: Utf8PathBuf,
repo: GcsRepository<MockGcsClient>,
}
impl TestEnv {
async fn new() -> Self {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
// Create a repository and serve it with the server.
let metadata_repo_path = dir.join("tuf");
let blob_repo_path = dir.join("blobs");
std::fs::create_dir(&metadata_repo_path).unwrap();
std::fs::create_dir(&blob_repo_path).unwrap();
make_repository(blob_repo_path.as_std_path(), blob_repo_path.as_std_path()).await;
let remote_repo =
FileSystemRepository::new(metadata_repo_path.clone(), blob_repo_path.clone());
let tuf_url = "gs://my-tuf-repo/";
let blob_url = "gs://my-blob-repo/";
let repo = GcsRepository::new(
MockGcsClient { repo: remote_repo },
Url::parse(&tuf_url).unwrap(),
Url::parse(&blob_url).unwrap(),
)
.unwrap();
TestEnv { _tmp: tmp, metadata_repo_path, blob_repo_path, repo }
}
}
#[async_trait::async_trait]
impl repo_tests::TestEnv for TestEnv {
fn supports_range(&self) -> bool {
false
}
async fn read_metadata(&self, path: &str, range: Range) -> Result<Vec<u8>, Error> {
let mut body = vec![];
self.repo.fetch_metadata(path, range).await?.read_to_end(&mut body).await?;
Ok(body)
}
async fn read_blob(&self, path: &str, range: Range) -> Result<Vec<u8>, Error> {
let mut body = vec![];
self.repo.fetch_blob(path, range).await?.read_to_end(&mut body).await?;
Ok(body)
}
fn write_metadata(&self, path: &str, bytes: &[u8]) {
let file_path = self.metadata_repo_path.join(path);
println!("{:?} {:?}", self.metadata_repo_path, file_path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
fn write_blob(&self, path: &str, bytes: &[u8]) {
let file_path = self.blob_repo_path.join(path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_missing() {
let env = TestEnv::new().await;
repo_tests::check_fetch_missing(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_empty() {
let env = TestEnv::new().await;
repo_tests::check_fetch_empty(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_small() {
let env = TestEnv::new().await;
repo_tests::check_fetch_small(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_range_small() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range_small(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch() {
let env = TestEnv::new().await;
repo_tests::check_fetch(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_range() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_range_not_satisifiable() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range_not_satisfiable(&env).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_blob_modification_time() {
let env = TestEnv::new().await;
std::fs::write(env.blob_repo_path.join("empty-blob"), b"").unwrap();
// We don't support modification time.
assert_matches!(env.repo.blob_modification_time("empty-blob").await, Ok(None));
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_watch() {
let env = TestEnv::new().await;
// We don't support watch.
assert_matches!(env.repo.supports_watch(), false);
assert!(env.repo.watch().is_err());
}
}