blob: f10c9d3006ffd91db6df7e737a7515a2de2f2001 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![deny(missing_docs)]
//! This module provides a test HTTP(S) server that a unit test can instantiate with minimal setup.
//! Components under test connect to it, and the test can vary the response(s) that the HTTP(S)
//! server returns while the test is running.
//!
//! It handles the TCP setup, letting the user specify `Handler` implementations which return the
//! responses from the server. `Handler` implementations are meant to be composable to provide
//! for fault injection and varying behavior in tests.
// This is gratuitously borrowed from src/sys/pkg/lib/fuchsia-pkg-testing/src/serve.rs, and then
// made generic across all requests by removing the repo-serving aspects of it.
use {
anyhow::Error,
chrono::Utc,
fuchsia_async::{self as fasync, net::TcpListener, Task},
fuchsia_hyper,
futures::{future::BoxFuture, prelude::*},
hyper::{
server::{accept::from_stream, Server},
service::{make_service_fn, service_fn},
Body, Request, Response, StatusCode,
},
std::{
convert::Infallible,
net::{Ipv6Addr, SocketAddr},
pin::Pin,
sync::Arc,
},
};
// Some provided Handler implementations.
pub mod handler;
// Some provided Handler implementations for injecting faults into the server's behavior.
pub mod fault_injection;
/// A "test" HTTP(S) server which is composed of `Handler` implementations, and which holds the
/// connection state.
///
/// Instances are produced by `TestServerBuilder::start()`.
pub struct TestServer {
// Sending half of the shutdown channel; `stop()` sends on it to request a graceful shutdown.
stop: futures::channel::oneshot::Sender<()>,
// Local address of the listening socket; only its port is used when building URLs.
addr: SocketAddr,
// True when the server was started with TLS certificates; selects the URL scheme.
use_https: bool,
// The spawned task driving the server; `stop()` hands it back to the caller to await.
task: Task<()>,
}
/// Base trait that all Handlers implement.
///
/// Implementations must be `'static + Send + Sync`: the server stores them as `Arc<dyn Handler>`
/// and shares that list with every per-connection service it creates.
pub trait Handler: 'static + Send + Sync {
/// A Handler impl signals that it wishes to handle a request by returning a response for it,
/// otherwise it returns None.
///
/// The response is produced by a boxed future whose lifetime is tied to `&self` (not to
/// `request`), so the future cannot hold on to the borrowed request.
fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>>;
}
impl Handler for Arc<dyn Handler> {
fn handles(&self, request: &Request<Body>) -> Option<BoxFuture<'_, Response<Body>>> {
(**self).handles(request)
}
}
impl TestServer {
    /// Returns the URL scheme matching the transport this server was started with.
    fn scheme(&self) -> &'static str {
        match self.use_https {
            true => "https",
            false => "http",
        }
    }

    /// Returns the base URL (scheme, `localhost` and port, without any path) that can be used to
    /// connect to this server from this device.
    pub fn local_url(&self) -> String {
        let scheme = self.scheme();
        let port = self.addr.port();
        format!("{}://localhost:{}", scheme, port)
    }

    /// Returns the URL for the given path that can be used to connect to this server from this
    /// device.
    ///
    /// Leading `/` characters on `path` are dropped before it is appended, so `"foo"` and
    /// `"/foo"` produce the same URL.
    pub fn local_url_for_path(&self, path: &str) -> String {
        let relative = path.trim_start_matches('/');
        let scheme = self.scheme();
        let port = self.addr.port();
        format!("{}://localhost:{}/{}", scheme, port, relative)
    }

    /// Gracefully signals the server to stop and returns a future that resolves when it
    /// terminates.
    ///
    /// # Panics
    ///
    /// Panics if the receiving end of the stop channel has already been dropped.
    pub fn stop(self) -> impl Future<Output = ()> {
        let Self { stop: stop_tx, task, .. } = self;
        stop_tx.send(()).expect("remote end to still be open");
        task
    }

    /// Internal helper which offers the request to each Handler in order until one of them
    /// agrees to respond, and then returns that response. If no Handler responds, it returns
    /// 404 NOT_FOUND with an empty body.
    async fn handle_request(
        handlers: Arc<Vec<Arc<dyn Handler>>>,
        req: Request<Body>,
    ) -> Response<Body> {
        // Keep the lookup in its own statement so the borrow of `req` ends before the await.
        let pending = handlers.iter().find_map(|handler| handler.handles(&req));
        if let Some(response) = pending {
            return response.await;
        }
        Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap()
    }

    /// Creates a `TestServerBuilder` with no handlers and no TLS configuration.
    pub fn builder() -> TestServerBuilder {
        TestServerBuilder::new()
    }
}
/// A builder to construct a `TestServer`.
///
/// Obtain one with `TestServerBuilder::new()` or `TestServer::builder()`, add handlers (and
/// optionally TLS certificates), then call `start()`.
#[derive(Default)]
pub struct TestServerBuilder {
// Handlers in the order they were added; the server offers each request to them in this order.
handlers: Vec<Arc<dyn Handler>>,
// Parsed certificate chain and private key set by `use_https()`; `None` means plain HTTP.
https_certs: Option<(Vec<rustls::Certificate>, rustls::PrivateKey)>,
}
impl TestServerBuilder {
/// Create a new TestServerBuilder with no handlers and no TLS configuration.
pub fn new() -> Self {
Self::default()
}
/// Serve over TLS, using a server certificate rooted in the provided certs.
///
/// Both arguments are parsed as PEM data: `cert_chain` as certificates and `private_key` as
/// RSA private keys.
///
/// # Panics
///
/// Panics if either input fails to parse, or if `private_key` does not contain exactly one key.
pub fn use_https(mut self, cert_chain: &[u8], private_key: &[u8]) -> Self {
let cert_chain = parse_cert_chain(cert_chain);
let private_key = parse_private_key(private_key);
self.https_certs = Some((cert_chain, private_key));
self
}
/// Add a Handler which implements the server's behavior. These are given the ability to
/// handle a request in the order in which they are added to the `TestServerBuilder`.
pub fn handler(mut self, handler: impl Handler + 'static) -> Self {
self.handlers.push(Arc::new(handler));
self
}
/// Spawn the server on the current executor, returning a handle to manage the server.
///
/// The listener is bound to the IPv6 unspecified address with port 0; the address actually
/// assigned is read back from the listener and stored in the returned `TestServer`.
///
/// # Panics
///
/// Panics if the socket cannot be bound, if its local address cannot be read, or if the TLS
/// certificate/key pair is rejected. The spawned server task panics if serving fails.
pub fn start(self) -> TestServer {
let (listener, addr) = {
let addr = SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0);
let listener = TcpListener::bind(&addr).unwrap();
let local_addr = listener.local_addr().unwrap();
(listener, local_addr)
};
// Turn the listener into a stream of accepted connections, discarding the peer address and
// wrapping each connection in the `fuchsia_hyper::TcpStream` type.
let listener = listener
.accept_stream()
.map_err(Error::from)
.map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });
// Both branches erase the connection type to `Pin<Box<dyn AsyncReadWrite>>` so that the rest
// of the function is identical for HTTP and HTTPS.
let (connections, use_https) = if let Some((cert_chain, private_key)) = self.https_certs {
// build a server configuration using a test CA and cert chain
let mut tls_config = rustls::ServerConfig::new(rustls::NoClientAuth::new());
tls_config.set_single_cert(cert_chain, private_key).unwrap();
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
// wrap incoming tcp streams
// NOTE(review): a failed TLS handshake becomes an `Err` item on the connection stream;
// confirm how hyper's `from_stream` acceptor reacts to that (it may end the server).
(
listener
.and_then(move |conn| {
tls_acceptor.accept(conn).map(|res| match res {
Ok(conn) => {
Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
}
Err(e) => Err(Error::from(e)),
})
})
.boxed(), // connections
true, // use_https
)
} else {
(
listener
.map_ok(|conn| Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>)
.boxed(), // connections
false, // use_https
)
};
// This is the root Arc<Vec<Arc<dyn Handler>>>.
let handlers = Arc::new(self.handlers);
let make_svc = make_service_fn(move |_socket| {
// Each connection to the server receives a separate service_fn instance, and so needs
// its own copy of the handlers, this is a factory of sorts.
let handlers = Arc::clone(&handlers);
async move {
Ok::<_, Infallible>(service_fn(move |req| {
// Each request made by a connection is serviced by the service_fn created from
// this scope, which is why there is another cloning of the Arc of Handlers.
// The method and path are copied out first because `req` is moved into
// `handle_request`, and they are still needed for the log line afterwards.
let method = req.method().to_owned();
let path = req.uri().path().to_owned();
TestServer::handle_request(Arc::clone(&handlers), req)
.inspect(move |x| {
// Log one line per request: timestamp, method, path and response status.
println!(
"{} [test http] {} {} => {}",
Utc::now().format("%T.%6f"),
method,
path,
x.status()
)
})
.map(Ok::<_, Infallible>)
}))
}
});
// `stop` is kept in the returned `TestServer`; `rx_stop` drives the graceful shutdown.
let (stop, rx_stop) = futures::channel::oneshot::channel();
let server = Server::builder(from_stream(connections))
.executor(fuchsia_hyper::Executor)
.serve(make_svc)
.with_graceful_shutdown(
// A dropped sender (`Canceled`) is treated the same as an explicit stop signal.
rx_stop.map(|res| res.unwrap_or_else(|futures::channel::oneshot::Canceled| ())),
)
// Any error from serving is turned into a panic inside the spawned task.
.unwrap_or_else(|e| panic!("error serving repo over http: {}", e));
let task = fasync::Task::spawn(server);
TestServer { stop, addr, use_https, task }
}
}
/// Parses every certificate found in the PEM-encoded `bytes`.
///
/// Panics if the PEM data cannot be parsed.
fn parse_cert_chain(bytes: &[u8]) -> Vec<rustls::Certificate> {
    // The parser advances the reader it is given, so hand it a local cursor over the input.
    let mut reader = bytes;
    rustls::internal::pemfile::certs(&mut reader).expect("certs to parse")
}
/// Parses the single RSA private key contained in the PEM-encoded `bytes`.
///
/// Panics if the PEM data cannot be parsed, or if it does not hold exactly one RSA private key.
fn parse_private_key(bytes: &[u8]) -> rustls::PrivateKey {
    // The parser advances the reader it is given, so hand it a local cursor over the input.
    let mut reader = bytes;
    let mut keys =
        rustls::internal::pemfile::rsa_private_keys(&mut reader).expect("private keys to parse");
    assert_eq!(keys.len(), 1, "expecting a single private key");
    // Exactly one key is present at this point, so popping yields it.
    keys.pop().unwrap()
}
/// Combines tokio's `AsyncRead` and `AsyncWrite` (plus `Send`) into one trait, so that plain and
/// TLS connections can both be stored as `dyn AsyncReadWrite`.
trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {}

// Blanket impl: every type that satisfies the bounds is an `AsyncReadWrite`.
impl<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send> AsyncReadWrite for T {}
// The following are helper functions that are useful when writing tests.
/// Builds a GET request with an empty body for `url`, which can be used with any hyper client.
///
/// # Errors
///
/// Returns an error if the request cannot be built from `url`.
pub fn make_get(url: impl AsRef<str>) -> Result<Request<Body>, Error> {
    let uri = url.as_ref();
    let request = Request::get(uri).body(Body::empty())?;
    Ok(request)
}
/// Perform an HTTP GET for the given url, returning the result.
pub async fn get(url: impl AsRef<str>) -> Result<Response<Body>, Error> {
let request = make_get(url)?;
let client = fuchsia_hyper::new_client();
let response = client.request(request).await?;
Ok(response)
}
/// Collect a Response into a single Vec of bytes.
///
/// Consumes the response, reads its body chunk by chunk and appends every chunk to one buffer.
///
/// # Errors
///
/// Returns the body stream's error if reading any chunk fails.
pub async fn body_as_bytes(response: Response<Body>) -> Result<Vec<u8>, Error> {
    let bytes = response
        .into_body()
        .try_fold(Vec::new(), |mut vec, b| async move {
            // Append the chunk as one slice copy instead of pushing it byte by byte through the
            // chunk's iterator.
            vec.extend_from_slice(&b);
            Ok(vec)
        })
        .await?;
    Ok(bytes)
}
/// Collect a Response's Body and convert the body to a tring.
pub async fn body_as_string(response: Response<Body>) -> Result<String, Error> {
let bytes = body_as_bytes(response).await?;
let string = String::from_utf8(bytes)?;
Ok(string)
}
/// Performs a GET for `url` and returns the body of the response as a `String`.
///
/// # Errors
///
/// Returns an error if the request fails, if reading the body fails, or if the body is not valid
/// UTF-8.
pub async fn get_body_as_string(url: impl AsRef<str>) -> Result<String, Error> {
    body_as_string(get(url).await?).await
}
// Tests that start a real `TestServer` and talk to it using the GET helpers from this file.
#[cfg(test)]
mod tests {
use super::*;
use crate::{fault_injection::*, handler::*};
use anyhow::anyhow;
use fasync::TimeoutExt;
// A server with no handlers can be started, and the future returned by `stop()` completes.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_start_stop() {
let server = TestServer::builder().start();
server.stop().await;
}
// With no handlers registered, a GET on the base URL gets 404 NOT_FOUND.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_empty_server_404s() {
let server = TestServer::builder().start();
let result = get(server.local_url()).await;
assert_eq!(result.unwrap().status(), StatusCode::NOT_FOUND);
}
// The same `Arc<dyn Handler>` can be registered twice: once wrapped in `ForPath` for "/a" and
// once directly. Both "/a" and an unrelated path return its body.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_shared_handler() {
let shared: Arc<dyn Handler> = Arc::new(StaticResponse::ok_body("shared"));
let server = TestServer::builder()
.handler(ForPath::new("/a", Arc::clone(&shared)))
.handler(shared)
.start();
assert_eq!(get_body_as_string(server.local_url_for_path("/a")).await.unwrap(), "shared");
assert_eq!(get_body_as_string(server.local_url_for_path("/foo")).await.unwrap(), "shared");
}
// A `StaticResponse` registered directly answers a request for an arbitrary path.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_simple_responder() {
let server = TestServer::builder().handler(StaticResponse::ok_body("some data")).start();
assert_eq!(
get_body_as_string(server.local_url_for_path("ignored")).await.unwrap(),
"some data"
);
}
// A handler wrapped in `ForPath` answers a request for exactly that path.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_simple_path() {
let server = TestServer::builder()
.handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
.start();
assert_eq!(
get_body_as_string(server.local_url_for_path("/some/path")).await.unwrap(),
"some data"
);
}
// A handler wrapped in `ForPath` does not answer other paths, so the server falls back to 404.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_simple_path_doesnt_respond_to_wrong_path() {
let server = TestServer::builder()
.handler(ForPath::new("/some/path", StaticResponse::ok_body("some data")))
.start();
// make sure a non-matching path fails
let result = get(server.local_url_for_path("/other/path")).await;
assert_eq!(result.unwrap().status(), StatusCode::NOT_FOUND);
}
// With the `Hang` handler the GET does not complete within one second, so the timeout fires
// and supplies the "timed out" error.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_hang() {
let server = TestServer::builder().handler(Hang).start();
let result = get(server.local_url_for_path("ignored"))
.on_timeout(std::time::Duration::from_secs(1), || Err(anyhow!("timed out")))
.await;
assert_eq!(result.unwrap_err().to_string(), Error::msg("timed out").to_string());
}
// With `HangBody` reading the full body does not complete within one second, so the timeout
// fires. Presumably the handler advertises a 500-byte body it never sends -- see
// `fault_injection` to confirm.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_hang_body() {
let server = TestServer::builder().handler(HangBody::content_length(500)).start();
let result = get_body_as_string(server.local_url_for_path("ignored"))
.on_timeout(std::time::Duration::from_secs(1), || Err(anyhow!("timed out")))
.await;
assert_eq!(result.unwrap_err().to_string(), Error::msg("timed out").to_string());
}
}