blob: fb9cdad81960433172e52f2004e94986e76188de [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![cfg(test)]
use {
super::*,
assert_matches::assert_matches,
fuchsia_async::{self as fasync, net::TcpListener},
fuchsia_hyper::new_https_client,
futures::{
future::{join, TryFutureExt},
stream::{StreamExt, TryStreamExt},
},
hyper::{
server::{accept::from_stream, Server},
service::{make_service_fn, service_fn},
},
std::{
convert::Infallible,
net::{Ipv4Addr, SocketAddr},
sync::Arc,
},
};
/// Starts a mock SSE server on a detached task and returns `(url, event_sender)`.
///
/// The server listens on IPv4 localhost with port 0 in the bind address; the
/// returned `url` is `http://` followed by the listener's reported local
/// address. Every incoming request, regardless of its contents, is answered
/// with a response produced by `SseResponseCreator::create()`.
///
/// `buffer_size` is forwarded unchanged to
/// `SseResponseCreator::with_additional_buffer_size`.
///
/// # Panics
///
/// Panics if binding the listener or reading its local address fails. If the
/// server future later resolves to an error, the spawned task panics with
/// "mock sse server failed".
fn spawn_server(buffer_size: usize) -> (String, EventSender) {
    let bind_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
    let listener = TcpListener::bind(&bind_addr).unwrap();

    // Read the address back before `accept_stream()` consumes the listener.
    let url = format!("http://{}", listener.local_addr().unwrap());
    let incoming =
        listener.accept_stream().map_ok(|(stream, _peer)| fuchsia_hyper::TcpStream { stream });

    let (creator, event_sender) = SseResponseCreator::with_additional_buffer_size(buffer_size);
    let creator = Arc::new(creator);

    // One `Arc` clone per accepted connection, and another per request, so that
    // each `async move` block owns its own handle to the shared creator.
    let make_service = make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| {
        let creator = Arc::clone(&creator);
        async move {
            Ok::<_, Infallible>(service_fn(move |_req| {
                let creator = Arc::clone(&creator);
                async move { Ok::<_, Infallible>(creator.create().await) }
            }))
        }
    });

    let server = Server::builder(from_stream(incoming))
        .executor(fuchsia_hyper::Executor)
        .serve(make_service)
        .unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));
    fasync::Task::spawn(server).detach();

    (url, event_sender)
}
/// One client connects to the mock server, one event is sent, and the client's
/// next stream item must be that same event.
#[fasync::run_singlethreaded(test)]
async fn single_client_single_event() {
    let (url, event_sender) = spawn_server(1);
    let mut client = Client::connect(new_https_client(), &url).await.unwrap();
    let event = Event::from_type_and_data("event_type", "event_data").unwrap();

    // Drive the send and the receive concurrently on the single-threaded executor.
    let sending = event_sender.send(&event);
    let receiving = client.next();
    let (_, received) = join(sending, receiving).await;

    assert_matches!(received, Some(Ok(e)) if e == event);
}
/// Two clients connect to the same mock server, two events are sent in order,
/// and each client must then yield exactly those two events in the same order.
#[fasync::run_singlethreaded(test)]
async fn multiple_clients_multiple_events() {
    // NOTE(review): the buffer size of 2 presumably lets both events be queued
    // before either client starts reading; confirm against SseResponseCreator.
    let (url, event_sender) = spawn_server(2);
    let client0 = Client::connect(new_https_client(), &url).await.unwrap();
    let client1 = Client::connect(new_https_client(), &url).await.unwrap();

    let events = vec![
        Event::from_type_and_data("event_type0", "event_data0").unwrap(),
        Event::from_type_and_data("event_type1", "event_data1").unwrap(),
    ];
    for event in &events {
        event_sender.send(event).await;
    }

    // Read the first two items from each client concurrently.
    let first_two_from_client0 = client0.take(2).collect::<Vec<_>>();
    let first_two_from_client1 = client1.take(2).collect::<Vec<_>>();
    let (results0, results1) = join(first_two_from_client0, first_two_from_client1).await;

    // Any stream error fails the test through `unwrap`.
    let received0 = results0.into_iter().map(Result::unwrap).collect::<Vec<_>>();
    assert_eq!(received0, events);
    let received1 = results1.into_iter().map(Result::unwrap).collect::<Vec<_>>();
    assert_eq!(received1, events);
}