Port over to futures and async/await
This is an experiment on porting rust-tuf over to using futures
and async/await, which is what we use on Fuchsia. This is in contrast
with #151, which uses futures 0.1, which is compatible with stable
rust, but dramatically more invasive of a change, which is why
I haven't rebased it on top of remotes/origin/develop yet.
The downside of this is that landing this patch would make rust-tuf
require nightly until async/await is stablized. Unfortunately there
is no clear date on when that is going to occur.
diff --git a/.travis.yml b/.travis.yml
index fc0b3e6..6f98dd7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,8 +5,6 @@
language: rust
cache: cargo
rust:
- - stable
- - beta
- nightly
env:
diff --git a/Cargo.toml b/Cargo.toml
index fa9cede..08b9194 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@
chrono = { version = "0.4", features = [ "serde" ] }
data-encoding = "2.0.0-rc.2"
derp = "0.0.11"
+futures-preview = "0.3.0-alpha.10"
hyper = "0.10"
itoa = "0.4"
log = "0.4"
diff --git a/appveyor.yml b/appveyor.yml
index b146c3a..0348f0d 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -10,24 +10,6 @@
environment:
matrix:
- # Rust - Stable
- - TARGET: i686-pc-windows-gnu
- RUST_VERSION: stable
- BITS: 32
- MSYS2: 1
- - TARGET: x86_64-pc-windows-msvc
- RUST_VERSION: stable
- BITS: 64
-
- # Rust - Beta
- - TARGET: i686-pc-windows-gnu
- RUST_VERSION: beta
- BITS: 32
- MSYS2: 1
- - TARGET: x86_64-pc-windows-msvc
- RUST_VERSION: beta
- BITS: 64
-
# Rust - Nightly
- TARGET: i686-pc-windows-gnu
RUST_VERSION: nightly
diff --git a/src/client.rs b/src/client.rs
index 24f6c99..6dff236 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -3,10 +3,12 @@
//! # Example
//!
//! ```no_run
+//! #![feature(async_await, await_macro, futures_api, pin)]
+//! # use futures::executor::block_on;
//! # use hyper::client::Client as HttpClient;
//! # use hyper::Url;
//! # use std::path::PathBuf;
-//! # use tuf::Tuf;
+//! # use tuf::{Result, Tuf};
//! # use tuf::crypto::KeyId;
//! # use tuf::client::{Client, Config};
//! # use tuf::metadata::{RootMetadata, SignedMetadata, Role, MetadataPath,
@@ -19,13 +21,13 @@
//! "T5vfRrM1iHpgzGwAHe7MbJH_7r4chkOAphV3OPCCv0I=",
//! ];
//!
-//! # fn main() {
+//! # fn main() -> Result<()> {
+//! # block_on(async {
//! let key_ids: Vec<KeyId> = TRUSTED_ROOT_KEY_IDS.iter()
//! .map(|k| KeyId::from_string(k).unwrap())
//! .collect();
//!
-//! let local = FileSystemRepository::<Json>::new(PathBuf::from("~/.rustup"))
-//! .unwrap();
+//! let local = FileSystemRepository::<Json>::new(PathBuf::from("~/.rustup"))?;
//!
//! let remote = HttpRepository::new(
//! Url::parse("https://static.rust-lang.org/").unwrap(),
@@ -33,30 +35,32 @@
//! Some("rustup/1.4.0".into()),
//! None);
//!
-//! let mut client = Client::with_root_pinned(
+//! let mut client = await!(Client::with_root_pinned(
//! &key_ids,
//! Config::default(),
//! local,
//! remote,
-//! ).unwrap();
-//! let _ = client.update().unwrap();
+//! ))?;
+//!
+//! let _ = await!(client.update())?;
+//! # Ok(())
+//! # })
//! # }
//! ```
use chrono::offset::Utc;
+use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use log::{error, warn};
-use std::io::{Read, Write};
use crate::crypto::{self, KeyId};
use crate::error::Error;
use crate::interchange::DataInterchange;
use crate::metadata::{
- Metadata, MetadataPath, MetadataVersion, Role, RootMetadata, SignedMetadata, SnapshotMetadata,
+ Metadata, MetadataPath, MetadataVersion, Role, SignedMetadata, SnapshotMetadata,
TargetDescription, TargetPath, TargetsMetadata, VirtualTargetPath,
};
use crate::repository::Repository;
use crate::tuf::Tuf;
-use crate::util::SafeReader;
use crate::Result;
/// Translates real paths (where a file is stored) into virtual paths (how it is addressed in TUF)
@@ -67,9 +71,9 @@
/// ```
/// # use tuf::client::{PathTranslator, DefaultTranslator};
/// # use tuf::metadata::{VirtualTargetPath, TargetPath};
-/// # let path = TargetPath::new("foo".into()).unwrap();
-/// # let virt = VirtualTargetPath::new("foo".into()).unwrap();
-/// # let translator = DefaultTranslator::new();
+/// let path = TargetPath::new("foo".into()).unwrap();
+/// let virt = VirtualTargetPath::new("foo".into()).unwrap();
+/// let translator = DefaultTranslator::new();
/// assert_eq!(path,
/// translator.virtual_to_real(&translator.real_to_virtual(&path).unwrap()).unwrap());
/// assert_eq!(virt,
@@ -121,23 +125,26 @@
impl<D, L, R, T> Client<D, L, R, T>
where
D: DataInterchange,
- L: Repository<D>,
- R: Repository<D>,
- T: PathTranslator,
+ L: Repository<D> + 'static,
+ R: Repository<D> + 'static,
+ T: PathTranslator + 'static,
{
/// Create a new TUF client. It will attempt to load initial root metadata from the local repo
/// and return an error if it cannot do so.
///
/// **WARNING**: This method offers weaker security guarantees than the related method
/// `with_root_pinned`.
- pub fn new(config: Config<T>, local: L, remote: R) -> Result<Self> {
- let root = local.fetch_metadata(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::Number(1),
+ pub async fn new(config: Config<T>, local: L, remote: R) -> Result<Self> {
+ let root_path = MetadataPath::from_role(&Role::Root);
+ let root_version = MetadataVersion::Number(1);
+
+ let root = await!(local.fetch_metadata(
+ &root_path,
+ &root_version,
&config.max_root_size,
config.min_bytes_per_second,
None,
- )?;
+ ))?;
let tuf = Tuf::from_root(root)?;
@@ -153,42 +160,40 @@
/// repositories using the provided key IDs to pin the verification.
///
/// This is the preferred method of creating a client.
- pub fn with_root_pinned<'a, I>(
- trusted_root_keys: I,
+ pub async fn with_root_pinned(
+ trusted_root_keys: &[KeyId],
config: Config<T>,
local: L,
remote: R,
- ) -> Result<Self>
- where
- I: IntoIterator<Item = &'a KeyId>,
- T: PathTranslator,
- {
+ ) -> Result<Self> {
let root_path = MetadataPath::from_role(&Role::Root);
+ let root_version = MetadataVersion::Number(1);
- let root = local
- .fetch_metadata(
- &root_path,
- &MetadataVersion::Number(1),
- &config.max_root_size,
- config.min_bytes_per_second,
- None,
- )
- .or_else(|_| -> Result<SignedMetadata<_, RootMetadata>> {
+ let root = match await!(local.fetch_metadata(
+ &root_path,
+ &root_version,
+ &config.max_root_size,
+ config.min_bytes_per_second,
+ None,
+ )) {
+ Ok(root) => root,
+ Err(_) => {
// FIXME: should we be fetching the latest version instead of version 1?
- let root = remote.fetch_metadata(
+ let root = await!(remote.fetch_metadata(
&root_path,
- &MetadataVersion::Number(1),
+ &root_version,
&config.max_root_size,
config.min_bytes_per_second,
None,
- )?;
+ ))?;
- local.store_metadata(&root_path, &MetadataVersion::Number(1), &root)?;
+ await!(local.store_metadata(&root_path, &MetadataVersion::Number(1), &root))?;
// FIXME: should we also the root as `MetadataVersion::None`?
- Ok(root)
- })?;
+ root
+ }
+ };
let tuf = Tuf::from_root_pinned(root, trusted_root_keys)?;
@@ -203,26 +208,26 @@
/// Update TUF metadata from the remote repository.
///
/// Returns `true` if an update occurred and `false` otherwise.
- pub fn update(&mut self) -> Result<bool> {
- let r = self.update_root()?;
- let ts = self.update_timestamp()?;
- let sn = self.update_snapshot()?;
- let ta = self.update_targets()?;
+ pub async fn update(&mut self) -> Result<bool> {
+ let r = await!(self.update_root())?;
+ let ts = await!(self.update_timestamp())?;
+ let sn = await!(self.update_snapshot())?;
+ let ta = await!(self.update_targets())?;
Ok(r || ts || sn || ta)
}
/// Store the metadata in the local repository. This is juts a local cache, so we ignore if it
/// experiences any errors.
- fn store_metadata<M>(
- &mut self,
- path: &MetadataPath,
- version: &MetadataVersion,
- metadata: &SignedMetadata<D, M>,
+ async fn store_metadata<'a, M>(
+ &'a mut self,
+ path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ metadata: &'a SignedMetadata<D, M>,
) where
- M: Metadata,
+ M: Metadata + 'static,
{
- match self.local.store_metadata(path, version, metadata) {
+ match await!(self.local.store_metadata(path, version, metadata)) {
Ok(()) => {}
Err(err) => {
warn!(
@@ -237,16 +242,16 @@
}
/// Returns `true` if an update occurred and `false` otherwise.
- fn update_root(&mut self) -> Result<bool> {
+ async fn update_root(&mut self) -> Result<bool> {
let root_path = MetadataPath::from_role(&Role::Root);
- let latest_root = self.remote.fetch_metadata(
+ let latest_root = await!(self.remote.fetch_metadata(
&root_path,
&MetadataVersion::None,
&self.config.max_root_size,
self.config.min_bytes_per_second,
None,
- )?;
+ ))?;
let latest_version = latest_root.version();
if latest_version < self.tuf.root().version() {
@@ -265,20 +270,20 @@
for i in (self.tuf.root().version() + 1)..latest_version {
let version = MetadataVersion::Number(i);
- let signed_root = self.remote.fetch_metadata(
+ let signed_root = await!(self.remote.fetch_metadata(
&root_path,
&version,
&self.config.max_root_size,
self.config.min_bytes_per_second,
None,
- )?;
+ ))?;
if !self.tuf.update_root(signed_root.clone())? {
error!("{}", err_msg);
return Err(Error::Programming(err_msg.into()));
}
- self.store_metadata(&root_path, &version, &signed_root);
+ await!(self.store_metadata(&root_path, &version, &signed_root));
}
if !self.tuf.update_root(latest_root.clone())? {
@@ -286,12 +291,10 @@
return Err(Error::Programming(err_msg.into()));
}
- self.store_metadata(
- &root_path,
- &MetadataVersion::Number(latest_version),
- &latest_root,
- );
- self.store_metadata(&root_path, &MetadataVersion::None, &latest_root);
+ let latest_version = MetadataVersion::Number(latest_version);
+
+ await!(self.store_metadata(&root_path, &latest_version, &latest_root,));
+ await!(self.store_metadata(&root_path, &MetadataVersion::None, &latest_root));
if self.tuf.root().expires() <= &Utc::now() {
error!("Root metadata expired, potential freeze attack");
@@ -302,25 +305,22 @@
}
/// Returns `true` if an update occurred and `false` otherwise.
- fn update_timestamp(&mut self) -> Result<bool> {
+ async fn update_timestamp(&mut self) -> Result<bool> {
let timestamp_path = MetadataPath::from_role(&Role::Timestamp);
- let signed_timestamp = self.remote.fetch_metadata(
+ let signed_timestamp = await!(self.remote.fetch_metadata(
×tamp_path,
&MetadataVersion::None,
&self.config.max_timestamp_size,
self.config.min_bytes_per_second,
None,
- )?;
+ ))?;
if self.tuf.update_timestamp(signed_timestamp.clone())? {
let latest_version = signed_timestamp.version();
+ let latest_version = MetadataVersion::Number(latest_version);
- self.store_metadata(
- ×tamp_path,
- &MetadataVersion::Number(latest_version),
- &signed_timestamp,
- );
+ await!(self.store_metadata(×tamp_path, &latest_version, &signed_timestamp,));
Ok(true)
} else {
@@ -329,7 +329,7 @@
}
/// Returns `true` if an update occurred and `false` otherwise.
- fn update_snapshot(&mut self) -> Result<bool> {
+ async fn update_snapshot(&mut self) -> Result<bool> {
// 5.3.1 Check against timestamp metadata. The hashes and version number listed in the
// timestamp metadata. If hashes and version do not match, discard the new snapshot
// metadata, abort the update cycle, and report the failure.
@@ -352,17 +352,18 @@
};
let snapshot_path = MetadataPath::from_role(&Role::Snapshot);
+ let snapshot_size = Some(snapshot_description.size());
- let signed_snapshot = self.remote.fetch_metadata(
+ let signed_snapshot = await!(self.remote.fetch_metadata(
&snapshot_path,
&version,
- &Some(snapshot_description.size()),
+ &snapshot_size,
self.config.min_bytes_per_second,
Some((alg, value.clone())),
- )?;
+ ))?;
if self.tuf.update_snapshot(signed_snapshot.clone())? {
- self.store_metadata(&snapshot_path, &version, &signed_snapshot);
+ await!(self.store_metadata(&snapshot_path, &version, &signed_snapshot));
Ok(true)
} else {
@@ -371,7 +372,7 @@
}
/// Returns `true` if an update occurred and `false` otherwise.
- fn update_targets(&mut self) -> Result<bool> {
+ async fn update_targets(&mut self) -> Result<bool> {
let targets_description = match self.tuf.snapshot() {
Some(sn) => match sn.meta().get(&MetadataPath::from_role(&Role::Targets)) {
Some(d) => Ok(d),
@@ -398,17 +399,18 @@
};
let targets_path = MetadataPath::from_role(&Role::Targets);
+ let targets_size = Some(targets_description.size());
- let signed_targets = self.remote.fetch_metadata(
+ let signed_targets = await!(self.remote.fetch_metadata(
&targets_path,
&version,
- &Some(targets_description.size()),
+ &targets_size,
self.config.min_bytes_per_second,
Some((alg, value.clone())),
- )?;
+ ))?;
if self.tuf.update_targets(signed_targets.clone())? {
- self.store_metadata(&targets_path, &version, &signed_targets);
+ await!(self.store_metadata(&targets_path, &version, &signed_targets));
Ok(true)
} else {
@@ -417,31 +419,24 @@
}
/// Fetch a target from the remote repo and write it to the local repo.
- pub fn fetch_target(&mut self, target: &TargetPath) -> Result<()> {
- let read = self._fetch_target(target)?;
- self.local.store_target(read, target)
+ pub async fn fetch_target<'a>(&'a mut self, target: &'a TargetPath) -> Result<()> {
+ let read = await!(self._fetch_target(target))?;
+ await!(self.local.store_target(read, target))
}
/// Fetch a target from the remote repo and write it to the provided writer.
- pub fn fetch_target_to_writer<W: Write>(
- &mut self,
- target: &TargetPath,
+ pub async fn fetch_target_to_writer<'a, W: AsyncWrite + 'a>(
+ &'a mut self,
+ target: &'a TargetPath,
mut write: W,
) -> Result<()> {
- let mut read = self._fetch_target(&target)?;
- let mut buf = [0; 1024];
- loop {
- let bytes_read = read.read(&mut buf)?;
- if bytes_read == 0 {
- break;
- }
- write.write_all(&buf[..bytes_read])?
- }
+ let mut read = await!(self._fetch_target(&target))?;
+ await!(read.copy_into(&mut write))?;
Ok(())
}
// TODO this should check the local repo first
- fn _fetch_target(&mut self, target: &TargetPath) -> Result<SafeReader<R::TargetRead>> {
+ async fn _fetch_target<'a>(&'a mut self, target: &'a TargetPath) -> Result<Box<dyn AsyncRead>> {
let virt = self.config.path_translator.real_to_virtual(target)?;
let snapshot = self
@@ -450,23 +445,23 @@
.ok_or_else(|| Error::MissingMetadata(Role::Snapshot))?
.clone();
let (_, target_description) =
- self.lookup_target_description(false, 0, &virt, &snapshot, None);
+ await!(self.lookup_target_description(false, 0, &virt, &snapshot, None));
let target_description = target_description?;
- self.remote.fetch_target(
+ await!(self.remote.fetch_target(
target,
&target_description,
self.config.min_bytes_per_second,
- )
+ ))
}
- fn lookup_target_description(
- &mut self,
+ async fn lookup_target_description<'a>(
+ &'a mut self,
default_terminate: bool,
current_depth: u32,
- target: &VirtualTargetPath,
- snapshot: &SnapshotMetadata,
- targets: Option<&TargetsMetadata>,
+ target: &'a VirtualTargetPath,
+ snapshot: &'a SnapshotMetadata,
+ targets: Option<&'a TargetsMetadata>,
) -> (bool, Result<TargetDescription>) {
if current_depth > self.config.max_delegation_depth {
warn!(
@@ -526,32 +521,35 @@
MetadataVersion::None
};
- let signed_meta = match self
- .local
- .fetch_metadata::<TargetsMetadata>(
- delegation.role(),
- &MetadataVersion::None,
- &Some(role_meta.size()),
- self.config.min_bytes_per_second(),
- Some((alg, value.clone())),
- )
- .or_else(|_| {
- self.remote.fetch_metadata::<TargetsMetadata>(
+ let role_size = Some(role_meta.size());
+ let signed_meta = await!(self.local.fetch_metadata::<TargetsMetadata>(
+ delegation.role(),
+ &MetadataVersion::None,
+ &role_size,
+ self.config.min_bytes_per_second(),
+ Some((alg, value.clone())),
+ ));
+
+ let signed_meta = match signed_meta {
+ Ok(signed_meta) => signed_meta,
+ Err(_) => {
+ match await!(self.remote.fetch_metadata::<TargetsMetadata>(
delegation.role(),
&version,
- &Some(role_meta.size()),
+ &role_size,
self.config.min_bytes_per_second(),
Some((alg, value.clone())),
- )
- }) {
- Ok(m) => m,
- Err(ref e) if !delegation.terminating() => {
- warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e);
- continue;
- }
- Err(e) => {
- warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e);
- return (true, Err(e));
+ )) {
+ Ok(m) => m,
+ Err(ref e) if !delegation.terminating() => {
+ warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e);
+ continue;
+ }
+ Err(e) => {
+ warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e);
+ return (true, Err(e));
+ }
+ }
}
};
@@ -560,11 +558,11 @@
.update_delegation(delegation.role(), signed_meta.clone())
{
Ok(_) => {
- match self.local.store_metadata(
+ match await!(self.local.store_metadata(
delegation.role(),
&MetadataVersion::None,
&signed_meta,
- ) {
+ )) {
Ok(_) => (),
Err(e) => warn!(
"Error storing metadata {:?} locally: {:?}",
@@ -579,13 +577,13 @@
.get(delegation.role())
.unwrap()
.clone();
- let (term, res) = self.lookup_target_description(
+ let (term, res) = await!(Box::pinned(self.lookup_target_description(
delegation.terminating(),
current_depth + 1,
target,
snapshot,
Some(meta.as_ref()),
- );
+ )));
if term && res.is_err() {
return (true, res);
@@ -766,11 +764,12 @@
use crate::crypto::{HashAlgorithm, PrivateKey, SignatureScheme};
use crate::interchange::Json;
use crate::metadata::{
- MetadataPath, MetadataVersion, RootMetadataBuilder, SnapshotMetadataBuilder,
+ MetadataPath, MetadataVersion, RootMetadata, RootMetadataBuilder, SnapshotMetadataBuilder,
TargetsMetadataBuilder, TimestampMetadataBuilder,
};
use crate::repository::EphemeralRepository;
use chrono::prelude::*;
+ use futures::executor::block_on;
use lazy_static::lazy_static;
use std::u32;
@@ -861,147 +860,136 @@
////
// Now register the metadata.
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Root),
&MetadataVersion::Number(1),
&root1,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Root),
&MetadataVersion::None,
&root1,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Targets),
&MetadataVersion::Number(1),
&targets,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Targets),
&MetadataVersion::None,
&targets,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Snapshot),
&MetadataVersion::Number(1),
&snapshot,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Snapshot),
&MetadataVersion::None,
&snapshot,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Timestamp),
&MetadataVersion::Number(1),
×tamp,
- )
+ ))
.unwrap();
- repo.store_metadata(
+ block_on(repo.store_metadata(
&MetadataPath::from_role(&Role::Timestamp),
&MetadataVersion::None,
×tamp,
- )
+ ))
.unwrap();
////
// Now, make sure that the local metadata got version 1.
- let mut client = Client::with_root_pinned(
- vec![KEYS[0].public().key_id()],
+ let key_ids = [KEYS[0].public().key_id().clone()];
+ let mut client = block_on(Client::with_root_pinned(
+ &key_ids,
Config::build().finish().unwrap(),
EphemeralRepository::new(),
repo,
- )
+ ))
.unwrap();
- assert_eq!(client.update(), Ok(true));
+ assert_eq!(block_on(client.update()), Ok(true));
assert_eq!(client.tuf.root().version(), 1);
assert_eq!(
root1,
- client
- .local
- .fetch_metadata::<RootMetadata>(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::Number(1),
- &None,
- u32::MAX,
- None
- )
- .unwrap(),
+ block_on(client.local.fetch_metadata::<RootMetadata>(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::Number(1),
+ &None,
+ u32::MAX,
+ None
+ ))
+ .unwrap(),
);
////
// Now bump the root to version 3
- client
- .remote
- .store_metadata(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::Number(2),
- &root2,
- )
- .unwrap();
+ block_on(client.remote.store_metadata(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::Number(2),
+ &root2,
+ ))
+ .unwrap();
- client
- .remote
- .store_metadata(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::None,
- &root2,
- )
- .unwrap();
+ block_on(client.remote.store_metadata(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::None,
+ &root2,
+ ))
+ .unwrap();
- client
- .remote
- .store_metadata(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::Number(3),
- &root3,
- )
- .unwrap();
+ block_on(client.remote.store_metadata(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::Number(3),
+ &root3,
+ ))
+ .unwrap();
- client
- .remote
- .store_metadata(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::None,
- &root3,
- )
- .unwrap();
+ block_on(client.remote.store_metadata(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::None,
+ &root3,
+ ))
+ .unwrap();
////
// Finally, check that the update brings us to version 3.
- assert_eq!(client.update(), Ok(true));
+ assert_eq!(block_on(client.update()), Ok(true));
assert_eq!(client.tuf.root().version(), 3);
assert_eq!(
root3,
- client
- .local
- .fetch_metadata::<RootMetadata>(
- &MetadataPath::from_role(&Role::Root),
- &MetadataVersion::Number(3),
- &None,
- u32::MAX,
- None
- )
- .unwrap(),
+ block_on(client.local.fetch_metadata::<RootMetadata>(
+ &MetadataPath::from_role(&Role::Root),
+ &MetadataVersion::Number(3),
+ &None,
+ u32::MAX,
+ None
+ ))
+ .unwrap(),
);
}
}
diff --git a/src/interchange/mod.rs b/src/interchange/mod.rs
index 47ea192..8799643 100644
--- a/src/interchange/mod.rs
+++ b/src/interchange/mod.rs
@@ -46,6 +46,11 @@
where
R: Read,
T: DeserializeOwned;
+
+ /// Read a struct from a stream.
+ fn from_slice<T>(slice: &[u8]) -> Result<T>
+ where
+ T: DeserializeOwned;
}
/// JSON data interchange.
@@ -323,4 +328,17 @@
{
Ok(serde_json::from_reader(rdr)?)
}
+
+ /// ```
+ /// # use tuf::interchange::{DataInterchange, Json};
+ /// # use std::collections::HashMap;
+ /// let jsn: &[u8] = br#"{"foo": "bar", "baz": "quux"}"#;
+ /// let _: HashMap<String, String> = Json::from_slice(&jsn).unwrap();
+ /// ```
+ fn from_slice<T>(slice: &[u8]) -> Result<T>
+ where
+ T: DeserializeOwned,
+ {
+ Ok(serde_json::from_slice(slice)?)
+ }
}
diff --git a/src/lib.rs b/src/lib.rs
index d049b96..456c329 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -109,6 +109,7 @@
clippy::op_ref,
clippy::too_many_arguments
)]
+#![feature(async_await, await_macro, futures_api, pin)]
pub mod client;
pub mod crypto;
@@ -123,7 +124,9 @@
pub use crate::error::*;
pub use crate::tuf::*;
-pub use crate::util::*;
/// Alias for `Result<T, Error>`.
pub type Result<T> = std::result::Result<T, Error>;
+
+/// Alias for `Pin<Box<dyn Future<Output = T> + 'a>>`.
+pub type TufFuture<'a, T> = std::pin::Pin<Box<dyn futures::Future<Output = T> + 'a>>;
diff --git a/src/repository.rs b/src/repository.rs
index 686a93f..518d468 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -1,5 +1,6 @@
//! Interfaces for interacting with different types of TUF repositories.
+use futures::io::{AllowStdIo, AsyncRead, AsyncReadExt};
use hyper::client::response::Response;
use hyper::header::{Headers, UserAgent};
use hyper::status::StatusCode;
@@ -7,7 +8,7 @@
use log::debug;
use std::collections::HashMap;
use std::fs::{DirBuilder, File};
-use std::io::{self, Cursor, Read, Write};
+use std::io::Cursor;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
@@ -20,7 +21,7 @@
Metadata, MetadataPath, MetadataVersion, SignedMetadata, TargetDescription, TargetPath,
};
use crate::util::SafeReader;
-use crate::Result;
+use crate::{Result, TufFuture};
/// Top-level trait that represents a TUF repository and contains all the ways it can be interacted
/// with.
@@ -28,46 +29,47 @@
where
D: DataInterchange,
{
- /// The type returned when reading a target.
- type TargetRead: Read;
-
/// Store signed metadata.
///
/// Note: This **MUST** canonicalize the bytes before storing them as a read will expect the
/// hashes of the metadata to match.
- fn store_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- metadata: &SignedMetadata<D, M>,
- ) -> Result<()>
+ fn store_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ metadata: &'a SignedMetadata<D, M>,
+ ) -> TufFuture<'a, Result<()>>
where
- M: Metadata;
+ M: Metadata + 'static;
/// Fetch signed metadata.
- fn fetch_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- max_size: &Option<usize>,
+ fn fetch_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ max_size: &'a Option<usize>,
min_bytes_per_second: u32,
- hash_data: Option<(&HashAlgorithm, HashValue)>,
- ) -> Result<SignedMetadata<D, M>>
+ hash_data: Option<(&'static HashAlgorithm, HashValue)>,
+ ) -> TufFuture<'a, Result<SignedMetadata<D, M>>>
where
- M: Metadata;
+ M: Metadata + 'static;
/// Store the given target.
- fn store_target<R>(&self, read: R, target_path: &TargetPath) -> Result<()>
+ fn store_target<'a, R>(
+ &'a self,
+ read: R,
+ target_path: &'a TargetPath,
+ ) -> TufFuture<'a, Result<()>>
where
- R: Read;
+ R: AsyncRead + 'a;
/// Fetch the given target.
- fn fetch_target(
- &self,
- target_path: &TargetPath,
- target_description: &TargetDescription,
+ fn fetch_target<'a>(
+ &'a self,
+ target_path: &'a TargetPath,
+ target_description: &'a TargetDescription,
min_bytes_per_second: u32,
- ) -> Result<SafeReader<Self::TargetRead>>;
+ ) -> TufFuture<'a, Result<Box<dyn AsyncRead>>>;
/// Perform a sanity check that `M`, `Role`, and `MetadataPath` all desrcribe the same entity.
fn check<M>(meta_path: &MetadataPath) -> Result<()>
@@ -118,120 +120,137 @@
where
D: DataInterchange,
{
- type TargetRead = File;
-
- fn store_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- metadata: &SignedMetadata<D, M>,
- ) -> Result<()>
+ fn store_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ metadata: &'a SignedMetadata<D, M>,
+ ) -> TufFuture<'a, Result<()>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Self::check::<M>(meta_path)?;
+ Box::pinned(
+ async move {
+ Self::check::<M>(meta_path)?;
- let mut path = self.local_path.join("metadata");
- path.extend(meta_path.components::<D>(version));
+ let mut path = self.local_path.join("metadata");
+ path.extend(meta_path.components::<D>(version));
- if path.exists() {
- debug!("Metadata path exists. Overwriting: {:?}", path);
- }
+ if path.exists() {
+ debug!("Metadata path exists. Overwriting: {:?}", path);
+ }
- atomically_write(&path, |write| {
- D::to_writer(write, metadata)?;
- Ok(())
- })
+ let mut temp_file = create_temp_file(&path)?;
+ D::to_writer(&mut temp_file, metadata)?;
+ temp_file.persist(&path)?;
+
+ Ok(())
+ },
+ )
}
/// Fetch signed metadata.
- fn fetch_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- max_size: &Option<usize>,
+ fn fetch_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ max_size: &'a Option<usize>,
min_bytes_per_second: u32,
- hash_data: Option<(&HashAlgorithm, HashValue)>,
- ) -> Result<SignedMetadata<D, M>>
+ hash_data: Option<(&'static HashAlgorithm, HashValue)>,
+ ) -> TufFuture<'a, Result<SignedMetadata<D, M>>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Self::check::<M>(meta_path)?;
+ Box::pinned(
+ async move {
+ Self::check::<M>(&meta_path)?;
- let mut path = self.local_path.join("metadata");
- path.extend(meta_path.components::<D>(&version));
+ let mut path = self.local_path.join("metadata");
+ path.extend(meta_path.components::<D>(&version));
- let read = SafeReader::new(
- File::open(&path)?,
- max_size.unwrap_or(::std::usize::MAX) as u64,
- min_bytes_per_second,
- hash_data,
- )?;
+ let mut reader = SafeReader::new(
+ AllowStdIo::new(File::open(&path)?),
+ max_size.unwrap_or(::std::usize::MAX) as u64,
+ min_bytes_per_second,
+ hash_data,
+ )?;
- Ok(D::from_reader(read)?)
+ let mut buf = Vec::with_capacity(max_size.unwrap_or(0));
+ await!(reader.read_to_end(&mut buf))?;
+
+ Ok(D::from_slice(&buf)?)
+ },
+ )
}
- fn store_target<R>(&self, mut read: R, target_path: &TargetPath) -> Result<()>
+ fn store_target<'a, R>(
+ &'a self,
+ mut read: R,
+ target_path: &'a TargetPath,
+ ) -> TufFuture<'a, Result<()>>
where
- R: Read,
+ R: AsyncRead + 'a,
{
- let mut path = self.local_path.join("targets");
- path.extend(target_path.components());
+ Box::pinned(
+ async move {
+ let mut path = self.local_path.join("targets");
+ path.extend(target_path.components());
- if path.exists() {
- debug!("Target path exists. Overwriting: {:?}", path);
- }
+ if path.exists() {
+ debug!("Target path exists. Overwriting: {:?}", path);
+ }
- atomically_write(&path, |write| {
- io::copy(&mut read, write)?;
- Ok(())
- })
+ let mut temp_file = AllowStdIo::new(create_temp_file(&path)?);
+ await!(read.copy_into(&mut temp_file))?;
+ temp_file.into_inner().persist(&path)?;
+
+ Ok(())
+ },
+ )
}
- fn fetch_target(
- &self,
- target_path: &TargetPath,
- target_description: &TargetDescription,
+ fn fetch_target<'a>(
+ &'a self,
+ target_path: &'a TargetPath,
+ target_description: &'a TargetDescription,
min_bytes_per_second: u32,
- ) -> Result<SafeReader<Self::TargetRead>> {
- let mut path = self.local_path.join("targets");
- path.extend(target_path.components());
+ ) -> TufFuture<'a, Result<Box<dyn AsyncRead>>> {
+ Box::pinned(
+ async move {
+ let mut path = self.local_path.join("targets");
+ path.extend(target_path.components());
- if !path.exists() {
- return Err(Error::NotFound);
- }
+ if !path.exists() {
+ return Err(Error::NotFound);
+ }
- let (alg, value) = crypto::hash_preference(target_description.hashes())?;
+ let (alg, value) = crypto::hash_preference(target_description.hashes())?;
- SafeReader::new(
- File::open(&path)?,
- target_description.size(),
- min_bytes_per_second,
- Some((alg, value.clone())),
+ let reader: Box<dyn AsyncRead> = Box::new(SafeReader::new(
+ AllowStdIo::new(File::open(&path)?),
+ target_description.size(),
+ min_bytes_per_second,
+ Some((alg, value.clone())),
+ )?);
+
+ Ok(reader)
+ },
)
}
}
-fn atomically_write<F>(path: &Path, mut f: F) -> Result<()>
-where
- F: FnMut(&mut Write) -> Result<()>,
-{
- // We want to atomically write the file to make sure clients can never see a partially written file.
- // In order to do this, we'll write to a temporary file in the same directory as our target, otherwise
- // we risk writing the temporary file to one mountpoint, and then non-atomically copying the file to another mountpoint.
+fn create_temp_file(path: &Path) -> Result<NamedTempFile> {
+ // We want to atomically write the file to make sure clients can never see a partially written
+ // file. In order to do this, we'll write to a temporary file in the same directory as our
+ // target, otherwise we risk writing the temporary file to one mountpoint, and then
+ // non-atomically copying the file to another mountpoint.
- let mut temp_file = if let Some(parent) = path.parent() {
+ if let Some(parent) = path.parent() {
DirBuilder::new().recursive(true).create(parent)?;
- NamedTempFile::new_in(parent)?
+ Ok(NamedTempFile::new_in(parent)?)
} else {
- NamedTempFile::new_in(".")?
- };
-
- f(&mut temp_file)?;
-
- temp_file.persist(&path)?;
-
- Ok(())
+ Ok(NamedTempFile::new_in(".")?)
+ }
}
/// A repository accessible over HTTP.
@@ -317,69 +336,85 @@
where
D: DataInterchange,
{
- type TargetRead = Response;
-
/// This always returns `Err` as storing over HTTP is not yet supported.
- fn store_metadata<M>(
- &self,
- _: &MetadataPath,
- _: &MetadataVersion,
- _: &SignedMetadata<D, M>,
- ) -> Result<()>
+ fn store_metadata<'a, M>(
+ &'a self,
+ _: &'a MetadataPath,
+ _: &'a MetadataVersion,
+ _: &'a SignedMetadata<D, M>,
+ ) -> TufFuture<'a, Result<()>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Err(Error::Opaque(
- "Http repo store metadata not implemented".to_string(),
- ))
+ Box::pinned(
+ async {
+ Err(Error::Opaque(
+ "Http repo store metadata not implemented".to_string(),
+ ))
+ },
+ )
}
- fn fetch_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- max_size: &Option<usize>,
+ fn fetch_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ max_size: &'a Option<usize>,
min_bytes_per_second: u32,
- hash_data: Option<(&HashAlgorithm, HashValue)>,
- ) -> Result<SignedMetadata<D, M>>
+ hash_data: Option<(&'static HashAlgorithm, HashValue)>,
+ ) -> TufFuture<'a, Result<SignedMetadata<D, M>>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Self::check::<M>(meta_path)?;
+ Box::pinned(
+ async move {
+ Self::check::<M>(meta_path)?;
- let resp = self.get(&self.metadata_prefix, &meta_path.components::<D>(&version))?;
+ let resp = self.get(&self.metadata_prefix, &meta_path.components::<D>(&version))?;
- let read = SafeReader::new(
- resp,
- max_size.unwrap_or(::std::usize::MAX) as u64,
- min_bytes_per_second,
- hash_data,
- )?;
- Ok(D::from_reader(read)?)
+ let mut reader = SafeReader::new(
+ AllowStdIo::new(resp),
+ max_size.unwrap_or(::std::usize::MAX) as u64,
+ min_bytes_per_second,
+ hash_data,
+ )?;
+
+ let mut buf = Vec::new();
+ await!(reader.read_to_end(&mut buf))?;
+
+ Ok(D::from_slice(&buf)?)
+ },
+ )
}
/// This always returns `Err` as storing over HTTP is not yet supported.
- fn store_target<R>(&self, _: R, _: &TargetPath) -> Result<()>
+ fn store_target<'a, R>(&'a self, _: R, _: &'a TargetPath) -> TufFuture<'a, Result<()>>
where
- R: Read,
+ R: AsyncRead + 'a,
{
- Err(Error::Opaque("Http repo store not implemented".to_string()))
+ Box::pinned(async { Err(Error::Opaque("Http repo store not implemented".to_string())) })
}
- fn fetch_target(
- &self,
- target_path: &TargetPath,
- target_description: &TargetDescription,
+ fn fetch_target<'a>(
+ &'a self,
+ target_path: &'a TargetPath,
+ target_description: &'a TargetDescription,
min_bytes_per_second: u32,
- ) -> Result<SafeReader<Self::TargetRead>> {
- let resp = self.get(&None, &target_path.components())?;
- let (alg, value) = crypto::hash_preference(target_description.hashes())?;
- Ok(SafeReader::new(
- resp,
- target_description.size(),
- min_bytes_per_second,
- Some((alg, value.clone())),
- )?)
+ ) -> TufFuture<'a, Result<Box<dyn AsyncRead>>> {
+ Box::pinned(
+ async move {
+ let resp = self.get(&None, &target_path.components())?;
+ let (alg, value) = crypto::hash_preference(target_description.hashes())?;
+ let reader = SafeReader::new(
+ AllowStdIo::new(resp),
+ target_description.size(),
+ min_bytes_per_second,
+ Some((alg, value.clone())),
+ )?;
+
+ Ok(Box::new(reader) as Box<dyn AsyncRead>)
+ },
+ )
}
}
@@ -422,85 +457,110 @@
where
D: DataInterchange,
{
- type TargetRead = Cursor<Vec<u8>>;
-
- fn store_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- metadata: &SignedMetadata<D, M>,
- ) -> Result<()>
+ fn store_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ metadata: &'a SignedMetadata<D, M>,
+ ) -> TufFuture<'a, Result<()>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Self::check::<M>(meta_path)?;
- let mut buf = Vec::new();
- D::to_writer(&mut buf, metadata)?;
- let mut metadata = self.metadata.write().unwrap();
- let _ = metadata.insert((meta_path.clone(), version.clone()), buf);
- Ok(())
+ Box::pinned(
+ async move {
+ Self::check::<M>(meta_path)?;
+ let mut buf = Vec::new();
+ D::to_writer(&mut buf, metadata)?;
+ let mut metadata = self.metadata.write().unwrap();
+ let _ = metadata.insert((meta_path.clone(), version.clone()), buf);
+ Ok(())
+ },
+ )
}
- fn fetch_metadata<M>(
- &self,
- meta_path: &MetadataPath,
- version: &MetadataVersion,
- max_size: &Option<usize>,
+ fn fetch_metadata<'a, M>(
+ &'a self,
+ meta_path: &'a MetadataPath,
+ version: &'a MetadataVersion,
+ max_size: &'a Option<usize>,
min_bytes_per_second: u32,
- hash_data: Option<(&HashAlgorithm, HashValue)>,
- ) -> Result<SignedMetadata<D, M>>
+ hash_data: Option<(&'static HashAlgorithm, HashValue)>,
+ ) -> TufFuture<'a, Result<SignedMetadata<D, M>>>
where
- M: Metadata,
+ M: Metadata + 'static,
{
- Self::check::<M>(meta_path)?;
+ Box::pinned(
+ async move {
+ Self::check::<M>(meta_path)?;
- let metadata = self.metadata.read().unwrap();
- match metadata.get(&(meta_path.clone(), version.clone())) {
- Some(bytes) => {
- let reader = SafeReader::new(
- &**bytes,
- max_size.unwrap_or(::std::usize::MAX) as u64,
- min_bytes_per_second,
- hash_data,
- )?;
- D::from_reader(reader)
- }
- None => Err(Error::NotFound),
- }
+ let metadata = self.metadata.read().unwrap();
+ match metadata.get(&(meta_path.clone(), version.clone())) {
+ Some(bytes) => {
+ let mut reader = SafeReader::new(
+ &**bytes,
+ max_size.unwrap_or(::std::usize::MAX) as u64,
+ min_bytes_per_second,
+ hash_data,
+ )?;
+
+ let mut buf = Vec::with_capacity(max_size.unwrap_or(0));
+ await!(reader.read_to_end(&mut buf))?;
+
+ D::from_slice(&buf)
+ }
+ None => Err(Error::NotFound),
+ }
+ },
+ )
}
- fn store_target<R>(&self, mut read: R, target_path: &TargetPath) -> Result<()>
+ fn store_target<'a, R>(
+ &'a self,
+ mut read: R,
+ target_path: &'a TargetPath,
+ ) -> TufFuture<'a, Result<()>>
where
- R: Read,
+ R: AsyncRead + 'a,
{
- let mut buf = Vec::new();
- read.read_to_end(&mut buf)?;
- let mut targets = self.targets.write().unwrap();
- let _ = targets.insert(target_path.clone(), buf);
- Ok(())
+ Box::pinned(
+ async move {
+ println!("EphemeralRepository.store_target: {:?}", target_path);
+ let mut buf = Vec::new();
+ await!(read.read_to_end(&mut buf))?;
+ let mut targets = self.targets.write().unwrap();
+ let _ = targets.insert(target_path.clone(), buf);
+ Ok(())
+ },
+ )
}
- fn fetch_target(
- &self,
- target_path: &TargetPath,
- target_description: &TargetDescription,
+ fn fetch_target<'a>(
+ &'a self,
+ target_path: &'a TargetPath,
+ target_description: &'a TargetDescription,
min_bytes_per_second: u32,
- ) -> Result<SafeReader<Self::TargetRead>> {
- let targets = self.targets.read().unwrap();
- match targets.get(target_path) {
- Some(bytes) => {
- let cur = Cursor::new(bytes.clone());
- let (alg, value) = crypto::hash_preference(target_description.hashes())?;
- let read = SafeReader::new(
- cur,
- target_description.size(),
- min_bytes_per_second,
- Some((alg, value.clone())),
- )?;
- Ok(read)
- }
- None => Err(Error::NotFound),
- }
+ ) -> TufFuture<'a, Result<Box<dyn AsyncRead>>> {
+ Box::pinned(
+ async move {
+ let targets = self.targets.read().unwrap();
+ match targets.get(target_path) {
+ Some(bytes) => {
+ let cur = Cursor::new(bytes.clone());
+ let (alg, value) = crypto::hash_preference(target_description.hashes())?;
+
+ let reader: Box<dyn AsyncRead> = Box::new(SafeReader::new(
+ cur,
+ target_description.size(),
+ min_bytes_per_second,
+ Some((alg, value.clone())),
+ )?);
+
+ Ok(reader)
+ }
+ None => Err(Error::NotFound),
+ }
+ },
+ )
}
}
@@ -508,69 +568,81 @@
mod test {
use super::*;
use crate::interchange::Json;
+ use futures::executor::block_on;
+ use futures::io::AsyncReadExt;
use tempfile;
#[test]
fn ephemeral_repo_targets() {
- let repo = EphemeralRepository::<Json>::new();
+ block_on(
+ async {
+ let repo = EphemeralRepository::<Json>::new();
- let data: &[u8] = b"like tears in the rain";
- let target_description =
- TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap();
- let path = TargetPath::new("batty".into()).unwrap();
- repo.store_target(data, &path).unwrap();
+ let data: &[u8] = b"like tears in the rain";
+ let target_description =
+ TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap();
+ let path = TargetPath::new("batty".into()).unwrap();
+ await!(repo.store_target(data, &path)).unwrap();
- let mut read = repo.fetch_target(&path, &target_description, 0).unwrap();
- let mut buf = Vec::new();
- read.read_to_end(&mut buf).unwrap();
- assert_eq!(buf.as_slice(), data);
+ let mut read = await!(repo.fetch_target(&path, &target_description, 0)).unwrap();
+ let mut buf = Vec::new();
+ await!(read.read_to_end(&mut buf)).unwrap();
+ assert_eq!(buf.as_slice(), data);
- let bad_data: &[u8] = b"you're in a desert";
- repo.store_target(bad_data, &path).unwrap();
- let mut read = repo.fetch_target(&path, &target_description, 0).unwrap();
- assert!(read.read_to_end(&mut buf).is_err());
+ let bad_data: &[u8] = b"you're in a desert";
+ await!(repo.store_target(bad_data, &path)).unwrap();
+ let mut read = await!(repo.fetch_target(&path, &target_description, 0)).unwrap();
+ assert!(await!(read.read_to_end(&mut buf)).is_err());
+ },
+ )
}
#[test]
fn file_system_repo_targets() {
- let temp_dir = tempfile::Builder::new()
- .prefix("rust-tuf")
- .tempdir()
- .unwrap();
- let repo = FileSystemRepository::<Json>::new(temp_dir.path().to_path_buf()).unwrap();
+ block_on(
+ async {
+ let temp_dir = tempfile::Builder::new()
+ .prefix("rust-tuf")
+ .tempdir()
+ .unwrap();
+ let repo =
+ FileSystemRepository::<Json>::new(temp_dir.path().to_path_buf()).unwrap();
- // test that init worked
- assert!(temp_dir.path().join("metadata").exists());
- assert!(temp_dir.path().join("targets").exists());
- assert!(temp_dir.path().join("temp").exists());
+ // test that init worked
+ assert!(temp_dir.path().join("metadata").exists());
+ assert!(temp_dir.path().join("targets").exists());
+ assert!(temp_dir.path().join("temp").exists());
- let data: &[u8] = b"like tears in the rain";
- let target_description =
- TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap();
- let path = TargetPath::new("foo/bar/baz".into()).unwrap();
- repo.store_target(data, &path).unwrap();
- assert!(temp_dir
- .path()
- .join("targets")
- .join("foo")
- .join("bar")
- .join("baz")
- .exists());
+ let data: &[u8] = b"like tears in the rain";
+ let target_description =
+ TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap();
+ let path = TargetPath::new("foo/bar/baz".into()).unwrap();
+ await!(repo.store_target(data, &path)).unwrap();
+ assert!(temp_dir
+ .path()
+ .join("targets")
+ .join("foo")
+ .join("bar")
+ .join("baz")
+ .exists());
- let mut buf = Vec::new();
+ let mut buf = Vec::new();
- // Enclose `fetch_target` in a scope to make sure the file is closed.
- // This is needed for `tempfile` on Windows, which doesn't open the
- // files in a mode that allows the file to be opened multiple times.
- {
- let mut read = repo.fetch_target(&path, &target_description, 0).unwrap();
- read.read_to_end(&mut buf).unwrap();
- assert_eq!(buf.as_slice(), data);
- }
+ // Enclose `fetch_target` in a scope to make sure the file is closed.
+ // This is needed for `tempfile` on Windows, which doesn't open the
+ // files in a mode that allows the file to be opened multiple times.
+ {
+ let mut read =
+ await!(repo.fetch_target(&path, &target_description, 0)).unwrap();
+ await!(read.read_to_end(&mut buf)).unwrap();
+ assert_eq!(buf.as_slice(), data);
+ }
- let bad_data: &[u8] = b"you're in a desert";
- repo.store_target(bad_data, &path).unwrap();
- let mut read = repo.fetch_target(&path, &target_description, 0).unwrap();
- assert!(read.read_to_end(&mut buf).is_err());
+ let bad_data: &[u8] = b"you're in a desert";
+ await!(repo.store_target(bad_data, &path)).unwrap();
+ let mut read = await!(repo.fetch_target(&path, &target_description, 0)).unwrap();
+ assert!(await!(read.read_to_end(&mut buf)).is_err());
+ },
+ )
}
}
diff --git a/src/util.rs b/src/util.rs
index d0deb81..afc12b9 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,7 +1,10 @@
use chrono::offset::Utc;
use chrono::DateTime;
+use futures::io::AsyncRead;
+use futures::task::LocalWaker;
+use futures::{try_ready, Poll};
use ring::digest::{self, SHA256, SHA512};
-use std::io::{self, ErrorKind, Read};
+use std::io::{self, ErrorKind};
use crate::crypto::{HashAlgorithm, HashValue};
use crate::error::Error;
@@ -17,7 +20,7 @@
///
/// It is **critical** that none of the bytes from this struct are used until it has been fully
/// consumed as the data is untrusted.
-pub struct SafeReader<R: Read> {
+pub struct SafeReader<R: AsyncRead> {
inner: R,
max_size: u64,
min_bytes_per_second: u32,
@@ -26,7 +29,7 @@
bytes_read: u64,
}
-impl<R: Read> SafeReader<R> {
+impl<R: AsyncRead> SafeReader<R> {
/// Create a new `SafeReader`.
///
/// The argument `hash_data` takes a `HashAlgorithm` and expected `HashValue`. The given
@@ -67,178 +70,213 @@
}
}
-impl<R: Read> Read for SafeReader<R> {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- match self.inner.read(buf) {
- Ok(read_bytes) => {
- if self.start_time.is_none() {
- self.start_time = Some(Utc::now())
- }
+impl<R: AsyncRead> AsyncRead for SafeReader<R> {
+ fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
+ let read_bytes = try_ready!(self.inner.poll_read(lw, buf));
- if read_bytes == 0 {
- if let Some((context, expected_hash)) = self.hasher.take() {
- let generated_hash = context.finish();
- if generated_hash.as_ref() != expected_hash.value() {
- return Err(io::Error::new(
- ErrorKind::InvalidData,
- "Calculated hash did not match the required hash.",
- ));
- }
- }
-
- return Ok(0);
- }
-
- match self.bytes_read.checked_add(read_bytes as u64) {
- Some(sum) if sum <= self.max_size => self.bytes_read = sum,
- _ => {
- return Err(io::Error::new(
- ErrorKind::InvalidData,
- "Read exceeded the maximum allowed bytes.",
- ));
- }
- }
-
- let duration = Utc::now().signed_duration_since(self.start_time.unwrap());
- // 30 second grace period before we start checking the bitrate
- if duration.num_seconds() >= 30 {
- if self.bytes_read as f32 / (duration.num_seconds() as f32)
- < self.min_bytes_per_second as f32
- {
- return Err(io::Error::new(
- ErrorKind::TimedOut,
- "Read aborted. Bitrate too low.",
- ));
- }
- }
-
- if let Some((ref mut context, _)) = self.hasher {
- context.update(&buf[..(read_bytes)]);
- }
-
- Ok(read_bytes)
- }
- e @ Err(_) => e,
+ if self.start_time.is_none() {
+ self.start_time = Some(Utc::now())
}
+
+ if read_bytes == 0 {
+ if let Some((context, expected_hash)) = self.hasher.take() {
+ let generated_hash = context.finish();
+ if generated_hash.as_ref() != expected_hash.value() {
+ return Poll::Ready(Err(io::Error::new(
+ ErrorKind::InvalidData,
+ "Calculated hash did not match the required hash.",
+ )));
+ }
+ }
+
+ return Poll::Ready(Ok(0));
+ }
+
+ match self.bytes_read.checked_add(read_bytes as u64) {
+ Some(sum) if sum <= self.max_size => self.bytes_read = sum,
+ _ => {
+ return Poll::Ready(Err(io::Error::new(
+ ErrorKind::InvalidData,
+ "Read exceeded the maximum allowed bytes.",
+ )));
+ }
+ }
+
+ let duration = Utc::now().signed_duration_since(self.start_time.unwrap());
+ // 30 second grace period before we start checking the bitrate
+ if duration.num_seconds() >= 30 {
+ if self.bytes_read as f32 / (duration.num_seconds() as f32)
+ < self.min_bytes_per_second as f32
+ {
+ return Poll::Ready(Err(io::Error::new(
+ ErrorKind::TimedOut,
+ "Read aborted. Bitrate too low.",
+ )));
+ }
+ }
+
+ if let Some((ref mut context, _)) = self.hasher {
+ context.update(&buf[..(read_bytes)]);
+ }
+
+ Poll::Ready(Ok(read_bytes))
}
}
#[cfg(test)]
mod test {
use super::*;
+ use futures::executor::block_on;
+ use futures::io::AsyncReadExt;
#[test]
fn valid_read() {
- let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
- let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_ok());
- assert_eq!(buf, bytes);
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
+ let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_ok());
+ assert_eq!(buf, bytes);
+ },
+ )
}
#[test]
fn valid_read_large_data() {
- let bytes: &[u8] = &[0x00; 64 * 1024];
- let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_ok());
- assert_eq!(buf, bytes);
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00; 64 * 1024];
+ let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_ok());
+ assert_eq!(buf, bytes);
+ },
+ )
}
#[test]
fn valid_read_below_max_size() {
- let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
- let mut reader = SafeReader::new(bytes, (bytes.len() as u64) + 1, 0, None).unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_ok());
- assert_eq!(buf, bytes);
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
+ let mut reader = SafeReader::new(bytes, (bytes.len() as u64) + 1, 0, None).unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_ok());
+ assert_eq!(buf, bytes);
+ },
+ )
}
#[test]
fn invalid_read_above_max_size() {
- let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
- let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_err());
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
+ let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_err());
+ },
+ )
}
#[test]
fn invalid_read_above_max_size_large_data() {
- let bytes: &[u8] = &[0x00; 64 * 1024];
- let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_err());
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00; 64 * 1024];
+ let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_err());
+ },
+ )
}
#[test]
fn valid_read_good_hash() {
- let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
- let mut context = digest::Context::new(&SHA256);
- context.update(&bytes);
- let hash_value = HashValue::new(context.finish().as_ref().to_vec());
- let mut reader = SafeReader::new(
- bytes,
- bytes.len() as u64,
- 0,
- Some((&HashAlgorithm::Sha256, hash_value)),
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
+ let mut context = digest::Context::new(&SHA256);
+ context.update(&bytes);
+ let hash_value = HashValue::new(context.finish().as_ref().to_vec());
+ let mut reader = SafeReader::new(
+ bytes,
+ bytes.len() as u64,
+ 0,
+ Some((&HashAlgorithm::Sha256, hash_value)),
+ )
+ .unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_ok());
+ assert_eq!(buf, bytes);
+ },
)
- .unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_ok());
- assert_eq!(buf, bytes);
}
#[test]
fn invalid_read_bad_hash() {
- let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
- let mut context = digest::Context::new(&SHA256);
- context.update(&bytes);
- context.update(&[0xFF]); // evil bytes
- let hash_value = HashValue::new(context.finish().as_ref().to_vec());
- let mut reader = SafeReader::new(
- bytes,
- bytes.len() as u64,
- 0,
- Some((&HashAlgorithm::Sha256, hash_value)),
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03];
+ let mut context = digest::Context::new(&SHA256);
+ context.update(&bytes);
+ context.update(&[0xFF]); // evil bytes
+ let hash_value = HashValue::new(context.finish().as_ref().to_vec());
+ let mut reader = SafeReader::new(
+ bytes,
+ bytes.len() as u64,
+ 0,
+ Some((&HashAlgorithm::Sha256, hash_value)),
+ )
+ .unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_err());
+ },
)
- .unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_err());
}
#[test]
fn valid_read_good_hash_large_data() {
- let bytes: &[u8] = &[0x00; 64 * 1024];
- let mut context = digest::Context::new(&SHA256);
- context.update(&bytes);
- let hash_value = HashValue::new(context.finish().as_ref().to_vec());
- let mut reader = SafeReader::new(
- bytes,
- bytes.len() as u64,
- 0,
- Some((&HashAlgorithm::Sha256, hash_value)),
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00; 64 * 1024];
+ let mut context = digest::Context::new(&SHA256);
+ context.update(&bytes);
+ let hash_value = HashValue::new(context.finish().as_ref().to_vec());
+ let mut reader = SafeReader::new(
+ bytes,
+ bytes.len() as u64,
+ 0,
+ Some((&HashAlgorithm::Sha256, hash_value)),
+ )
+ .unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_ok());
+ assert_eq!(buf, bytes);
+ },
)
- .unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_ok());
- assert_eq!(buf, bytes);
}
#[test]
fn invalid_read_bad_hash_large_data() {
- let bytes: &[u8] = &[0x00; 64 * 1024];
- let mut context = digest::Context::new(&SHA256);
- context.update(&bytes);
- context.update(&[0xFF]); // evil bytes
- let hash_value = HashValue::new(context.finish().as_ref().to_vec());
- let mut reader = SafeReader::new(
- bytes,
- bytes.len() as u64,
- 0,
- Some((&HashAlgorithm::Sha256, hash_value)),
+ block_on(
+ async {
+ let bytes: &[u8] = &[0x00; 64 * 1024];
+ let mut context = digest::Context::new(&SHA256);
+ context.update(&bytes);
+ context.update(&[0xFF]); // evil bytes
+ let hash_value = HashValue::new(context.finish().as_ref().to_vec());
+ let mut reader = SafeReader::new(
+ bytes,
+ bytes.len() as u64,
+ 0,
+ Some((&HashAlgorithm::Sha256, hash_value)),
+ )
+ .unwrap();
+ let mut buf = Vec::new();
+ assert!(await!(reader.read_to_end(&mut buf)).is_err());
+ },
)
- .unwrap();
- let mut buf = Vec::new();
- assert!(reader.read_to_end(&mut buf).is_err());
}
}
diff --git a/tests/simple_example.rs b/tests/simple_example.rs
index ff7707c..2961e46 100644
--- a/tests/simple_example.rs
+++ b/tests/simple_example.rs
@@ -1,6 +1,10 @@
+#![feature(async_await, await_macro, futures_api, pin)]
+
extern crate chrono;
+extern crate futures;
extern crate tuf;
+use futures::executor::block_on;
use tuf::client::{Client, Config, PathTranslator};
use tuf::crypto::{HashAlgorithm, KeyId, PrivateKey, SignatureScheme};
use tuf::interchange::Json;
@@ -34,8 +38,12 @@
fn with_translator() {
let mut remote = EphemeralRepository::<Json>::new();
let config = Config::default();
- let root_key_ids = init_server(&mut remote, &config).unwrap();
- init_client(&root_key_ids, remote, config).unwrap();
+ block_on(
+ async {
+ let root_key_ids = await!(init_server(&mut remote, &config)).unwrap();
+ await!(init_client(&root_key_ids, remote, config)).unwrap();
+ },
+ )
}
#[test]
@@ -45,11 +53,16 @@
.path_translator(MyPathTranslator {})
.finish()
.unwrap();
- let root_key_ids = init_server(&mut remote, &config).unwrap();
- init_client(&root_key_ids, remote, config).unwrap();
+
+ block_on(
+ async {
+ let root_key_ids = await!(init_server(&mut remote, &config)).unwrap();
+ await!(init_client(&root_key_ids, remote, config)).unwrap();
+ },
+ )
}
-fn init_client<T>(
+async fn init_client<T: 'static>(
root_key_ids: &[KeyId],
remote: EphemeralRepository<Json>,
config: Config<T>,
@@ -58,12 +71,21 @@
T: PathTranslator,
{
let local = EphemeralRepository::<Json>::new();
- let mut client = Client::with_root_pinned(root_key_ids, config, local, remote)?;
- let _ = client.update()?;
- client.fetch_target(&TargetPath::new("foo-bar".into())?)
+ let mut client = await!(Client::with_root_pinned(
+ &root_key_ids,
+ config,
+ local,
+ remote
+ ))?;
+ let _ = await!(client.update())?;
+ let target_path = TargetPath::new("foo-bar".into())?;
+ await!(client.fetch_target(&target_path))
}
-fn init_server<T>(remote: &mut EphemeralRepository<Json>, config: &Config<T>) -> Result<Vec<KeyId>>
+async fn init_server<'a, T>(
+ remote: &'a mut EphemeralRepository<Json>,
+ config: &'a Config<T>,
+) -> Result<Vec<KeyId>>
where
T: PathTranslator,
{
@@ -82,23 +104,16 @@
.timestamp_key(timestamp_key.public().clone())
.signed::<Json>(&root_key)?;
- remote.store_metadata(
- &MetadataPath::new("root".into())?,
- &MetadataVersion::Number(1),
- &signed,
- )?;
- remote.store_metadata(
- &MetadataPath::new("root".into())?,
- &MetadataVersion::None,
- &signed,
- )?;
+ let root_path = MetadataPath::new("root".into())?;
+ await!(remote.store_metadata(&root_path, &MetadataVersion::Number(1), &signed,))?;
+ await!(remote.store_metadata(&root_path, &MetadataVersion::None, &signed,))?;
//// build the targets ////
let target_file: &[u8] = b"things fade, alternatives exclude";
let target_path = TargetPath::new("foo-bar".into())?;
- let _ = remote.store_target(target_file, &target_path);
+ let _ = await!(remote.store_target(target_file, &target_path));
let targets = TargetsMetadataBuilder::new()
.insert_target_from_reader(
@@ -108,16 +123,9 @@
)?
.signed::<Json>(&targets_key)?;
- remote.store_metadata(
- &MetadataPath::new("targets".into())?,
- &MetadataVersion::Number(1),
- &targets,
- )?;
- remote.store_metadata(
- &MetadataPath::new("targets".into())?,
- &MetadataVersion::None,
- &targets,
- )?;
+ let targets_path = &MetadataPath::new("targets".into())?;
+ await!(remote.store_metadata(&targets_path, &MetadataVersion::Number(1), &targets,))?;
+ await!(remote.store_metadata(&targets_path, &MetadataVersion::None, &targets,))?;
//// build the snapshot ////
@@ -125,32 +133,18 @@
.insert_metadata(&targets, &[HashAlgorithm::Sha256])?
.signed::<Json>(&snapshot_key)?;
- remote.store_metadata(
- &MetadataPath::new("snapshot".into())?,
- &MetadataVersion::Number(1),
- &snapshot,
- )?;
- remote.store_metadata(
- &MetadataPath::new("snapshot".into())?,
- &MetadataVersion::None,
- &snapshot,
- )?;
+ let snapshot_path = MetadataPath::new("snapshot".into())?;
+ await!(remote.store_metadata(&snapshot_path, &MetadataVersion::Number(1), &snapshot,))?;
+ await!(remote.store_metadata(&snapshot_path, &MetadataVersion::None, &snapshot,))?;
//// build the timestamp ////
let timestamp = TimestampMetadataBuilder::from_snapshot(&snapshot, &[HashAlgorithm::Sha256])?
.signed::<Json>(×tamp_key)?;
- remote.store_metadata(
- &MetadataPath::new("timestamp".into())?,
- &MetadataVersion::Number(1),
- ×tamp,
- )?;
- remote.store_metadata(
- &MetadataPath::new("timestamp".into())?,
- &MetadataVersion::None,
- ×tamp,
- )?;
+ let timestamp_path = MetadataPath::new("timestamp".into())?;
+ await!(remote.store_metadata(×tamp_path, &MetadataVersion::Number(1), ×tamp,))?;
+ await!(remote.store_metadata(×tamp_path, &MetadataVersion::None, ×tamp,))?;
Ok(vec![root_key.key_id().clone()])
}