// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! UriPathHandler implementations
use {
crate::serve::{RangeUriPathHandler, UriPathHandler},
channel::{mpsc, oneshot},
future::{pending, ready, BoxFuture},
hyper::{header::CONTENT_LENGTH, Body, Response, StatusCode},
path::{Path, PathBuf},
atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering},
/// Handler that always responds with the given status code
pub struct StaticResponseCode(StatusCode);
impl UriPathHandler for StaticResponseCode {
fn handle(&self, _uri_path: &Path, _response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
impl StaticResponseCode {
/// Creates handler that always responds with the given status code
pub fn new(status: StatusCode) -> Self {
/// Creates handler that always responds with 200 OK
pub fn ok() -> Self {
/// Creates handler that always responds with 404 Not Found
pub fn not_found() -> Self {
/// Creates handler that always responds with 500 Internal Server Error
pub fn server_error() -> Self {
/// Creates handler that always responds with 429 Too Many Requests
pub fn too_many_requests() -> Self {
/// Handler that always responds with the given status code
pub struct RangeStaticResponseCode(StatusCode);
impl RangeUriPathHandler for RangeStaticResponseCode {
fn handle(
_: &Path,
_: &http::HeaderValue,
_: Response<Body>,
) -> BoxFuture<'_, Response<Body>> {
impl RangeStaticResponseCode {
/// Creates handler that always responds with 500 Internal Server Error
pub fn server_error() -> Self {
/// An atomic HTTP status code carrier.
#[derive(Debug, Default)]
pub struct DynamicResponseSetter(Arc<AtomicU16>);
impl DynamicResponseSetter {
/// Atomically sets this toggle to the supplied code.
pub fn set(&self, code: u16) {, Ordering::SeqCst);
/// Handler that replies with an externally-settable HTTP status.
pub struct DynamicResponseCode {
code: Arc<AtomicU16>,
impl UriPathHandler for DynamicResponseCode {
fn handle(&self, _uri_path: &Path, _response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
impl DynamicResponseCode {
/// Creates a new handler with a (re)settable status code.
pub fn new(initial: u16) -> (Self, DynamicResponseSetter) {
let setter = DynamicResponseSetter(Arc::new(initial.into()));
(Self { code: Arc::clone(&setter.0) }, setter)
/// An atomic toggle switch.
#[derive(Debug, Default)]
pub struct AtomicToggle(Arc<AtomicBool>);
impl AtomicToggle {
/// Creates a new AtomicToggle initialized to `initial`.
pub fn new(initial: bool) -> Self {
/// Atomically sets this toggle to true.
pub fn set(&self) {, Ordering::SeqCst);
/// Atomically sets this toggle to false.
pub fn unset(&self) {, Ordering::SeqCst);
/// Handler that overrides requests with the given handler only when enabled
pub struct Toggleable<H: UriPathHandler> {
enabled: Arc<AtomicBool>,
handler: H,
impl<H: UriPathHandler> UriPathHandler for Toggleable<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.enabled.load(Ordering::SeqCst) {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> Toggleable<H> {
/// Creates handler that overrides requests when should_override is set.
pub fn new(should_override: &AtomicToggle, handler: H) -> Self {
Self { enabled: Arc::clone(&should_override.0), handler }
/// Handler that overrides the given request path for the given number of requests.
pub struct ForRequestCount<H: UriPathHandler> {
remaining: Mutex<u32>,
handler: H,
impl<H: UriPathHandler> UriPathHandler for ForRequestCount<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
let mut remaining = self.remaining.lock();
if *remaining > 0 {
*remaining -= 1;
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> ForRequestCount<H> {
/// Creates handler that overrides the given request path for the given number of requests.
pub fn new(count: u32, handler: H) -> Self {
Self { remaining: Mutex::new(count), handler }
/// Handler that overrides the given request path using the given handler.
pub struct ForPath<H: UriPathHandler> {
path: PathBuf,
handler: H,
impl<H: UriPathHandler> UriPathHandler for ForPath<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.path == uri_path {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> ForPath<H> {
/// Creates handler that overrides the given request path using the given handler.
pub fn new(path: impl Into<PathBuf>, handler: H) -> Self {
Self { path: path.into(), handler }
/// Handler that overrides the given request paths using the given handler.
pub struct ForPaths<H: UriPathHandler> {
paths: HashSet<PathBuf>,
handler: H,
impl<H: UriPathHandler> UriPathHandler for ForPaths<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.paths.contains(uri_path) {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> ForPaths<H> {
/// Creates handler that overrides the given request paths using the given handler.
pub fn new(paths: HashSet<PathBuf>, handler: H) -> Self {
Self { paths, handler }
/// Handler that overrides all the requests that start with the given request path using the
/// given handler.
pub struct ForPathPrefix<H: UriPathHandler> {
prefix: PathBuf,
handler: H,
impl<H: UriPathHandler> UriPathHandler for ForPathPrefix<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if uri_path.starts_with(&self.prefix) {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> ForPathPrefix<H> {
/// Creates handler that overrides all the requests that start with the given request path
/// using the given handler.
pub fn new(prefix: impl Into<PathBuf>, handler: H) -> Self {
Self { prefix: prefix.into(), handler }
/// Handler that overrides all the requests that end with the given request path using the
/// given handler. Useful for hitting all versions of versioned TUF metadata (e.g. X.targets.json).
/// TODO(ampearce): change ForPathSuffix and ForPathPrefix to use string matches rather than path.
pub struct ForPathSuffix<H: UriPathHandler> {
suffix: PathBuf,
handler: H,
impl<H: UriPathHandler> UriPathHandler for ForPathSuffix<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if uri_path.ends_with(&self.suffix) {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> ForPathSuffix<H> {
/// Creates handler that overrides all the requests that start with the given request path
/// using the given handler.
pub fn new(suffix: impl Into<PathBuf>, handler: H) -> Self {
Self { suffix: suffix.into(), handler }
/// Handler that passes responses through the given handler once per unique path.
pub struct OncePerPath<H: UriPathHandler> {
handler: H,
failed_paths: Mutex<HashSet<PathBuf>>,
impl<H: UriPathHandler> UriPathHandler for OncePerPath<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.failed_paths.lock().insert(uri_path.to_owned()) {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> OncePerPath<H> {
/// Creates handler that passes responses through the given handler once per unique path.
pub fn new(handler: H) -> Self {
Self { handler, failed_paths: Mutex::new(HashSet::new()) }
/// Transform a `serde_json::Value`. Implements `UriPathHandler` by assuming the `Response<Body>` is
/// json-formatted.
pub trait JsonTransformer: Send + Sync + Clone + 'static {
/// Transform a `serde_json::Value`
fn transform(&self, v: serde_json::Value) -> serde_json::Value;
impl<F> JsonTransformer for F
F: Fn(serde_json::Value) -> serde_json::Value + Send + Sync + Clone + 'static,
fn transform(&self, v: serde_json::Value) -> serde_json::Value {
/// Handler that manipulates requests with json-formatted bodies.
impl<T: JsonTransformer> UriPathHandler for T {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
async move {
let bytes = body_to_bytes(response.into_body()).await;
let value = self.transform(serde_json::from_reader(bytes.as_slice()).unwrap());
let bytes = serde_json::to_vec(&value).unwrap();
.header(CONTENT_LENGTH, bytes.len())
.expect("valid response")
/// Handler that notifies a channel when it receives a request.
pub struct NotifyWhenRequested {
notify: mpsc::UnboundedSender<()>,
impl NotifyWhenRequested {
/// Creates a new handler and the receiver it notifies on request receipt.
pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
let (tx, rx) = mpsc::unbounded();
(Self { notify: tx }, rx)
impl UriPathHandler for NotifyWhenRequested {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
/// A response that is waiting to be sent.
pub struct BlockedResponse {
path: PathBuf,
unblocker: oneshot::Sender<()>,
impl BlockedResponse {
/// The path of the request.
pub fn path(&self) -> &Path {
/// Send the response.
pub fn unblock(self) {
self.unblocker.send(()).expect("request to still be pending")
/// Handler that blocks sending response headers and bodies until unblocked by a test.
pub struct BlockResponseHeaders {
blocked_responses: mpsc::UnboundedSender<BlockedResponse>,
impl BlockResponseHeaders {
/// Creates a new handler and the receiver it notifies on request receipt.
pub fn new() -> (Self, mpsc::UnboundedReceiver<BlockedResponse>) {
let (sender, receiver) = mpsc::unbounded();
(Self { blocked_responses: sender }, receiver)
impl UriPathHandler for BlockResponseHeaders {
fn handle(&self, path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
// Return a future that notifies the test that the request was blocked and wait for it to
// unblock the response.
let path = path.to_owned();
let mut blocked_responses = self.blocked_responses.clone();
async move {
let (unblocker, waiter) = oneshot::channel();
.send(BlockedResponse { path, unblocker })
.expect("receiver to still exist");
waiter.await.expect("request to be unblocked");
/// Handler that blocks sending response bodies until unblocked by a test.
pub struct BlockResponseBodyOnce {
notify: Mutex<Option<oneshot::Sender<Box<dyn FnOnce() + Send>>>>,
impl BlockResponseBodyOnce {
/// Creates a new handler and the receiver it notifies after sending the response headers.
pub fn new() -> (Self, oneshot::Receiver<Box<dyn FnOnce() + Send>>) {
let (sender, receiver) = oneshot::channel();
(Self { notify: Mutex::new(Some(sender)) }, receiver)
impl UriPathHandler for BlockResponseBodyOnce {
fn handle(&self, _path: &Path, mut response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
let notify = self.notify.lock().take().expect("a single request for this path");
async move {
// Replace the response's body with a stream that will yield data when the test
// unblocks the response body.
let (mut sender, new_body) = Body::channel();
let old_body = std::mem::replace(response.body_mut(), new_body);
let contents = body_to_bytes(old_body).await;
// Notify the test.
.send(Box::new(move || {
sender.try_send_data(contents.into()).expect("sending body")
.map_err(|_| ())
.expect("receiver to still exist");
// Yield the modified response so hyper will send the headers and wait for the body to be
// unblocked.
async fn body_to_bytes(body: Body) -> Vec<u8> {
body.try_fold(Vec::new(), |mut vec, b| async move {
.expect("body stream to complete")
/// Handler that yields the response up to the final byte, then produces an error. Panics if the
/// response contains an empty body.
pub struct OneByteShortThenError;
impl UriPathHandler for OneByteShortThenError {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
async {
let mut bytes = body_to_bytes(response.into_body()).await;
if bytes.pop().is_none() {
panic!("can't short 0 bytes");
.header(CONTENT_LENGTH, bytes.len() + 1)
Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
.expect("valid response")
/// Handler that yields the response up to the Nth byte, then produces an error. Panics if the
/// response does not contain more than N bytes.
pub struct NBytesThenError {
n: usize,
impl NBytesThenError {
/// Make a handler that returns N bytes then errors.
pub fn new(n: usize) -> Self {
Self { n }
impl UriPathHandler for NBytesThenError {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
let n = self.n;
async move {
let mut bytes = body_to_bytes(response.into_body()).await;
let initial_len = bytes.len();
if initial_len <= n {
panic!("not enough bytes to shorten, {} {}", initial_len, n);
.header(CONTENT_LENGTH, initial_len)
Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
.expect("valid response")
/// Handler that yields the response up to the Nth byte, then produces an error. Panics if the
/// response does not contain more than N bytes.
pub struct RangeNBytesThenError {
n: usize,
impl RangeNBytesThenError {
/// Make a handler that returns N bytes then errors.
pub fn new(n: usize) -> Self {
Self { n }
impl RangeUriPathHandler for RangeNBytesThenError {
fn handle(
_: &Path,
_: &http::HeaderValue,
response: Response<Body>,
) -> BoxFuture<'_, Response<Body>> {
let n = self.n;
async move {
let content_range_header =
let mut bytes = body_to_bytes(response.into_body()).await;
let initial_len = bytes.len();
if initial_len <= n {
panic!("not enough bytes to shorten, {} {}", initial_len, n);
.header(http::header::CONTENT_RANGE, content_range_header)
Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
.expect("valid response")
/// Handler that yields the response up to the final byte, then disconnects. Panics if the
/// response contains an empty body.
pub struct OneByteShortThenDisconnect;
impl UriPathHandler for OneByteShortThenDisconnect {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
async {
let mut bytes = body_to_bytes(response.into_body()).await;
if bytes.pop().is_none() {
panic!("can't short 0 bytes");
.header(CONTENT_LENGTH, bytes.len() + 1)
Result::<Vec<u8>, String>::Ok(bytes),
.expect("valid response")
/// Handler that flips the first byte of the response. Panics if the response contains an empty
/// body.
pub struct OneByteFlipped;
impl UriPathHandler for OneByteFlipped {
fn handle(&self, _uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
async {
let mut bytes = body_to_bytes(response.into_body()).await;
if bytes.is_empty() {
panic!("can't flip 0 bytes");
bytes[0] = !bytes[0];
.expect("valid response")
/// Handler that never sends bytes.
pub struct Hang;
impl UriPathHandler for Hang {
fn handle(&self, _: &Path, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
/// Handler that sends the header but then never sends body bytes.
pub struct HangBody;
impl UriPathHandler for HangBody {
fn handle(&self, _: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
async {
let content_length = body_to_bytes(response.into_body()).await.len();
.header(CONTENT_LENGTH, content_length)
.body(Body::wrap_stream(futures::stream::pending::<Result<Vec<u8>, String>>()))
.expect("valid response")
/// Handler that forwards to its wrapped handler once.
pub struct Once<H: UriPathHandler> {
already_forwarded: AtomicBool,
handler: H,
impl<H: UriPathHandler> UriPathHandler for Once<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.already_forwarded.fetch_or(true, Ordering::SeqCst) {
} else {
self.handler.handle(uri_path, response)
impl<H: UriPathHandler> Once<H> {
/// Creates a handler that forwards to `handler` once.
pub fn new(handler: H) -> Self {
Self { already_forwarded: AtomicBool::new(false), handler }
/// Range Handler that forwards to its wrapped handler once.
pub struct RangeOnce<H: RangeUriPathHandler> {
already_forwarded: AtomicBool,
handler: H,
impl<H: RangeUriPathHandler> RangeUriPathHandler for RangeOnce<H> {
fn handle(
uri_path: &Path,
range: &http::HeaderValue,
response: Response<Body>,
) -> BoxFuture<'_, Response<Body>> {
if self.already_forwarded.fetch_or(true, Ordering::SeqCst) {
} else {
self.handler.handle(uri_path, range, response)
impl<H: RangeUriPathHandler> RangeOnce<H> {
/// Creates a Range handler that forwards to `handler` once.
pub fn new(handler: H) -> Self {
Self { already_forwarded: AtomicBool::new(false), handler }
/// Handler that forwards to its wrapped handler the nth time it is called.
pub struct OverrideNth<H: UriPathHandler> {
n: u32,
call_count: AtomicU32,
handler: H,
impl<H: UriPathHandler> UriPathHandler for OverrideNth<H> {
fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>> {
if self.call_count.fetch_add(1, Ordering::SeqCst) + 1 == self.n {
self.handler.handle(uri_path, response)
} else {
impl<H: UriPathHandler> OverrideNth<H> {
/// Creates a handler that forwards to `handler` on the nth call.
pub fn new(n: u32, handler: H) -> Self {
Self { n, call_count: AtomicU32::new(0), handler }
/// Information record by RecordingRange for each request it handles.
pub struct RangeHistoryEntry {
uri_path: PathBuf,
range: http::HeaderValue,
impl RangeHistoryEntry {
/// The uri_path of the request.
pub fn uri_path(&self) -> &Path {
/// The "Range" header of the request.
pub fn range(&self) -> &http::HeaderValue {
/// The request history recorded by RecordingRange.
pub struct RangeHistory(Arc<Mutex<Vec<RangeHistoryEntry>>>);
impl RangeHistory {
/// Take the recorded history, clearing it from the RecordingRange.
pub fn take(&self) -> Vec<RangeHistoryEntry> {
std::mem::replace(&mut self.0.lock(), vec![])
/// Handler that records the Range headers
pub struct RecordingRange {
history: RangeHistory,
impl RecordingRange {
/// Creates a handler that records all the requests.
pub fn new() -> (Self, RangeHistory) {
let history = Arc::new(Mutex::new(vec![]));
(Self { history: RangeHistory(Arc::clone(&history)) }, RangeHistory(history))
impl RangeUriPathHandler for RecordingRange {
fn handle(
uri_path: &Path,
range: &http::HeaderValue,
response: Response<Body>,
) -> BoxFuture<'_, Response<Body>> {
.push(RangeHistoryEntry { uri_path: uri_path.to_owned(), range: range.clone() });