blob: 2f9d403bb39cfd94341675522b06b8d676b98e9d [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{Event, EventSource},
fuchsia_hyper::HttpsClient,
futures::{
stream::Stream,
task::{Context, Poll},
},
hyper::{Body, Request, StatusCode},
std::pin::Pin,
thiserror::Error,
};
/// An http SSE client.
#[derive(Debug)]
pub struct Client {
source: EventSource,
chunks: Body,
events: std::vec::IntoIter<Event>,
}
impl Client {
/// Connects to an http url and, on success, returns a `Stream` of SSE events.
pub async fn connect(
https_client: HttpsClient,
url: impl AsRef<str>,
) -> Result<Self, ClientConnectError> {
let request = Request::get(url.as_ref())
.header("accept", "text/event-stream")
.body(Body::empty())
.map_err(|e| ClientConnectError::CreateRequest(e))?;
let response =
https_client.request(request).await.map_err(|e| ClientConnectError::MakeRequest(e))?;
if response.status() != StatusCode::OK {
return Err(ClientConnectError::HttpStatus(response.status()));
}
Ok(Self {
source: EventSource::new(),
chunks: response.into_body(),
events: vec![].into_iter(),
})
}
}
#[derive(Debug, Error)]
pub enum ClientConnectError {
#[error("error creating http request: {}", _0)]
CreateRequest(hyper::http::Error),
#[error("error making http request: {}", _0)]
MakeRequest(hyper::Error),
#[error("http server responded with status other than OK: {}", _0)]
HttpStatus(hyper::StatusCode),
}
impl Stream for Client {
type Item = Result<Event, ClientPollError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(event) = self.events.next() {
return Poll::Ready(Some(Ok(event)));
}
match Pin::new(&mut self.chunks).poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
self.events = self.source.parse(&chunk).into_iter();
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(ClientPollError::NextChunk(e))))
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}
#[derive(Debug, Error)]
pub enum ClientPollError {
#[error("error downloading next chunk: {}", _0)]
NextChunk(hyper::Error),
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
fuchsia_async::{self as fasync, net::TcpListener},
fuchsia_hyper::new_https_client,
futures::{
future::{Future, TryFutureExt},
stream::{StreamExt, TryStreamExt},
},
hyper::{
server::{accept::from_stream, Server},
service::{make_service_fn, service_fn},
Response,
},
std::{
convert::Infallible,
net::{Ipv4Addr, SocketAddr},
},
};
fn spawn_server<F>(handle_req: fn(Request<Body>) -> F) -> String
where
F: Future<Output = Result<Response<Body>, hyper::Error>> + Send + 'static,
{
let (connections, url) = {
let listener =
TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
let local_addr = listener.local_addr().unwrap();
(
listener
.accept_stream()
.map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn }),
format!("http://{}", local_addr),
)
};
let server = Server::builder(from_stream(connections))
.executor(fuchsia_hyper::Executor)
.serve(make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| async move {
Ok::<_, Infallible>(service_fn(handle_req))
}))
.unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));
fasync::Task::spawn(server).detach();
url
}
fn make_event() -> Event {
Event::from_type_and_data("event_type", "data_contents").unwrap()
}
#[fasync::run_singlethreaded(test)]
async fn receive_one_event() {
async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/event-stream")
.body(make_event().to_vec().into())
.unwrap())
}
let url = spawn_server(handle_req);
let client = Client::connect(new_https_client(), url).await.unwrap();
let events: Result<Vec<_>, _> = client.collect::<Vec<_>>().await.into_iter().collect();
assert_eq!(events.unwrap(), vec![make_event()]);
}
#[fasync::run_singlethreaded(test)]
async fn client_sends_correct_http_headers() {
async fn handle_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
assert_eq!(req.method(), &hyper::Method::GET);
assert_eq!(
req.headers().get("accept").map(|h| h.as_bytes()),
Some(&b"text/event-stream"[..])
);
Ok(Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/event-stream")
.body(make_event().to_vec().into())
.unwrap())
}
let url = spawn_server(handle_req);
let client = Client::connect(new_https_client(), url).await.unwrap();
client.collect::<Vec<_>>().await;
}
#[fasync::run_singlethreaded(test)]
async fn error_create_request() {
assert_matches!(
Client::connect(new_https_client(), "\n").await,
Err(ClientConnectError::CreateRequest(_))
);
}
#[fasync::run_singlethreaded(test)]
async fn error_make_request() {
assert_matches!(
Client::connect(new_https_client(), "bad_url").await,
Err(ClientConnectError::MakeRequest(_))
);
}
#[fasync::run_singlethreaded(test)]
async fn error_http_status() {
async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
}
let url = spawn_server(handle_req);
assert_matches!(
Client::connect(new_https_client(), url).await,
Err(ClientConnectError::HttpStatus(_))
);
}
#[fasync::run_singlethreaded(test)]
async fn error_downloading_chunk() {
// If the body of an http response is not large enough, hyper will download the body
// along with the header in the initial fuchsia_hyper::HttpsClient.request(). This means
// that even if the body is implemented with a stream that fails before the transfer is
// complete, the failure will occur during the initial request, before awaiting on the
// body chunk stream.
const BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING: usize = 1_000_000;
async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(
"content-length",
&format!("{}", BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING),
)
.header("content-type", "text/event-stream")
.body(Body::wrap_stream(futures::stream::iter(vec![
Ok(vec![b' '; BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING - 1]),
Err("error-text".to_string()),
])))
.unwrap())
}
let url = spawn_server(handle_req);
let mut client = Client::connect(new_https_client(), url).await.unwrap();
assert_matches!(client.try_next().await, Err(ClientPollError::NextChunk(_)));
}
}