// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Tools to download a Fuchsia package from a TUF repository.
//! See
//! - [Package](https://fuchsia.dev/fuchsia-src/concepts/packages/package?hl=en)
//! - [TUF](https://theupdateframework.io/)
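//!
//! A minimal construction sketch (not run as a doctest; the localhost URLs are
//! placeholders, and the client comes from `fuchsia_hyper` as in the tests below):
//!
//! ```ignore
//! use std::collections::BTreeSet;
//! use url::Url;
//!
//! let repo = HttpRepository::new(
//!     fuchsia_hyper::new_client(),
//!     Url::parse("http://localhost:8083/tuf").unwrap(),
//!     Url::parse("http://localhost:8083/tuf/blobs").unwrap(),
//!     BTreeSet::new(),
//! );
//! ```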
use {
crate::{
range::{ContentRange, Range},
repository::{Error, RepoProvider, RepositorySpec},
resource::Resource,
},
anyhow::{anyhow, Context as _, Result},
futures::{future::BoxFuture, AsyncRead, FutureExt as _, TryStreamExt as _},
hyper::{
client::{connect::Connect, Client},
header::{CONTENT_LENGTH, CONTENT_RANGE, RANGE},
Body, Method, Request, StatusCode, Uri,
},
std::{collections::BTreeSet, fmt::Debug, time::SystemTime},
tuf::{
metadata::{MetadataPath, MetadataVersion, TargetPath},
pouf::Pouf1,
repository::{
HttpRepository as TufHttpRepository, HttpRepositoryBuilder as TufHttpRepositoryBuilder,
RepositoryProvider as TufRepositoryProvider,
},
},
url::Url,
};
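/// A [RepoProvider] that fetches TUF metadata from `metadata_repo_url` and content blobs
/// from `blob_repo_url` over HTTP.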
#[derive(Debug)]
pub struct HttpRepository<C>
where
C: Connect + Send + Sync + 'static,
{
client: Client<C, Body>,
tuf_repo: TufHttpRepository<C, Pouf1>,
metadata_repo_url: Url,
blob_repo_url: Url,
aliases: BTreeSet<String>,
}
impl<C> HttpRepository<C>
where
C: Connect + Clone + Debug + Send + Sync + 'static,
{
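/// Constructs an [HttpRepository] from a hyper `client` and the metadata and blob
/// repository URLs. A trailing slash is appended to either URL's path if it is missing so
/// that `Url::join` treats the path as a directory.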
pub fn new(
client: Client<C, Body>,
mut metadata_repo_url: Url,
mut blob_repo_url: Url,
aliases: BTreeSet<String>,
) -> Self {
// `Url::join` treats URLs with a trailing slash as a directory, and without one as a file.
// In the latter case, it strips off the last segment before joining paths. Since the
// metadata and blob URLs are directories, make sure they have a trailing slash.
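// For example, joining "1.root.json" onto "http://example.com/repo" produces
// "http://example.com/1.root.json", while joining it onto "http://example.com/repo/"
// produces "http://example.com/repo/1.root.json".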
if !metadata_repo_url.path().ends_with('/') {
metadata_repo_url.set_path(&format!("{}/", metadata_repo_url.path()));
}
if !blob_repo_url.path().ends_with('/') {
blob_repo_url.set_path(&format!("{}/", blob_repo_url.path()));
}
let tuf_repo =
TufHttpRepositoryBuilder::<_, Pouf1>::new(metadata_repo_url.clone(), client.clone())
.build();
Self { client, tuf_repo, metadata_repo_url, blob_repo_url, aliases }
}
}
impl<C> HttpRepository<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
fn fetch_from<'a>(
&'a self,
root: &Url,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
let full_url = root.join(resource_path);
async move {
let full_url = full_url.map_err(|e| anyhow!(e))?;
let uri = full_url.as_str().parse::<Uri>().map_err(|e| anyhow!(e))?;
let mut builder = Request::builder().method(Method::GET).uri(uri);
// Add a 'Range' header if we're requesting a subset of the file.
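// For example, "Range: bytes=0-1023" asks for only the first 1024 bytes.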
if let Some(http_range) = range.to_http_request_header() {
builder = builder.header(RANGE, http_range);
}
let request = builder.body(Body::empty()).context("creating http request")?;
let resp = self
.client
.request(request)
.await
.context(format!("fetching resource {}", full_url.as_str()))?;
// Check if the response was successful, or propagate the error.
//
// Note that according to [RFC-7233], it's possible for a 'Range' request from the
// client to get a `200 OK` response and the full resource in return. Furthermore, the
// server is allowed to return a `206 Partial Content` with a different `Content-Range`
// than the subset requested by the `Range` header. The RFC mentions that a server may do
// this to coalesce ranges that overlap or that are separated by a small gap. So we'll
// parse these headers and pass the response range along to the caller.
//
// [RFC-7233]: https://datatracker.ietf.org/doc/html/rfc7233#section-4.1
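// For example, a request with "Range: bytes=0-1023" may come back as a "200 OK" with the
// whole file, or as a "206 Partial Content" with "Content-Range: bytes 0-1023/4096".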
let content_range = match resp.status() {
StatusCode::OK => {
// The package resolver currently requires a 'Content-Length' header, so error out
// if one wasn't provided.
let content_length = resp.headers().get(CONTENT_LENGTH).ok_or_else(|| {
Error::Other(anyhow!("response missing Content-Length header"))
})?;
ContentRange::from_http_content_length_header(content_length)
.context("parsing Content-Length header")?
}
StatusCode::PARTIAL_CONTENT => {
// According to [RFC-7233], a Partial Content status must come with a
// 'Content-Range' header.
//
// [RFC-7233]: https://datatracker.ietf.org/doc/html/rfc7233#section-4.1
let content_range = resp.headers().get(CONTENT_RANGE).ok_or_else(|| {
Error::Other(anyhow!(
"received Partial Content status, but missing Content-Range header"
))
})?;
ContentRange::from_http_content_range_header(content_range)
.context("parsing Content-Range header")?
}
StatusCode::NOT_FOUND => {
return Err(Error::NotFound);
}
StatusCode::RANGE_NOT_SATISFIABLE => {
return Err(Error::RangeNotSatisfiable);
}
status => {
if status.is_success() {
return Err(Error::Other(anyhow!("unexpected status code: {}", status)));
} else {
return Err(Error::Other(anyhow!(
"error downloading resource: {}",
status
)));
}
}
};
let body = resp.into_body();
Ok(Resource {
content_range,
stream: Box::pin(
body.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
),
})
}
.boxed()
}
}
impl<C> RepoProvider for HttpRepository<C>
where
C: Connect + Clone + Debug + Send + Sync + 'static,
{
fn spec(&self) -> RepositorySpec {
RepositorySpec::Http {
metadata_repo_url: self.metadata_repo_url.as_str().to_owned(),
blob_repo_url: self.blob_repo_url.as_str().to_owned(),
aliases: self.aliases.clone(),
}
}
fn aliases(&self) -> &BTreeSet<String> {
&self.aliases
}
fn fetch_metadata_range<'a>(
&'a self,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
self.fetch_from(&self.metadata_repo_url, resource_path, range)
}
fn fetch_blob_range<'a>(
&'a self,
resource_path: &str,
range: Range,
) -> BoxFuture<'a, Result<Resource, Error>> {
self.fetch_from(&self.blob_repo_url, resource_path, range)
}
fn blob_modification_time<'a>(
&'a self,
_path: &str,
) -> BoxFuture<'a, Result<Option<SystemTime>>> {
async move { Ok(None) }.boxed()
}
}
impl<C> TufRepositoryProvider<Pouf1> for HttpRepository<C>
where
C: Connect + Clone + Debug + Send + Sync + 'static,
{
fn fetch_metadata<'a>(
&'a self,
meta_path: &MetadataPath,
version: MetadataVersion,
) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
self.tuf_repo.fetch_metadata(meta_path, version)
}
fn fetch_target<'a>(
&'a self,
target_path: &TargetPath,
) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
self.tuf_repo.fetch_target(target_path)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
manager::RepositoryManager,
repo_client::RepoClient,
repository::repo_tests::{self, TestEnv as _},
server::RepositoryServer,
test_utils::make_pm_repository,
util::CHUNK_SIZE,
},
assert_matches::assert_matches,
camino::Utf8Path,
fuchsia_async as fasync,
fuchsia_hyper::{new_client, HyperConnector},
std::{fs::File, io::Write, net::Ipv4Addr, sync::Arc},
};
struct TestEnv {
tmp: tempfile::TempDir,
repo: HttpRepository<HyperConnector>,
server: RepositoryServer,
task: fasync::Task<()>,
}
impl TestEnv {
async fn new() -> Self {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
// Create a repository and serve it with the server.
let remote_backend = Box::new(make_pm_repository(&dir).await) as Box<_>;
let remote_repo = RepoClient::from_trusted_remote(remote_backend).await.unwrap();
let manager = RepositoryManager::new();
manager.add("tuf", remote_repo);
let addr = (Ipv4Addr::LOCALHOST, 0).into();
let (server_fut, _, server) =
RepositoryServer::builder(addr, Arc::clone(&manager)).start().await.unwrap();
// Run the server in the background.
let task = fasync::Task::local(server_fut);
let tuf_url = server.local_url() + "/tuf";
let blob_url = server.local_url() + "/tuf/blobs";
let repo = HttpRepository::new(
new_client(),
Url::parse(&tuf_url).unwrap(),
Url::parse(&blob_url).unwrap(),
BTreeSet::new(),
);
TestEnv { tmp, repo, server, task }
}
}
#[async_trait::async_trait]
impl repo_tests::TestEnv for TestEnv {
fn supports_range(&self) -> bool {
true
}
fn repo(&self) -> &dyn RepoProvider {
&self.repo
}
fn write_metadata(&self, path: &str, bytes: &[u8]) {
let file_path = self.tmp.path().join("repository").join(path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
fn write_blob(&self, path: &str, bytes: &[u8]) {
let file_path = self.tmp.path().join("repository").join("blobs").join(path);
let mut f = File::create(file_path).unwrap();
f.write_all(bytes).unwrap();
}
async fn stop(self) {
let TestEnv { tmp: _tmp, repo, server, task } = self;
server.stop();
// Explicitly drop the repo so we close all our client connections before we wait for
// the server to shut down.
drop(repo);
task.await;
}
}
repo_tests::repo_test_suite! {
env = TestEnv::new().await;
chunk_size = CHUNK_SIZE;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_blob_modification_time() {
let env = TestEnv::new().await;
std::fs::write(env.tmp.path().join("repository").join("blobs").join("empty-blob"), b"")
.unwrap();
// We don't support modification time.
assert_matches!(env.repo.blob_modification_time("empty-blob").await, Ok(None));
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_watch() {
let env = TestEnv::new().await;
// We don't support watch.
assert!(!env.repo.supports_watch());
assert!(env.repo.watch().is_err());
env.stop().await;
}
}