Switch to hyper 0.12
diff --git a/Cargo.toml b/Cargo.toml
index 08b9194..b4f0f35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,8 +25,10 @@
chrono = { version = "0.4", features = [ "serde" ] }
data-encoding = "2.0.0-rc.2"
derp = "0.0.11"
-futures-preview = "0.3.0-alpha.10"
-hyper = "0.10"
+futures_01 = { version = "0.1", package = "futures" }
+futures-preview = { version = "0.3.0-alpha.10", features = [ "compat" ] }
+http = "0.1"
+hyper = "0.12"
itoa = "0.4"
log = "0.4"
ring = { version = "0.13", features = [ "rsa_signing" ] }
@@ -35,6 +37,7 @@
serde_json = "1"
tempfile = "3"
untrusted = "0.6"
+url = "1"
[dev-dependencies]
lazy_static = "1"
diff --git a/src/client.rs b/src/client.rs
index 6dff236..4815084 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -6,7 +6,6 @@
//! #![feature(async_await, await_macro, futures_api, pin)]
//! # use futures::executor::block_on;
//! # use hyper::client::Client as HttpClient;
-//! # use hyper::Url;
//! # use std::path::PathBuf;
//! # use tuf::{Result, Tuf};
//! # use tuf::crypto::KeyId;
@@ -15,6 +14,7 @@
//! # MetadataVersion};
//! # use tuf::interchange::Json;
//! # use tuf::repository::{Repository, FileSystemRepository, HttpRepository};
+//!
//! static TRUSTED_ROOT_KEY_IDS: &'static [&str] = &[
//! "diNfThTFm0PI8R-Bq7NztUIvZbZiaC_weJBgcqaHlWw=",
//! "ar9AgoRsmeEcf6Ponta_1TZu1ds5uXbDemBig30O7ck=",
@@ -30,7 +30,7 @@
//! let local = FileSystemRepository::<Json>::new(PathBuf::from("~/.rustup"))?;
//!
//! let remote = HttpRepository::new(
-//! Url::parse("https://static.rust-lang.org/").unwrap(),
+//! url::Url::parse("https://static.rust-lang.org/").unwrap(),
//! HttpClient::new(),
//! Some("rustup/1.4.0".into()),
//! None);
diff --git a/src/error.rs b/src/error.rs
index e708598..ddd61d6 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -2,6 +2,7 @@
use data_encoding::DecodeError;
use derp;
+use http;
use hyper;
use serde_json;
use std::fmt;
@@ -89,14 +90,14 @@
}
}
-impl From<hyper::error::Error> for Error {
- fn from(err: hyper::error::Error) -> Error {
- Error::Opaque(format!("Hyper: {:?}", err))
+impl From<http::Error> for Error {
+ fn from(err: http::Error) -> Error {
+ Error::Opaque(format!("Http: {:?}", err))
}
}
-impl From<hyper::error::ParseError> for Error {
- fn from(err: hyper::error::ParseError) -> Error {
+impl From<hyper::Error> for Error {
+ fn from(err: hyper::Error) -> Error {
Error::Opaque(format!("Hyper: {:?}", err))
}
}
diff --git a/src/into_async_read.rs b/src/into_async_read.rs
new file mode 100644
index 0000000..18ce7c7
--- /dev/null
+++ b/src/into_async_read.rs
@@ -0,0 +1,93 @@
+// FIXME: remove once https://github.com/rust-lang-nursery/futures-rs/pull/1367 lands in a release.
+
+use futures::io::AsyncRead;
+use futures::ready;
+use futures::stream::TryStream;
+use futures::task::{LocalWaker, Poll};
+use std::cmp;
+use std::io::{Error, Result};
+use std::marker::Unpin;
+use std::pin::Pin;
+
+pub struct IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error> + Unpin,
+ St::Ok: AsRef<[u8]>,
+{
+ stream: St,
+ state: ReadState<St::Ok>,
+}
+
+impl<St> Unpin for IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error> + Unpin,
+ St::Ok: AsRef<[u8]>,
+{
+}
+
+#[derive(Debug)]
+enum ReadState<T: AsRef<[u8]>> {
+ Ready { chunk: T, chunk_start: usize },
+ PendingChunk,
+ Eof,
+}
+
+impl<St> IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error> + Unpin,
+ St::Ok: AsRef<[u8]>,
+{
+ pub(super) fn new(stream: St) -> Self {
+ IntoAsyncRead {
+ stream,
+ state: ReadState::PendingChunk,
+ }
+ }
+}
+
+impl<St> AsyncRead for IntoAsyncRead<St>
+where
+ St: TryStream<Error = Error> + Unpin,
+ St::Ok: AsRef<[u8]>,
+{
+ fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<Result<usize>> {
+ loop {
+ match &mut self.state {
+ ReadState::Ready { chunk, chunk_start } => {
+ let chunk = chunk.as_ref();
+ let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
+
+ buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]);
+ *chunk_start += len;
+
+ if chunk.len() == *chunk_start {
+ self.state = ReadState::PendingChunk;
+ }
+
+ return Poll::Ready(Ok(len));
+ }
+ ReadState::PendingChunk => {
+ match ready!(Pin::new(&mut self.stream).try_poll_next(lw)) {
+ Some(Ok(chunk)) => {
+ self.state = ReadState::Ready {
+ chunk,
+ chunk_start: 0,
+ };
+ continue;
+ }
+ Some(Err(err)) => {
+ return Poll::Ready(Err(err));
+ }
+ None => {
+ self.state = ReadState::Eof;
+ return Poll::Ready(Ok(0));
+ }
+ }
+ }
+ ReadState::Eof => {
+ return Poll::Ready(Ok(0));
+ }
+ }
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 456c329..3eb3ac0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -119,6 +119,7 @@
pub mod repository;
pub mod tuf;
+mod into_async_read;
mod shims;
mod util;
diff --git a/src/repository.rs b/src/repository.rs
index 518d468..4f798ca 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -1,27 +1,32 @@
//! Interfaces for interacting with different types of TUF repositories.
-use futures::io::{AllowStdIo, AsyncRead, AsyncReadExt};
-use hyper::client::response::Response;
-use hyper::header::{Headers, UserAgent};
-use hyper::status::StatusCode;
-use hyper::{Client, Url};
+use futures::compat::{Future01CompatExt, Stream01CompatExt};
+use futures::io::{AllowStdIo, AsyncRead};
+use futures::prelude::*;
+use http::{Response, StatusCode, Uri};
+use hyper::body::Body;
+use hyper::client::connect::Connect;
+use hyper::Client;
+use hyper::Request;
use log::debug;
use std::collections::HashMap;
use std::fs::{DirBuilder, File};
-use std::io::Cursor;
+use std::io::{self, Cursor};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
-use tempfile::NamedTempFile;
+use tempfile::{self, NamedTempFile};
use crate::crypto::{self, HashAlgorithm, HashValue};
use crate::error::Error;
use crate::interchange::DataInterchange;
+use crate::into_async_read::IntoAsyncRead;
use crate::metadata::{
Metadata, MetadataPath, MetadataVersion, SignedMetadata, TargetDescription, TargetPath,
};
use crate::util::SafeReader;
use crate::{Result, TufFuture};
+use url::Url;
/// Top-level trait that represents a TUF repository and contains all the ways it can be interacted
/// with.
@@ -254,19 +259,21 @@
}
/// A repository accessible over HTTP.
-pub struct HttpRepository<D>
+pub struct HttpRepository<C, D>
where
+ C: Connect + Sync,
D: DataInterchange,
{
url: Url,
- client: Client,
+ client: Client<C>,
user_agent: String,
metadata_prefix: Option<Vec<String>>,
interchange: PhantomData<D>,
}
-impl<D> HttpRepository<D>
+impl<C, D> HttpRepository<C, D>
where
+ C: Connect + Sync + 'static,
D: DataInterchange,
{
/// Create a new repository with the given `Url` and `Client`.
@@ -281,7 +288,7 @@
/// `https://tuf.example.com/meta/root.json`.
pub fn new(
url: Url,
- client: Client,
+ client: Client<C>,
user_agent_prefix: Option<String>,
metadata_prefix: Option<Vec<String>>,
) -> Self {
@@ -299,10 +306,11 @@
}
}
- fn get(&self, prefix: &Option<Vec<String>>, components: &[String]) -> Result<Response> {
- let mut headers = Headers::new();
- headers.set(UserAgent(self.user_agent.clone()));
-
+ async fn get<'a>(
+ &'a self,
+ prefix: &'a Option<Vec<String>>,
+ components: &'a [String],
+ ) -> Result<Response<Body>> {
let mut url = self.url.clone();
{
let mut segments = url.path_segments_mut().map_err(|_| {
@@ -314,16 +322,25 @@
segments.extend(components);
}
- let req = self.client.get(url.clone()).headers(headers);
- let resp = req.send()?;
+ let uri: Uri = url.into_string().parse().map_err(|_| {
+ Error::IllegalArgument(format!("URL was 'cannot-be-a-base': {:?}", self.url))
+ })?;
- if !resp.status.is_success() {
- if resp.status == StatusCode::NotFound {
+ let req = Request::builder()
+ .uri(uri)
+ .header("User-Agent", &*self.user_agent)
+ .body(Body::default())?;
+
+ let resp = await!(self.client.request(req).compat())?;
+ let status = resp.status();
+
+ if !status.is_success() {
+ if status == StatusCode::NOT_FOUND {
Err(Error::NotFound)
} else {
Err(Error::Opaque(format!(
"Error getting {:?}: {:?}",
- url, resp
+ self.url, resp
)))
}
} else {
@@ -332,8 +349,9 @@
}
}
-impl<D> Repository<D> for HttpRepository<D>
+impl<C, D> Repository<D> for HttpRepository<C, D>
where
+ C: Connect + Sync + 'static,
D: DataInterchange,
{
/// This always returns `Err` as storing over HTTP is not yet supported.
@@ -370,10 +388,16 @@
async move {
Self::check::<M>(meta_path)?;
- let resp = self.get(&self.metadata_prefix, &meta_path.components::<D>(&version))?;
+ let components = meta_path.components::<D>(&version);
+ let resp = await!(self.get(&self.metadata_prefix, &components))?;
+
+ let stream = resp
+ .into_body()
+ .compat()
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let mut reader = SafeReader::new(
- AllowStdIo::new(resp),
+ IntoAsyncRead::new(stream),
max_size.unwrap_or(::std::usize::MAX) as u64,
min_bytes_per_second,
hash_data,
@@ -403,10 +427,17 @@
) -> TufFuture<'a, Result<Box<dyn AsyncRead>>> {
Box::pinned(
async move {
- let resp = self.get(&None, &target_path.components())?;
let (alg, value) = crypto::hash_preference(target_description.hashes())?;
+ let components = target_path.components();
+ let resp = await!(self.get(&None, &components))?;
+
+ let stream = resp
+ .into_body()
+ .compat()
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
+
let reader = SafeReader::new(
- AllowStdIo::new(resp),
+ IntoAsyncRead::new(stream),
target_description.size(),
min_bytes_per_second,
Some((alg, value.clone())),