| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| manager::RepositoryManager, |
| range::Range, |
| repository::{Error, Repository, RepositoryId}, |
| }, |
| anyhow::Result, |
| async_net::{TcpListener, TcpStream}, |
| chrono::Utc, |
| fuchsia_async as fasync, |
| futures::{future::Shared, prelude::*, AsyncRead, AsyncWrite, TryStreamExt}, |
| http_sse::{Event, EventSender, SseResponseCreator}, |
| hyper::{body::Body, header::RANGE, service::service_fn, Request, Response, StatusCode}, |
| log::{error, info, warn}, |
| parking_lot::{Mutex, RwLock}, |
| serde::{Deserialize, Serialize}, |
| std::{ |
| collections::{hash_map::Entry, HashMap}, |
| convert::Infallible, |
| future::Future, |
| io, |
| net::SocketAddr, |
| pin::Pin, |
| sync::{Arc, Weak}, |
| task::{Context, Poll}, |
| time::Duration, |
| }, |
| }; |
| |
| // FIXME: This value was chosen basically at random. |
| const AUTO_BUFFER_SIZE: usize = 10; |
| |
| // FIXME: This value was chosen basically at random. |
| const MAX_PARSE_RETRIES: usize = 5000; |
| |
| // FIXME: This value was chosen basically at random. |
| const PARSE_RETRY_DELAY: Duration = Duration::from_micros(100); |
| |
| type SseResponseCreatorMap = RwLock<HashMap<RepositoryId, Arc<SseResponseCreator>>>; |
| |
| /// RepositoryManager represents the web server that serves [Repositories](Repository) to a target. |
| #[derive(Debug)] |
| pub struct RepositoryServer { |
| local_addr: SocketAddr, |
| stop: futures::channel::oneshot::Sender<()>, |
| } |
| |
| impl RepositoryServer { |
| /// Gracefully signal the server to stop, and returns a future that resolves when the server |
| /// terminates. |
| pub fn stop(self) { |
| self.stop.send(()).expect("remote end to still be open"); |
| } |
| |
| /// The [RepositoryServer] is listening on this address. |
| pub fn local_addr(&self) -> SocketAddr { |
| self.local_addr |
| } |
| |
| /// Returns the URL that can be used to connect to this server. |
| pub fn local_url(&self) -> String { |
| format!("http://{}", self.local_addr) |
| } |
| |
| /// Create a [RepositoryServerBuilder]. |
| pub fn builder( |
| addr: SocketAddr, |
| repo_manager: Arc<RepositoryManager>, |
| ) -> RepositoryServerBuilder { |
| RepositoryServerBuilder::new(addr, repo_manager) |
| } |
| } |
| |
| /// [RepositoryServerBuilder] constructs [RepositoryServer] instances. |
| pub struct RepositoryServerBuilder { |
| addr: SocketAddr, |
| repo_manager: Arc<RepositoryManager>, |
| } |
| |
| impl RepositoryServerBuilder { |
| /// Create a new RepositoryServerBuilder. |
| pub fn new(addr: SocketAddr, repo_manager: Arc<RepositoryManager>) -> Self { |
| Self { addr, repo_manager } |
| } |
| |
| /// Construct a web server future, and return a [RepositoryServer] to manage the server. |
| /// [RepositoryServer], and return a handle to manaserver and the web server task. |
| pub async fn start( |
| self, |
| ) -> Result<( |
| impl Future<Output = ()>, |
| futures::channel::mpsc::UnboundedSender<Result<ConnectionStream>>, |
| RepositoryServer, |
| )> { |
| let listener = TcpListener::bind(&self.addr).await?; |
| let local_addr = listener.local_addr()?; |
| |
| let (tx_stop_server, rx_stop_server) = futures::channel::oneshot::channel(); |
| let (tx_conns, rx_conns) = futures::channel::mpsc::unbounded(); |
| |
| let server_fut = |
| run_server(listener, rx_conns, rx_stop_server, Arc::clone(&self.repo_manager)); |
| |
| Ok((server_fut, tx_conns, RepositoryServer { local_addr, stop: tx_stop_server })) |
| } |
| } |
| |
| /// Executor to help run tasks in the background, but still allow these tasks to be cleaned up when |
| /// the server is shut down. |
| #[derive(Debug, Default, Clone)] |
| struct TaskExecutor<T: 'static> { |
| inner: Arc<Mutex<TaskExecutorInner<T>>>, |
| } |
| |
| #[derive(Debug, Default)] |
| struct TaskExecutorInner<T> { |
| next_task_id: u64, |
| tasks: HashMap<u64, fasync::Task<T>>, |
| } |
| |
| impl<T> TaskExecutor<T> { |
| pub fn new() -> Self { |
| Self { |
| inner: Arc::new(Mutex::new(TaskExecutorInner { |
| next_task_id: 0, |
| tasks: HashMap::new(), |
| })), |
| } |
| } |
| |
| /// Spawn a task in the executor. |
| pub fn spawn<F: Future<Output = T> + 'static>(&self, fut: F) { |
| let fut_inner = Arc::clone(&self.inner); |
| |
| let mut inner = self.inner.lock(); |
| |
| // We could technically have a collision when the task id overflows, but if we spawned a |
| // task once a nanosecond, we still wouldn't overflow for 584 years. But lets add an |
| // assertion just to be safe. |
| let task_id = inner.next_task_id; |
| inner.next_task_id = inner.next_task_id.wrapping_add(1); |
| assert!(!inner.tasks.contains_key(&task_id)); |
| |
| let fut = async move { |
| let res = fut.await; |
| // Clean up our entry when the task completes. |
| fut_inner.lock().tasks.remove(&task_id); |
| res |
| }; |
| |
| inner.tasks.insert(task_id, fasync::Task::local(fut)); |
| } |
| } |
| |
| impl<T, F: Future<Output = T> + 'static> hyper::rt::Executor<F> for TaskExecutor<T> { |
| fn execute(&self, fut: F) { |
| self.spawn(fut); |
| } |
| } |
| |
| /// Starts the server loop. |
| async fn run_server( |
| listener: TcpListener, |
| tunnel_conns: futures::channel::mpsc::UnboundedReceiver<Result<ConnectionStream>>, |
| server_stopped: futures::channel::oneshot::Receiver<()>, |
| server_repo_manager: Arc<RepositoryManager>, |
| ) { |
| // Turn the shutdown signal into a shared future, so we can signal to the server to and all the |
| // auto/ SSE processes, to initiate a graceful shutdown. |
| let mut server_stopped = server_stopped.shared(); |
| |
| // Merge the listener connections with the tunnel connections. |
| let mut incoming = futures::stream::select( |
| listener.incoming().map_ok(ConnectionStream::Tcp).map_err(Into::into), |
| tunnel_conns, |
| ); |
| |
| // Spawn all connections and related tasks in this executor. |
| let executor = TaskExecutor::new(); |
| |
| // Contains all the SSE services. |
| let server_sse_response_creators = Arc::new(RwLock::new(HashMap::new())); |
| |
| loop { |
| let mut next = incoming.next().fuse(); |
| let conn = futures::select! { |
| conn = next => { |
| match conn { |
| Some(Ok(conn)) => conn, |
| Some(Err(err)) => { |
| error!("failed to accept connection: {:?}", err); |
| continue; |
| } |
| None => { |
| unreachable!( |
| "incoming stream has ended, which should be impossible \ |
| according to async_net::TcpListener logs" |
| ); |
| } |
| } |
| }, |
| _ = server_stopped => { |
| break; |
| }, |
| }; |
| |
| let service_rx_stop = server_stopped.clone(); |
| let service_repo_manager = Arc::clone(&server_repo_manager); |
| let service_sse_response_creators = Arc::clone(&server_sse_response_creators); |
| |
| executor.spawn(handle_connection( |
| executor.clone(), |
| service_rx_stop, |
| service_repo_manager, |
| service_sse_response_creators, |
| conn, |
| )); |
| } |
| } |
| |
| async fn handle_connection( |
| executor: TaskExecutor<()>, |
| server_stopped: Shared<futures::channel::oneshot::Receiver<()>>, |
| repo_manager: Arc<RepositoryManager>, |
| sse_response_creators: Arc<SseResponseCreatorMap>, |
| conn: ConnectionStream, |
| ) { |
| let service_server_stopped = server_stopped.clone(); |
| let conn = hyper::server::conn::Http::new().with_executor(executor.clone()).serve_connection( |
| conn, |
| service_fn(|req| { |
| // Each request made by a connection is serviced by the |
| // service_fn created from this scope, which is why there is |
| // another cloning of the repository manager. |
| let method = req.method().to_string(); |
| let path = req.uri().path().to_string(); |
| |
| handle_request( |
| executor.clone(), |
| service_server_stopped.clone(), |
| Arc::clone(&repo_manager), |
| Arc::clone(&sse_response_creators), |
| req, |
| ) |
| .inspect(move |resp| { |
| info!( |
| "{} [ffx] {} {} => {}", |
| Utc::now().format("%T.%6f"), |
| method, |
| path, |
| resp.status() |
| ); |
| }) |
| .map(Ok::<_, Infallible>) |
| }), |
| ); |
| |
| let conn = GracefulConnection { conn, stop: server_stopped, shutting_down: false }; |
| |
| if let Err(err) = conn.await { |
| error!("Error while serving HTTP connection: {}", err); |
| } |
| } |
| |
| /// [GracefulConnection] will signal to the connection to shut down if we receive a shutdown signal |
| /// on the `stop` channel. |
| #[pin_project::pin_project] |
| struct GracefulConnection<S> |
| where |
| S: hyper::service::Service<Request<Body>, Response = Response<Body>>, |
| S::Error: std::error::Error + Send + Sync + 'static, |
| S::Future: 'static, |
| { |
| #[pin] |
| stop: Shared<futures::channel::oneshot::Receiver<()>>, |
| |
| /// The hyper connection. |
| #[pin] |
| conn: hyper::server::conn::Connection<ConnectionStream, S, TaskExecutor<()>>, |
| |
| shutting_down: bool, |
| } |
| |
| impl<S> Future for GracefulConnection<S> |
| where |
| S: hyper::service::Service<Request<Body>, Response = Response<Body>>, |
| S::Error: std::error::Error + Send + Sync + 'static, |
| S::Future: 'static, |
| { |
| type Output = Result<(), hyper::Error>; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut this = self.project(); |
| if !*this.shutting_down { |
| match this.stop.poll(cx) { |
| Poll::Pending => {} |
| Poll::Ready(_) => { |
| *this.shutting_down = true; |
| |
| // Tell the connection to begin to gracefully shut down the connection. |
| // According to the [docs], we need to continue polling the connection until |
| // completion. This allows hyper to flush any send queues, or emit any |
| // HTTP-level shutdown messages. That would help the client to distinguish |
| // the server going away on purpose, or some other unexpected error. |
| // |
| // [docs]: https://docs.rs/hyper/latest/hyper/server/conn/struct.Connection.html#method.graceful_shutdown |
| this.conn.as_mut().graceful_shutdown(); |
| } |
| } |
| } |
| |
| match this.conn.poll(cx) { |
| Poll::Pending => Poll::Pending, |
| Poll::Ready(x) => Poll::Ready(x), |
| } |
| } |
| } |
| |
| async fn handle_request( |
| executor: TaskExecutor<()>, |
| server_stopped: Shared<futures::channel::oneshot::Receiver<()>>, |
| repo_manager: Arc<RepositoryManager>, |
| sse_response_creators: Arc<SseResponseCreatorMap>, |
| req: Request<Body>, |
| ) -> Response<Body> { |
| let mut path = req.uri().path(); |
| |
| // Ignore the leading slash. |
| if path.starts_with('/') { |
| path = &path[1..]; |
| } |
| |
| let mut parts = path.splitn(2, '/'); |
| let repo_name = parts.next().expect("split should produce at least 1 item"); |
| |
| if repo_name.is_empty() { |
| // FIXME: consider returning a list of repositories to make it easier for users to browse |
| // the repository server. |
| return status_response(StatusCode::NOT_FOUND); |
| } |
| |
| let resource_path = if let Some(resource_path) = parts.next() { |
| resource_path |
| } else { |
| // FIXME: consider returning a list of repositories to make it easier for users to browse |
| // the repository server. |
| return status_response(StatusCode::NOT_FOUND); |
| }; |
| |
| if resource_path.is_empty() { |
| // FIXME: consider returning a list of repositories to make it easier for users to browse |
| // the repository server. |
| return status_response(StatusCode::NOT_FOUND); |
| } |
| |
| let repo = if let Some(repo) = repo_manager.get(repo_name) { |
| repo |
| } else { |
| warn!("could not find repository {}", repo_name); |
| return status_response(StatusCode::NOT_FOUND); |
| }; |
| let headers = req.headers(); |
| let range = if let Some(header) = headers.get(RANGE) { |
| if let Ok(range) = Range::from_http_range_header(header) { |
| range |
| } else { |
| return status_response(StatusCode::RANGE_NOT_SATISFIABLE); |
| } |
| } else { |
| Range::Full |
| }; |
| |
| let resource = match resource_path { |
| "auto" => { |
| if repo.supports_watch() { |
| return handle_auto(executor, server_stopped, repo, sse_response_creators).await; |
| } else { |
| // The repo doesn't support watching. |
| return status_response(StatusCode::NOT_FOUND); |
| } |
| } |
| _ => { |
| let res = if let Some(resource_path) = resource_path.strip_prefix("blobs/") { |
| repo.fetch_blob_range(resource_path, range.clone()).await |
| } else { |
| repo.fetch_metadata_range(resource_path, range.clone()).await |
| }; |
| |
| match res { |
| Ok(file) => file, |
| Err(Error::NotFound) => { |
| warn!("could not find resource: {}", resource_path); |
| return status_response(StatusCode::NOT_FOUND); |
| } |
| Err(Error::InvalidPath(path)) => { |
| warn!("invalid path: {}", path); |
| return status_response(StatusCode::BAD_REQUEST); |
| } |
| Err(Error::RangeNotSatisfiable) => { |
| warn!("invalid range: {:?}", range); |
| return status_response(StatusCode::RANGE_NOT_SATISFIABLE); |
| } |
| Err(err) => { |
| error!("error fetching file {}: {:?}", resource_path, err); |
| return status_response(StatusCode::INTERNAL_SERVER_ERROR); |
| } |
| } |
| } |
| }; |
| |
| // Send the response back to the caller. |
| let mut builder = Response::builder() |
| .header("Accept-Ranges", "bytes") |
| .header("Content-Length", resource.content_len()); |
| |
| // If we requested a subset of the file, respond with the partial content headers. |
| builder = if let Some(content_range) = resource.content_range.to_http_content_range_header() { |
| builder.header("Content-Range", content_range).status(StatusCode::PARTIAL_CONTENT) |
| } else { |
| builder.status(StatusCode::OK) |
| }; |
| |
| builder.body(Body::wrap_stream(resource.stream)).unwrap() |
| } |
| |
| async fn handle_auto( |
| executor: TaskExecutor<()>, |
| mut server_stopped: Shared<futures::channel::oneshot::Receiver<()>>, |
| repo: Arc<Repository>, |
| sse_response_creators: Arc<SseResponseCreatorMap>, |
| ) -> Response<Body> { |
| let id = repo.id(); |
| let response_creator = sse_response_creators.read().get(&id).map(Arc::clone); |
| |
| // Exit early if we've already created an auto-handler. |
| if let Some(response_creator) = response_creator { |
| return response_creator.create().await; |
| } |
| |
| // Otherwise, create a timestamp watch stream. We'll do it racily to avoid holding the lock and |
| // blocking the executor. |
| let watcher = match repo.watch() { |
| Ok(watcher) => watcher, |
| Err(err) => { |
| warn!("error creating file watcher: {}", err); |
| return status_response(StatusCode::INTERNAL_SERVER_ERROR); |
| } |
| }; |
| |
| // Next, create a response creator. It's possible we raced another call, which could have |
| // already created a creator for us. This is denoted by `sender` being `None`. |
| let (response_creator, sender) = match sse_response_creators.write().entry(id) { |
| Entry::Occupied(entry) => (Arc::clone(entry.get()), None), |
| Entry::Vacant(entry) => { |
| // Next, create a response creator. |
| let (response_creator, sender) = |
| SseResponseCreator::with_additional_buffer_size(AUTO_BUFFER_SIZE); |
| |
| let response_creator = Arc::new(response_creator); |
| entry.insert(Arc::clone(&response_creator)); |
| |
| (response_creator, Some(sender)) |
| } |
| }; |
| |
| // Spawn the watcher if one doesn't exist already. This will run in the background, and register |
| // a drop callback that will shut down the watcher when the repository is closed. |
| if let Some(sender) = sender { |
| // Make sure the entry is cleaned up if the repository is deleted. |
| let sse_response_creators = Arc::downgrade(&sse_response_creators); |
| |
| // Grab a handle to the repo dropped signal, so we can shut down our watcher. |
| let mut repo_dropped = repo.on_dropped_signal(); |
| |
| // Downgrade our repository handle, so we won't block it being deleted. |
| let repo = Arc::downgrade(&repo); |
| |
| executor.spawn(async move { |
| let watcher_fut = timestamp_watcher(repo, sender, watcher).fuse(); |
| futures::pin_mut!(watcher_fut); |
| |
| // Run the task until the watcher exits, or we were asked to cancel. |
| futures::select! { |
| () = watcher_fut => {}, |
| _ = server_stopped => {}, |
| _ = repo_dropped => (), |
| }; |
| |
| // Clean up our SSE creators. |
| if let Some(sse_response_creators) = sse_response_creators.upgrade() { |
| sse_response_creators.write().remove(&id); |
| } |
| }); |
| }; |
| |
| // Finally, create the response for the client. |
| response_creator.create().await |
| } |
| |
| #[derive(Serialize, Deserialize)] |
| struct SignedTimestamp { |
| signed: TimestampFile, |
| } |
| #[derive(Serialize, Deserialize)] |
| struct TimestampFile { |
| version: u32, |
| } |
| |
| async fn timestamp_watcher<S>(repo: Weak<Repository>, sender: EventSender, mut watcher: S) |
| where |
| S: Stream<Item = ()> + Unpin, |
| { |
| let mut old_version = None; |
| |
| loop { |
| // Temporarily upgrade the repository while we look up the timestamp.json's version. |
| let version = match repo.upgrade() { |
| Some(repo) => read_timestamp_version(repo).await, |
| None => { |
| // Exit our watcher if the repository has been deleted. |
| return; |
| } |
| }; |
| |
| if let Some(version) = version { |
| if old_version != Some(version) { |
| old_version = Some(version); |
| |
| sender |
| .send( |
| &Event::from_type_and_data("timestamp.json", version.to_string()) |
| .expect("Could not assemble timestamp event"), |
| ) |
| .await; |
| } |
| } |
| |
| // Exit the loop if the notify watcher has shut down. |
| if watcher.next().await.is_none() { |
| break; |
| } |
| } |
| } |
| |
| // Try to read the timestamp.json's version from the repository, or return `None` if we experience |
| // any errors. |
| async fn read_timestamp_version(repo: Arc<Repository>) -> Option<u32> { |
| for _ in 0..MAX_PARSE_RETRIES { |
| // Read the timestamp file. |
| // |
| // FIXME: We should be using the TUF client to get the latest |
| // timestamp in order to make sure the metadata is valid. |
| match repo.fetch_metadata("timestamp.json").await { |
| Ok(mut file) => { |
| let mut bytes = vec![]; |
| match file.read_to_end(&mut bytes).await { |
| Ok(()) => match serde_json::from_slice::<SignedTimestamp>(&bytes) { |
| Ok(timestamp_file) => { |
| return Some(timestamp_file.signed.version); |
| } |
| Err(err) => { |
| warn!("failed to parse timestamp.json: {:#?}", err); |
| } |
| }, |
| Err(err) => { |
| warn!("failed to read timestamp.json: {:#}", err); |
| } |
| } |
| } |
| Err(err) => { |
| warn!("failed to read timestamp.json: {:#?}", err); |
| } |
| }; |
| |
| // We might see the file change when it's half-written, so we need to retry |
| // the parse if it fails. |
| fasync::Timer::new(PARSE_RETRY_DELAY).await; |
| } |
| |
| // Failed to parse out the timestamp file. |
| error!("failed to read timestamp.json after {} attempts", MAX_PARSE_RETRIES); |
| |
| None |
| } |
| |
| fn status_response(status_code: StatusCode) -> Response<Body> { |
| Response::builder().status(status_code).body(Body::empty()).unwrap() |
| } |
| |
| /// Adapt [async_net::TcpStream] to work with hyper. |
| #[derive(Debug)] |
| pub enum ConnectionStream { |
| Tcp(TcpStream), |
| Socket(fasync::Socket), |
| } |
| |
| impl tokio::io::AsyncRead for ConnectionStream { |
| fn poll_read( |
| mut self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &mut tokio::io::ReadBuf<'_>, |
| ) -> Poll<io::Result<()>> { |
| match &mut *self { |
| ConnectionStream::Tcp(t) => Pin::new(t).poll_read(cx, buf.initialize_unfilled()), |
| ConnectionStream::Socket(t) => Pin::new(t).poll_read(cx, buf.initialize_unfilled()), |
| } |
| .map_ok(|sz| { |
| buf.advance(sz); |
| () |
| }) |
| } |
| } |
| |
| impl tokio::io::AsyncWrite for ConnectionStream { |
| fn poll_write( |
| mut self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<io::Result<usize>> { |
| match &mut *self { |
| ConnectionStream::Tcp(t) => Pin::new(t).poll_write(cx, buf), |
| ConnectionStream::Socket(t) => Pin::new(t).poll_write(cx, buf), |
| } |
| } |
| |
| fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| match &mut *self { |
| ConnectionStream::Tcp(t) => Pin::new(t).poll_flush(cx), |
| ConnectionStream::Socket(t) => Pin::new(t).poll_flush(cx), |
| } |
| } |
| |
| fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| match &mut *self { |
| ConnectionStream::Tcp(t) => Pin::new(t).poll_close(cx), |
| ConnectionStream::Socket(t) => Pin::new(t).poll_close(cx), |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| crate::{manager::RepositoryManager, test_utils::make_writable_empty_repository}, |
| anyhow::Result, |
| assert_matches::assert_matches, |
| bytes::Bytes, |
| camino::Utf8Path, |
| fuchsia_async as fasync, |
| http_sse::Client as SseClient, |
| std::convert::TryInto, |
| std::{fs::remove_file, io::Write as _, net::Ipv4Addr}, |
| timeout::timeout, |
| }; |
| |
| async fn get(url: impl AsRef<str>) -> Result<Response<Body>> { |
| let req = Request::get(url.as_ref()).body(Body::empty())?; |
| let client = fuchsia_hyper::new_client(); |
| let response = client.request(req).await?; |
| Ok(response) |
| } |
| |
| async fn get_bytes(url: impl AsRef<str> + std::fmt::Debug) -> Result<Bytes> { |
| let response = get(url).await?; |
| assert_eq!(response.status(), StatusCode::OK); |
| assert_eq!(response.headers()["Accept-Ranges"], "bytes"); |
| Ok(hyper::body::to_bytes(response).await?) |
| } |
| |
| async fn get_range( |
| url: impl AsRef<str>, |
| start: Option<u64>, |
| end: Option<u64>, |
| ) -> Result<Response<Body>> { |
| let start_str = match start { |
| Some(start) => start.to_string(), |
| None => "".to_owned(), |
| }; |
| let end_str = match end { |
| Some(end) => end.to_string(), |
| None => "".to_owned(), |
| }; |
| let req = Request::get(url.as_ref()) |
| .header("Range", format!("bytes={}-{}", start_str, end_str)) |
| .body(Body::empty())?; |
| let client = fuchsia_hyper::new_client(); |
| let response = client.request(req).await?; |
| Ok(response) |
| } |
| |
| async fn get_bytes_range( |
| url: impl AsRef<str> + std::fmt::Debug, |
| start: Option<u64>, |
| end: Option<u64>, |
| total_len: u64, |
| ) -> Result<Bytes> { |
| let response = get_range(url, start, end).await?; |
| assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT); |
| |
| // http ranges are inclusive, so we need to add one to `end` to compute the content length. |
| let content_len = end.map(|end| end + 1).unwrap_or(total_len) - start.unwrap_or(0); |
| assert_eq!(response.headers()["Content-Length"], content_len.to_string()); |
| |
| let start_str = start.map(|i| i.to_string()).unwrap_or_else(String::new); |
| let end_str = end.map(|i| i.to_string()).unwrap_or_else(String::new); |
| assert_eq!( |
| response.headers()["Content-Range"], |
| format!("bytes {}-{}/{}", start_str, end_str, total_len) |
| ); |
| |
| Ok(hyper::body::to_bytes(response).await?) |
| } |
| |
| fn write_file(path: &Utf8Path, body: &[u8]) { |
| let mut tmp = tempfile::NamedTempFile::new().unwrap(); |
| tmp.write_all(body).unwrap(); |
| tmp.persist(path).unwrap(); |
| } |
| |
| async fn run_test<F, R>(manager: Arc<RepositoryManager>, test: F) |
| where |
| F: Fn(String) -> R, |
| R: Future<Output = ()>, |
| { |
| let addr = (Ipv4Addr::LOCALHOST, 0).into(); |
| let (server_fut, _, server) = |
| RepositoryServer::builder(addr, Arc::clone(&manager)).start().await.unwrap(); |
| |
| // Run the server in the background. |
| let task = fasync::Task::local(server_fut); |
| |
| test(server.local_url()).await; |
| |
| // Signal the server to shutdown. |
| server.stop(); |
| |
| // Wait for the server to actually shut down. |
| task.await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_start_stop() { |
| let manager = RepositoryManager::new(); |
| let addr = (Ipv4Addr::LOCALHOST, 0).into(); |
| let (server_fut, _, server) = |
| RepositoryServer::builder(addr, Arc::clone(&manager)).start().await.unwrap(); |
| |
| // Run the server in the background. |
| let task = fasync::Task::local(server_fut); |
| |
| // Signal the server to shutdown. |
| server.stop(); |
| |
| // Wait for the server to actually shut down. |
| task.await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_empty_server_404s() { |
| let manager = RepositoryManager::new(); |
| |
| run_test(manager, |server_url| async move { |
| let result = get(server_url).await.unwrap(); |
| assert_eq!(result.status(), StatusCode::NOT_FOUND); |
| }) |
| .await |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_files_from_repositories() { |
| let manager = RepositoryManager::new(); |
| |
| let tmp = tempfile::tempdir().unwrap(); |
| let dir = Utf8Path::from_path(tmp.path()).unwrap(); |
| |
| let test_cases = [("devhost-0", ["0-0", "0-1"]), ("devhost-1", ["1-0", "1-1"])]; |
| |
| for (devhost, bodies) in &test_cases { |
| let dir = dir.join(devhost); |
| let repo = make_writable_empty_repository(*devhost, dir.clone()).await.unwrap(); |
| |
| for body in &bodies[..] { |
| write_file(&dir.join("repository").join(body), body.as_bytes()); |
| } |
| |
| manager.add(Arc::new(repo)); |
| } |
| |
| run_test(manager, |server_url| async move { |
| for (devhost, bodies) in &test_cases { |
| for body in &bodies[..] { |
| let url = format!("{}/{}/{}", server_url, devhost, body); |
| assert_matches!(get_bytes(&url).await, Ok(bytes) if bytes == &body[..]); |
| } |
| } |
| }) |
| .await |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_files_with_close_range_from_repositories() { |
| let manager = RepositoryManager::new(); |
| |
| let tmp = tempfile::tempdir().unwrap(); |
| let dir = Utf8Path::from_path(tmp.path()).unwrap(); |
| |
| let test_cases = [("devhost-0", ["0-0", "0-1"]), ("devhost-1", ["1-0", "1-1"])]; |
| |
| for (devhost, bodies) in &test_cases { |
| let dir = dir.join(devhost); |
| let repo = make_writable_empty_repository(*devhost, dir.clone()).await.unwrap(); |
| |
| for body in &bodies[..] { |
| write_file(&dir.join("repository").join(body), body.as_bytes()); |
| } |
| |
| manager.add(Arc::new(repo)); |
| } |
| |
| run_test(manager, |server_url| async move { |
| for (devhost, bodies) in &test_cases { |
| for body in &bodies[..] { |
| let url = format!("{}/{}/{}", server_url, devhost, body); |
| |
| assert_eq!( |
| &body[1..=2], |
| get_bytes_range( |
| &url, |
| Some(1), |
| Some(2), |
| body.chars().count().try_into().unwrap(), |
| ) |
| .await |
| .unwrap() |
| ); |
| } |
| } |
| }) |
| .await |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_get_files_with_get_416_when_range_too_big() { |
| let manager = RepositoryManager::new(); |
| |
| let tmp = tempfile::tempdir().unwrap(); |
| let dir = Utf8Path::from_path(tmp.path()).unwrap(); |
| |
| let test_cases = [("devhost-0", ["0-0", "0-1"]), ("devhost-1", ["1-0", "1-1"])]; |
| |
| for (devhost, bodies) in &test_cases { |
| let dir = dir.join(devhost); |
| let repo = make_writable_empty_repository(*devhost, dir.clone()).await.unwrap(); |
| |
| for body in &bodies[..] { |
| write_file(&dir.join("repository").join(body), body.as_bytes()); |
| } |
| |
| manager.add(Arc::new(repo)); |
| } |
| |
| run_test(manager, |server_url| async move { |
| for (devhost, bodies) in &test_cases { |
| for body in &bodies[..] { |
| let url = format!("{}/{}/{}", server_url, devhost, body); |
| let response = get_range(&url, Some(1), Some(5)).await.unwrap(); |
| assert_eq!(response.status(), StatusCode::RANGE_NOT_SATISFIABLE); |
| } |
| } |
| }) |
| .await |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_auto_inner() { |
| let manager = RepositoryManager::new(); |
| |
| let tmp = tempfile::tempdir().unwrap(); |
| let dir = Utf8Path::from_path(tmp.path()).unwrap().join("devhost"); |
| |
| let repo = make_writable_empty_repository("devhost", dir.clone()).await.unwrap(); |
| |
| let timestamp_file = dir.join("repository").join("timestamp.json"); |
| write_file( |
| ×tamp_file, |
| serde_json::to_string(&SignedTimestamp { signed: TimestampFile { version: 1 } }) |
| .unwrap() |
| .as_bytes(), |
| ); |
| |
| manager.add(Arc::new(repo)); |
| |
| run_test(manager, |server_url| { |
| let timestamp_file = timestamp_file.clone(); |
| async move { |
| let url = format!("{}/devhost/auto", server_url); |
| let mut client = |
| SseClient::connect(fuchsia_hyper::new_https_client(), url).await.unwrap(); |
| |
| assert_eq!( |
| timeout(std::time::Duration::from_secs(3), client.next()) |
| .await |
| .unwrap() |
| .unwrap() |
| .unwrap() |
| .data(), |
| "1" |
| ); |
| write_file( |
| ×tamp_file, |
| serde_json::to_string(&SignedTimestamp { |
| signed: TimestampFile { version: 2 }, |
| }) |
| .unwrap() |
| .as_bytes(), |
| ); |
| assert_eq!(client.next().await.unwrap().unwrap().data(), "2"); |
| |
| write_file( |
| ×tamp_file, |
| serde_json::to_string(&SignedTimestamp { |
| signed: TimestampFile { version: 3 }, |
| }) |
| .unwrap() |
| .as_bytes(), |
| ); |
| assert_eq!(client.next().await.unwrap().unwrap().data(), "3"); |
| |
| remove_file(×tamp_file).unwrap(); |
| write_file( |
| ×tamp_file, |
| serde_json::to_string(&SignedTimestamp { |
| signed: TimestampFile { version: 4 }, |
| }) |
| .unwrap() |
| .as_bytes(), |
| ); |
| assert_eq!(client.next().await.unwrap().unwrap().data(), "4"); |
| } |
| }) |
| .await; |
| |
| // FIXME(https://github.com/notify-rs/notify/pull/337): On OSX, notify uses a |
| // crossbeam-channel in `Drop` to shut down the interior thread. Unfortunately this can |
| // trip over an issue where OSX will tear down the thread local storage before shutting |
| // down the thread, which can trigger a panic. To avoid this issue, sleep a little bit |
| // after shutting down our stream. |
| fasync::Timer::new(Duration::from_millis(100)).await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn test_shutdown_timeout() { |
| let manager = RepositoryManager::new(); |
| |
| let tmp = tempfile::tempdir().unwrap(); |
| let dir = Utf8Path::from_path(tmp.path()).unwrap().join("devhost"); |
| |
| let repo = make_writable_empty_repository("devhost", dir.clone()).await.unwrap(); |
| |
| let timestamp_file = dir.join("repository").join("timestamp.json"); |
| write_file( |
| ×tamp_file, |
| serde_json::to_string(&SignedTimestamp { signed: TimestampFile { version: 1 } }) |
| .unwrap() |
| .as_bytes(), |
| ); |
| |
| manager.add(Arc::new(repo)); |
| |
| let addr = (Ipv4Addr::LOCALHOST, 0).into(); |
| let (server_fut, _, server) = |
| RepositoryServer::builder(addr, Arc::clone(&manager)).start().await.unwrap(); |
| |
| // Run the server in the background. |
| let server_task = fasync::Task::local(server_fut); |
| |
| // Connect to the sse endpoint. |
| let url = format!("{}/devhost/auto", server.local_url()); |
| |
| // wait for an SSE event in the background. |
| let (tx_sse_connected, rx_sse_connected) = futures::channel::oneshot::channel(); |
| let sse_task = fasync::Task::local(async move { |
| let mut sse = SseClient::connect(fuchsia_hyper::new_https_client(), url).await.unwrap(); |
| |
| // We should receive one event for the current timestamp. |
| sse.next().await.unwrap().unwrap(); |
| |
| tx_sse_connected.send(()).unwrap(); |
| |
| // We should block until we receive an error because the server went away. |
| match sse.next().await { |
| Some(Ok(event)) => { |
| panic!("unexpected event {:?}", event); |
| } |
| Some(Err(_)) => {} |
| None => { |
| panic!("unexpected channel close"); |
| } |
| } |
| }); |
| |
| // wait for the sse client to connect to the server. |
| rx_sse_connected.await.unwrap(); |
| |
| // Signal the server to shutdown. |
| server.stop(); |
| |
| // The server should shutdown after the timeout period. |
| server_task.await; |
| |
| futures::select! { |
| () = sse_task.fuse() => {}, |
| |
| () = fuchsia_async::Timer::new(Duration::from_secs(10)).fuse() => { |
| panic!("sse task failed to shut down"); |
| }, |
| } |
| } |
| } |