blob: ebdb959036d4bf41bca4ab907f09346ce78387d9 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Tools to download a Fuchsia package from from a TUF repository.
//! See
//! - [Package](https://fuchsia.dev/fuchsia-src/concepts/packages/package?hl=en)
//! - [TUF](https://theupdateframework.io/)
use {
crate::{
range::{ContentRange, Range},
repository::{Error, RepositoryBackend},
resource::Resource,
},
anyhow::{anyhow, Context as _, Result},
fidl_fuchsia_developer_ffx_ext::RepositorySpec,
futures::TryStreamExt,
hyper::{
client::{connect::Connect, Client},
header::{CONTENT_LENGTH, CONTENT_RANGE, RANGE},
Body, Method, Request, StatusCode, Uri,
},
std::{fmt::Debug, time::SystemTime},
tuf::{
interchange::Json,
repository::{HttpRepositoryBuilder as TufHttpRepositoryBuilder, RepositoryProvider},
},
url::Url,
};
#[derive(Debug)]
pub struct HttpRepository<C> {
client: Client<C, Body>,
metadata_repo_url: Url,
blob_repo_url: Url,
}
impl<C> HttpRepository<C> {
pub fn new(
client: Client<C, Body>,
mut metadata_repo_url: Url,
mut blob_repo_url: Url,
) -> Self {
// `URL.join` treats urls with a trailing slash as a directory, and without as a file.
// In the latter case, it will strip off the last segment before joining paths. Since the
// metadata and blob url are directories, make sure they have a trailing slash.
if !metadata_repo_url.path().ends_with('/') {
metadata_repo_url.set_path(&format!("{}/", metadata_repo_url.path()));
}
if !blob_repo_url.path().ends_with('/') {
blob_repo_url.set_path(&format!("{}/", blob_repo_url.path()));
}
Self { client, metadata_repo_url, blob_repo_url }
}
}
impl<C> HttpRepository<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
async fn fetch_from(
&self,
root: &Url,
resource_path: &str,
range: Range,
) -> Result<Resource, Error> {
let full_url = root.join(resource_path).map_err(|e| anyhow!(e))?;
let uri = full_url.as_str().parse::<Uri>().map_err(|e| anyhow!(e))?;
let mut builder = Request::builder().method(Method::GET).uri(uri);
// Add a 'Range' header if we're requesting a subset of the file.
if let Some(http_range) = range.to_http_request_header() {
builder = builder.header(RANGE, http_range);
}
let request = builder.body(Body::empty()).context("creating http request")?;
let resp = self
.client
.request(request)
.await
.context(format!("fetching resource {}", full_url.as_str()))?;
// Check if the response was successful, or propagate the error.
//
// Note that according to [RFC-7233], it's possible for a 'Range' request from the client to
// get a `200 OK` response and the full resource in return. Furthermore, the server is
// allowed to return a `206 Partial Content` and a different subset `Content-Range` that was
// requested by the `Range` request. The RFC mentions that a server may do this to coalesce
// overlapping ranges, or separated by a gap. So we'll parse these headers and pass along
// the response range to the caller.
//
// [RFC-7233]: https://datatracker.ietf.org/doc/html/rfc7233#section-4.1
let content_range = match resp.status() {
StatusCode::OK => {
// The package resolver currently requires a 'Content-Length' header, so error out
// if one wasn't provided.
let content_length = resp.headers().get(CONTENT_LENGTH).ok_or_else(|| {
Error::Other(anyhow!("response missing Content-Length header"))
})?;
ContentRange::from_http_content_length_header(content_length)
.context("parsing Content-Length header")?
}
StatusCode::PARTIAL_CONTENT => {
// According to [RFC-7233], a Partial Content status must come with a
// 'Content-Range' header.
//
// [RFC-7233]: https://datatracker.ietf.org/doc/html/rfc7233#section-4.1
let content_range = resp.headers().get(CONTENT_RANGE).ok_or_else(|| {
Error::Other(anyhow!(
"received Partial Content status, but missing Content-Range header"
))
})?;
ContentRange::from_http_content_range_header(content_range)
.context("parsing Content-Range header")?
}
StatusCode::NOT_FOUND => {
return Err(Error::NotFound);
}
StatusCode::RANGE_NOT_SATISFIABLE => {
return Err(Error::RangeNotSatisfiable);
}
status => {
if status.is_success() {
return Err(Error::Other(anyhow!("unexpected status code: {}", status)));
} else {
return Err(Error::Other(anyhow!("error downloading resource: {}", status)));
}
}
};
let body = resp.into_body();
Ok(Resource {
content_range,
stream: Box::pin(body.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
})
}
}
#[async_trait::async_trait]
impl<C> RepositoryBackend for HttpRepository<C>
where
C: Connect + Clone + Debug + Send + Sync + 'static,
{
fn spec(&self) -> RepositorySpec {
RepositorySpec::Http {
metadata_repo_url: self.metadata_repo_url.as_str().to_owned(),
blob_repo_url: self.blob_repo_url.as_str().to_owned(),
}
}
async fn fetch_metadata(&self, resource_path: &str, range: Range) -> Result<Resource, Error> {
self.fetch_from(&self.metadata_repo_url, resource_path, range).await
}
async fn fetch_blob(&self, resource_path: &str, range: Range) -> Result<Resource, Error> {
self.fetch_from(&self.blob_repo_url, resource_path, range).await
}
fn get_tuf_repo(
&self,
) -> Result<Box<(dyn RepositoryProvider<Json> + Send + Sync + 'static)>, Error> {
Ok(Box::new(
TufHttpRepositoryBuilder::<_, Json>::new(
self.metadata_repo_url.clone().into(),
self.client.clone(),
)
.build(),
))
}
async fn blob_len(&self, path: &str) -> Result<u64> {
// FIXME(http://fxbug.dev/98376): It may be more efficient to try to make a HEAD request and
// see if that includes the content length before falling back to us requesting the blob and
// dropping the stream.
Ok(self.fetch_blob(path, Range::Full).await?.total_len())
}
async fn blob_modification_time(&self, _path: &str) -> Result<Option<SystemTime>> {
Ok(None)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
manager::RepositoryManager, repository::repo_tests, server::RepositoryServer,
test_utils::make_pm_repository,
},
assert_matches::assert_matches,
camino::Utf8Path,
fuchsia_async as fasync,
fuchsia_hyper::{new_client, HyperConnector},
std::{net::Ipv4Addr, sync::Arc},
};
struct TestEnv {
tmp: tempfile::TempDir,
repo: HttpRepository<HyperConnector>,
server: RepositoryServer,
task: fasync::Task<()>,
}
impl TestEnv {
async fn new() -> Self {
let tmp = tempfile::tempdir().unwrap();
let dir = Utf8Path::from_path(tmp.path()).unwrap();
// Create a repository and serve it with the server.
let remote_repo = make_pm_repository("tuf", dir.to_path_buf()).await;
let manager = RepositoryManager::new();
manager.add(Arc::new(remote_repo));
let addr = (Ipv4Addr::LOCALHOST, 0).into();
let (server_fut, _, server) =
RepositoryServer::builder(addr, Arc::clone(&manager)).start().await.unwrap();
// Run the server in the background.
let task = fasync::Task::local(server_fut);
let tuf_url = server.local_url() + "/tuf";
let blob_url = server.local_url() + "/tuf/blobs";
let repo = HttpRepository::new(
new_client(),
Url::parse(&tuf_url).unwrap(),
Url::parse(&blob_url).unwrap(),
);
TestEnv { tmp, repo, server, task }
}
async fn stop(self) {
let TestEnv { tmp: _tmp, repo, server, task } = self;
server.stop();
// Explicitly drop the repo so we close all our client connections before we wait for
// the server to shut down.
drop(repo);
task.await;
}
}
#[async_trait::async_trait]
impl repo_tests::TestEnv for TestEnv {
fn supports_range(&self) -> bool {
true
}
async fn read_metadata(&self, path: &str, range: Range) -> Result<Vec<u8>, Error> {
let mut body = vec![];
self.repo.fetch_metadata(path, range).await?.read_to_end(&mut body).await?;
Ok(body)
}
async fn read_blob(&self, path: &str, range: Range) -> Result<Vec<u8>, Error> {
let mut body = vec![];
self.repo.fetch_blob(path, range).await?.read_to_end(&mut body).await?;
Ok(body)
}
fn write_metadata(&self, path: &str, bytes: &[u8]) {
std::fs::write(self.tmp.path().join("repository").join(path), bytes).unwrap();
}
fn write_blob(&self, path: &str, bytes: &[u8]) {
std::fs::write(self.tmp.path().join("repository").join("blobs").join(path), bytes)
.unwrap();
}
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_missing() {
let env = TestEnv::new().await;
repo_tests::check_fetch_missing(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_empty() {
let env = TestEnv::new().await;
repo_tests::check_fetch_empty(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_small() {
let env = TestEnv::new().await;
repo_tests::check_fetch_small(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_range_small() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range_small(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch() {
let env = TestEnv::new().await;
repo_tests::check_fetch(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_fetch_range() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_range_fetch_not_satisifiable() {
let env = TestEnv::new().await;
repo_tests::check_fetch_range_not_satisfiable(&env).await;
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_blob_modification_time() {
let env = TestEnv::new().await;
std::fs::write(env.tmp.path().join("repository").join("blobs").join("empty-blob"), b"")
.unwrap();
// We don't support modification time.
assert_matches!(env.repo.blob_modification_time("empty-blob").await, Ok(None));
env.stop().await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn test_watch() {
let env = TestEnv::new().await;
// We don't support watch.
assert_matches!(env.repo.supports_watch(), false);
assert!(env.repo.watch().is_err());
env.stop().await;
}
}