| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Test tools for serving TUF repositories. |
| |
| use { |
| crate::repo::Repository, |
| anyhow::{bail, format_err, Context as _, Error}, |
| chrono::Utc, |
| fidl_fuchsia_pkg_ext::{MirrorConfig, MirrorConfigBuilder, RepositoryConfig}, |
| fuchsia_async::{self as fasync, net::TcpListener, Task}, |
| fuchsia_url::pkg_url::RepoUrl, |
| fuchsia_zircon as zx, |
| futures::{future::BoxFuture, prelude::*}, |
| http::Uri, |
| http_sse::{Event, EventSender, SseResponseCreator}, |
| hyper::{ |
| header, |
| server::{accept::from_stream, Server}, |
| service::{make_service_fn, service_fn}, |
| Body, Method, Request, Response, StatusCode, |
| }, |
| std::{ |
| convert::{Infallible, TryFrom, TryInto as _}, |
| io::{Cursor, Read as _, Seek as _}, |
| net::{IpAddr, Ipv6Addr, SocketAddr}, |
| path::{Path, PathBuf}, |
| pin::Pin, |
| sync::{ |
| atomic::{AtomicU64, Ordering}, |
| Arc, |
| }, |
| }, |
| }; |
| |
| pub mod handler; |
| |
| trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {} |
| impl<T> AsyncReadWrite for T where T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send {} |
| |
| /// A builder to construct a test repository server. |
| pub struct ServedRepositoryBuilder { |
| repo: Arc<Repository>, |
| uri_path_override_handlers: Vec<Arc<dyn UriPathHandler>>, |
| range_uri_path_override_handlers: Vec<Arc<dyn RangeUriPathHandler>>, |
| use_https: bool, |
| bind_addr: IpAddr, |
| } |
| |
| /// Override how a `ServedRepository` responds to GET requests on valid URI paths. |
| /// Useful for injecting failures. |
| pub trait UriPathHandler: 'static + Send + Sync { |
| /// `response` is what the server would have responded with. |
| fn handle(&self, uri_path: &Path, response: Response<Body>) -> BoxFuture<'_, Response<Body>>; |
| } |
| |
| /// Override how a `ServedRepository` responds to GET requests on valid URI paths. |
| /// Useful for injecting failures. |
| pub trait RangeUriPathHandler: 'static + Send + Sync { |
| /// `response` is what the server would have responded with. |
| fn handle( |
| &self, |
| uri_path: &Path, |
| range: &http::HeaderValue, |
| response: Response<Body>, |
| ) -> BoxFuture<'_, Response<Body>>; |
| } |
| |
| impl ServedRepositoryBuilder { |
| pub(crate) fn new(repo: Arc<Repository>) -> Self { |
| ServedRepositoryBuilder { |
| repo, |
| uri_path_override_handlers: vec![], |
| range_uri_path_override_handlers: vec![], |
| use_https: false, |
| bind_addr: Ipv6Addr::UNSPECIFIED.into(), |
| } |
| } |
| |
| /// Override how the `ServedRepositoryBuilder` responds to some URI paths. |
| /// |
| /// Requests are passed through URI path handlers in the order in which they were added to this |
| /// builder. |
| pub fn uri_path_override_handler(mut self, handler: impl UriPathHandler) -> Self { |
| self.uri_path_override_handlers.push(Arc::new(handler)); |
| self |
| } |
| |
| /// Override how the `ServedRepositoryBuilder` responds to some URI paths for Range requests. |
| /// |
| /// Requests are passed through URI path handlers in the order in which they were added to this |
| /// builder. |
| pub fn range_uri_path_override_handler(mut self, handler: impl RangeUriPathHandler) -> Self { |
| self.range_uri_path_override_handlers.push(Arc::new(handler)); |
| self |
| } |
| |
| /// Serve the repository over TLS, using a server certificate rooted to |
| /// //third_party/rust_crates/vendor/rustls/test-ca/rsa/ca.cert. |
| pub fn use_https(mut self, value: bool) -> Self { |
| self.use_https = value; |
| self |
| } |
| |
| /// Bind the tcp listener to the provided ip address. Binds to Ipv6Addr::UNSPECIFIED by |
| /// default. |
| pub fn bind_to_addr(mut self, addr: impl Into<IpAddr>) -> Self { |
| self.bind_addr = addr.into(); |
| self |
| } |
| |
| /// Spawn the server on the current executor, returning a handle to manage the server. |
| pub fn start(self) -> Result<ServedRepository, Error> { |
| let (listener, addr) = { |
| let addr = SocketAddr::new(self.bind_addr, 0); |
| let listener = TcpListener::bind(&addr).context("bind")?; |
| let local_addr = listener.local_addr().context("local_addr")?; |
| (listener, local_addr) |
| }; |
| |
| let listener = listener |
| .accept_stream() |
| .map_err(Error::from) |
| .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn }); |
| |
| let connection_attempts = Arc::new(AtomicU64::new(0)); |
| let connections: Pin< |
| Box<dyn Stream<Item = Result<Pin<Box<dyn AsyncReadWrite>>, Error>> + Send>, |
| > = if self.use_https { |
| // build a server configuration using a test CA and cert chain |
| let certs = parse_cert_chain(&include_bytes!("../certs/server.certchain")[..]); |
| let key = parse_private_key(&include_bytes!("../certs/server.rsa")[..]); |
| let mut tls_config = rustls::ServerConfig::new(rustls::NoClientAuth::new()); |
| // Configure ALPN and prefer H2 over HTTP/1.1. |
| tls_config.set_protocols(&[b"h2".to_vec(), b"http/1.1".to_vec()]); |
| tls_config.set_single_cert(certs, key).unwrap(); |
| let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config)); |
| let connection_attempts = Arc::clone(&connection_attempts); |
| |
| // wrap incoming tcp streams |
| listener |
| .and_then(move |conn| { |
| connection_attempts.fetch_add(1, Ordering::SeqCst); |
| tls_acceptor.accept(conn).map(|res| match res { |
| Ok(conn) => Ok(Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>>), |
| Err(e) => Err(Error::from(e)), |
| }) |
| }) |
| .boxed() |
| } else { |
| let connection_attempts = Arc::clone(&connection_attempts); |
| listener |
| .map_ok(move |conn| { |
| connection_attempts.fetch_add(1, Ordering::SeqCst); |
| Pin::new(Box::new(conn)) as Pin<Box<dyn AsyncReadWrite>> |
| }) |
| .boxed() |
| }; |
| |
| let root = self.repo.path(); |
| let uri_path_override_handlers = Arc::new(self.uri_path_override_handlers); |
| let range_uri_path_override_handlers = Arc::new(self.range_uri_path_override_handlers); |
| |
| let (auto_response_creator, auto_event_sender) = |
| SseResponseCreator::with_additional_buffer_size(10); |
| let auto_response_creator = Arc::new(auto_response_creator); |
| |
| let make_svc = make_service_fn(move |_socket| { |
| let root = root.clone(); |
| let uri_path_override_handlers = Arc::clone(&uri_path_override_handlers); |
| let range_uri_path_override_handlers = Arc::clone(&range_uri_path_override_handlers); |
| let auto_response_creator = Arc::clone(&auto_response_creator); |
| |
| async move { |
| Ok::<_, Infallible>(service_fn(move |req| { |
| let method = req.method().to_owned(); |
| let path = req.uri().path().to_owned(); |
| ServedRepository::handle_tuf_repo_request( |
| root.clone(), |
| Arc::clone(&uri_path_override_handlers), |
| Arc::clone(&range_uri_path_override_handlers), |
| Arc::clone(&auto_response_creator), |
| req, |
| ) |
| .inspect(move |x| { |
| println!( |
| "{} [http repo] {} {} => {}", |
| Utc::now().format("%T.%6f"), |
| method, |
| path, |
| x.status() |
| ) |
| }) |
| .map(Ok::<_, Infallible>) |
| })) |
| } |
| }); |
| |
| let (stop, rx_stop) = futures::channel::oneshot::channel(); |
| |
| let server = Server::builder(from_stream(connections)) |
| .executor(fuchsia_hyper::Executor) |
| .serve(make_svc) |
| .with_graceful_shutdown( |
| rx_stop.map(|res| res.unwrap_or_else(|futures::channel::oneshot::Canceled| ())), |
| ) |
| .unwrap_or_else(|e| panic!("error serving repo over http: {}", e)); |
| |
| let server = Task::spawn(server); |
| |
| Ok(ServedRepository { |
| repo: self.repo, |
| stop, |
| server, |
| addr, |
| use_https: self.use_https, |
| auto_event_sender, |
| connection_attempts, |
| }) |
| } |
| } |
| |
| fn parse_cert_chain(mut bytes: &[u8]) -> Vec<rustls::Certificate> { |
| rustls::internal::pemfile::certs(&mut bytes).expect("certs to parse") |
| } |
| |
| fn parse_private_key(mut bytes: &[u8]) -> rustls::PrivateKey { |
| let keys = |
| rustls::internal::pemfile::rsa_private_keys(&mut bytes).expect("private keys to parse"); |
| assert_eq!(keys.len(), 1, "expecting a single private key"); |
| keys.into_iter().next().unwrap() |
| } |
| |
| /// A [`Repository`] being served over HTTP. |
| pub struct ServedRepository { |
| repo: Arc<Repository>, |
| stop: futures::channel::oneshot::Sender<()>, |
| server: Task<()>, |
| addr: SocketAddr, |
| use_https: bool, |
| auto_event_sender: EventSender, |
| connection_attempts: Arc<AtomicU64>, |
| } |
| |
| impl ServedRepository { |
| fn scheme(&self) -> &'static str { |
| if self.use_https { |
| "https" |
| } else { |
| "http" |
| } |
| } |
| /// Request the given path served by the repository over HTTP. |
| pub async fn get(&self, path: impl AsRef<str>) -> Result<Vec<u8>, Error> { |
| let url = format!("{}/{}", self.local_url(), path.as_ref()); |
| get(url).await |
| } |
| |
| /// Returns the URL that can be used to connect to this repository from this device. |
| pub fn local_url(&self) -> String { |
| format!("{}://localhost:{}", self.scheme(), self.addr.port()) |
| } |
| |
| /// Returns a sorted vector of all packages contained in this repository. |
| pub async fn list_packages(&self) -> Result<Vec<crate::repo::PackageEntry>, Error> { |
| let targets_json = self.get("targets.json").await?; |
| let mut packages = crate::repo::iter_packages(Cursor::new(targets_json))? |
| .collect::<Result<Vec<_>, _>>()?; |
| packages.sort_unstable(); |
| Ok(packages) |
| } |
| |
| /// Generates a [`MirrorConfigBuilder`] that points to this served repository. |
| pub fn get_mirror_config_builder(&self) -> MirrorConfigBuilder { |
| MirrorConfigBuilder::new(self.local_url().parse::<Uri>().unwrap()).unwrap() |
| } |
| |
| /// Generates a [`MirrorConfig`] that points to this served repository. |
| fn get_mirror_config(&self, subscribe: bool) -> MirrorConfig { |
| self.get_mirror_config_builder().subscribe(subscribe).build() |
| } |
| |
| /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this |
| /// served repository. |
| pub fn make_repo_config(&self, url: RepoUrl) -> RepositoryConfig { |
| self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), false) |
| } |
| |
| /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this |
| /// served repository. Set subscribe on the mirror configs to true. |
| pub fn make_repo_config_with_subscribe(&self, url: RepoUrl) -> RepositoryConfig { |
| self.repo.make_repo_config(url, Some(self.get_mirror_config(true)), false) |
| } |
| |
| /// Generate a [`RepositoryConfig`] suitable for configuring a package resolver to use this |
| /// served repository with local mirroring enabled. |
| // TODO(fxbug.dev/59827) delete this method once pkg-resolver can fetch metadata from a LocalMirror. |
| pub fn make_repo_config_with_local_mirror(&self, url: RepoUrl) -> RepositoryConfig { |
| self.repo.make_repo_config(url, Some(self.get_mirror_config(false)), true) |
| } |
| |
| /// Send an SSE event to all clients subscribed to /auto. |
| pub async fn send_auto_event(&self, event: &Event) { |
| self.auto_event_sender.send(event).await |
| } |
| |
| /// Waits until `send_auto_event` would attempt to send an `Event` to exactly |
| /// `n` clients. Panics if extra clients are connected. |
| pub async fn wait_for_n_connected_auto_clients(&self, n: usize) { |
| loop { |
| let connected = self.auto_event_sender.client_count().await; |
| if connected == n { |
| break; |
| } else if connected > n { |
| panic!("ServedRepository too many auto clients connected."); |
| } |
| fasync::Timer::new(fasync::Time::after(zx::Duration::from_millis(10))).await; |
| } |
| } |
| |
| /// Errors all currently existing /auto `Response<Body>` streams. |
| pub async fn drop_all_auto_clients(&self) { |
| self.auto_event_sender.drop_all_clients().await |
| } |
| |
| /// Gracefully signal the server to stop and returns a future that resolves when it terminates. |
| pub fn stop(self) -> impl Future<Output = ()> { |
| self.stop.send(()).expect("remote end to still be open"); |
| self.server |
| } |
| |
| /// Number of connection attempts. |
| pub fn connection_attempts(&self) -> u64 { |
| self.connection_attempts.load(Ordering::SeqCst) |
| } |
| |
| async fn handle_tuf_repo_request( |
| repo: PathBuf, |
| uri_path_override_handlers: Arc<Vec<Arc<dyn UriPathHandler>>>, |
| range_uri_path_override_handlers: Arc<Vec<Arc<dyn RangeUriPathHandler>>>, |
| auto_response_creator: Arc<SseResponseCreator>, |
| req: Request<Body>, |
| ) -> Response<Body> { |
| let fail = |
| |status: StatusCode| Response::builder().status(status).body(Body::empty()).unwrap(); |
| |
| if *req.method() != Method::GET { |
| return fail(StatusCode::NOT_FOUND); |
| } else if req.uri().query().is_some() { |
| return fail(StatusCode::BAD_REQUEST); |
| } |
| |
| let uri_path = Path::new(req.uri().path()); |
| |
| // don't let queries escape the repo root. |
| if uri_path.components().any(|component| component == std::path::Component::ParentDir) { |
| return fail(StatusCode::NOT_FOUND); |
| } |
| |
| let mut response = if uri_path == PathBuf::from("/auto") { |
| auto_response_creator.create().await |
| } else { |
| let fs_path = repo.join(uri_path.strip_prefix("/").unwrap_or(uri_path)); |
| |
| if let Some(range) = req.headers().get(http::header::RANGE) { |
| make_range_response(fs_path, range) |
| } else { |
| // TODO(fxbug.dev/71372) synchronous IO in an async context. |
| let (status, data) = match std::fs::read(fs_path) { |
| Ok(data) => (StatusCode::OK, data), |
| Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => { |
| (StatusCode::NOT_FOUND, "File did not exist".to_owned().into_bytes()) |
| } |
| Err(err) => { |
| eprintln!("error reading repo file: {}", err); |
| ( |
| StatusCode::INTERNAL_SERVER_ERROR, |
| "Failed to read repo file".to_owned().into_bytes(), |
| ) |
| } |
| }; |
| |
| Response::builder() |
| .status(status) |
| .header(header::CONTENT_LENGTH, data.len()) |
| .body(Body::from(data)) |
| .unwrap() |
| } |
| }; |
| |
| // TODO(fxbug.dev/71333) consider combining the handlers |
| if let Some(range) = req.headers().get(http::header::RANGE) { |
| for handler in range_uri_path_override_handlers.iter() { |
| response = handler.handle(uri_path, range, response).await |
| } |
| } else { |
| for handler in uri_path_override_handlers.iter() { |
| response = handler.handle(uri_path, response).await |
| } |
| } |
| |
| response |
| } |
| } |
| |
| // TODO(fxbug.dev/71260) use specific HTTP status codes for errors instead of mapping everything to |
| // INTERNAL_SERVER_ERROR. |
| fn make_range_response(fs_path: PathBuf, range: &http::HeaderValue) -> Response<Body> { |
| match make_range_response_impl(fs_path, range) { |
| Ok(response) => response, |
| Err(e) => { |
| eprintln!("error making range response: {:#}", e); |
| Response::builder() |
| .status(StatusCode::INTERNAL_SERVER_ERROR) |
| .body(Body::from("Error creating response".to_owned().into_bytes())) |
| .unwrap() |
| } |
| } |
| } |
| |
| fn make_range_response_impl( |
| fs_path: PathBuf, |
| range: &http::HeaderValue, |
| ) -> Result<Response<Body>, Error> { |
| let HttpRange { start, end } = range.try_into().context("parse range header")?; |
| let mut file = match std::fs::File::open(fs_path) { |
| Ok(file) => file, |
| Err(ref err) if err.kind() == std::io::ErrorKind::NotFound => { |
| return Ok(Response::builder() |
| .status(StatusCode::NOT_FOUND) |
| .body(Body::from("File did not exist".to_owned().into_bytes())) |
| .unwrap()) |
| } |
| Err(e) => Err(e).context("opening file")?, |
| }; |
| |
| let file_size = file.metadata().context("file metadata")?.len(); |
| // TODO(fxbug.dev/71372) synchronous IO in an async context. |
| // TODO(fxbug.dev/71260) return 416 if the range is invalid |
| file.seek(std::io::SeekFrom::Start(start)).context("seeking file")?; |
| let mut data = vec![0; end as usize - start as usize + 1]; |
| file.read_exact(&mut data).context("reading file for range request")?; |
| |
| Ok(Response::builder() |
| .status(StatusCode::PARTIAL_CONTENT) |
| .header(header::CONTENT_LENGTH, data.len()) |
| .header(header::CONTENT_RANGE, format!("bytes {}-{}/{}", start, end, file_size)) |
| .body(Body::from(data)) |
| .unwrap()) |
| } |
| |
| /// Parsed value of an HTTP request "Range" headers |
| pub struct HttpRange { |
| start: u64, |
| end: u64, |
| } |
| |
| impl HttpRange { |
| /// The Range start |
| pub fn start(&self) -> u64 { |
| self.start |
| } |
| |
| /// The Range start |
| pub fn end(&self) -> u64 { |
| self.end |
| } |
| } |
| |
| // TODO(fxbug.dev/71260) use a spec compliant parser |
| impl TryFrom<&http::HeaderValue> for HttpRange { |
| type Error = anyhow::Error; |
| |
| fn try_from(range: &http::HeaderValue) -> Result<Self, Self::Error> { |
| let range = range.to_str().context("range header should be ascii")?; |
| let range = if let Some(range) = range.strip_prefix("bytes=") { |
| range |
| } else { |
| bail!("range header should start with 'bytes='"); |
| }; |
| let dash = range.find('-').ok_or(anyhow::anyhow!("range header should have dash"))?; |
| let (start, end) = range.split_at(dash); |
| if end.len() < 2 { |
| bail!("range header end empty"); |
| } |
| let start = start.parse().context("valid range start")?; |
| let end = end[1..].parse().context("valid range end")?; |
| |
| if start >= end { |
| bail!("start {} >= end {}", start, end); |
| } |
| |
| Ok(HttpRange { start, end }) |
| } |
| } |
| |
| async fn get(url: impl AsRef<str>) -> Result<Vec<u8>, Error> { |
| let request = Request::get(url.as_ref()).body(Body::empty()).map_err(Error::from)?; |
| let client = fuchsia_hyper::new_client(); |
| let response = client.request(request).await?; |
| |
| if response.status() != StatusCode::OK { |
| return Err(format_err!("unexpected status code: {:?}", response.status())); |
| } |
| |
| let body = response |
| .into_body() |
| .try_fold(Vec::new(), |mut vec, b| async move { |
| vec.extend(b); |
| Ok(vec) |
| }) |
| .await?; |
| |
| Ok(body) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| crate::{package::PackageBuilder, repo::RepositoryBuilder}, |
| matches::assert_matches, |
| }; |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| #[ignore] |
| async fn test_serve_empty_hangs_on_last_get() { |
| let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap()); |
| let served_repo = repo.server().start().unwrap(); |
| |
| // contains no packages. |
| let packages = served_repo.list_packages().await.unwrap(); |
| assert_eq!(packages, vec![]); |
| |
| // no '..' allowed. |
| assert_matches!(served_repo.get("blobs/../root.json").await, Err(_)); |
| |
| // getting a known file fetches something. |
| let bytes = served_repo.get("targets.json").await.unwrap(); |
| assert_ne!(bytes, Vec::<u8>::new()); |
| |
| // even if it doesn't go through the helper function. |
| let url = format!("{}/targets.json", served_repo.local_url()); |
| let also_bytes = get(&url).await.unwrap(); |
| assert_eq!(bytes, also_bytes); |
| |
| // requests fail after stopping the server |
| served_repo.stop().await; |
| |
| // FIXME(49247): this often flakes by hanging |
| assert_matches!(get(url).await, Err(_)); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_serve_empty() { |
| let repo = Arc::new(RepositoryBuilder::new().build().await.unwrap()); |
| let served_repo = repo.server().start().unwrap(); |
| |
| // contains no packages. |
| let packages = served_repo.list_packages().await.unwrap(); |
| assert_eq!(packages, vec![]); |
| |
| // no '..' allowed. |
| assert_matches!(served_repo.get("blobs/../root.json").await, Err(_)); |
| |
| // getting a known file fetches something. |
| let bytes = served_repo.get("targets.json").await.unwrap(); |
| assert_ne!(bytes, Vec::<u8>::new()); |
| |
| // even if it doesn't go through the helper function. |
| let url = format!("{}/targets.json", served_repo.local_url()); |
| let also_bytes = get(&url).await.unwrap(); |
| assert_eq!(bytes, also_bytes); |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_serve_packages() { |
| let same_contents = "same contents"; |
| let repo = RepositoryBuilder::new() |
| .add_package( |
| PackageBuilder::new("rolldice") |
| .add_resource_at("bin/rolldice", "#!/boot/bin/sh\necho 4\n".as_bytes()) |
| .add_resource_at( |
| "meta/rolldice.cmx", |
| r#"{"program":{"binary":"bin/rolldice"}}"#.as_bytes(), |
| ) |
| .add_resource_at("data/duplicate_a", "same contents".as_bytes()) |
| .build() |
| .await |
| .unwrap(), |
| ) |
| .add_package( |
| PackageBuilder::new("fortune") |
| .add_resource_at( |
| "bin/fortune", |
| "#!/boot/bin/sh\necho ask again later\n".as_bytes(), |
| ) |
| .add_resource_at( |
| "meta/fortune.cmx", |
| r#"{"program":{"binary":"bin/fortune"}}"#.as_bytes(), |
| ) |
| .add_resource_at("data/duplicate_b", same_contents.as_bytes()) |
| .add_resource_at("data/duplicate_c", same_contents.as_bytes()) |
| .build() |
| .await |
| .unwrap(), |
| ) |
| .build() |
| .await |
| .unwrap(); |
| let repo = Arc::new(repo); |
| |
| let local_packages = repo.list_packages().unwrap(); |
| |
| let served_repository = repo.server().start().unwrap(); |
| let served_packages = served_repository.list_packages().await.unwrap(); |
| assert_eq!(local_packages, served_packages); |
| } |
| } |