blob: c1be6156393ce220a138402566ed928607bf7865 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{anyhow, Error};
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_testing_proxy::{
TcpProxyControlRequest, TcpProxyControlRequestStream, TcpProxy_Marker, TcpProxy_RequestStream,
};
use fuchsia_async as fasync;
use fuchsia_component::server::ServiceFs;
use futures::{
channel::mpsc,
future::{select, FutureExt, TryFutureExt},
io::AsyncReadExt,
lock::Mutex,
stream::{StreamExt, TryStreamExt},
};
use log::{error, info, warn};
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
sync::Arc,
};
#[fasync::run_singlethreaded]
async fn main() {
fuchsia_syslog::init_with_tags(&["tcp-proxy"]).expect("Can't init logger");
let mut fs = ServiceFs::new();
let proxy_control = Arc::new(TcpProxyControl::new());
fs.dir("svc").add_fidl_service(move |stream| {
let proxy_control_clone = proxy_control.clone();
fasync::Task::spawn(async move {
proxy_control_clone
.serve_requests_from_stream(stream)
.unwrap_or_else(|e| error!("Error handling TcpProxyControl channel: {:?}", e))
.await;
})
.detach();
});
fs.take_and_serve_directory_handle().unwrap();
fs.collect::<()>().await;
}
/// An implementation of `fuchsia.testing.proxy.TcpProxyControl` that opens
/// proxy ports accessible from a remote device.
struct TcpProxyControl {
/// A mapping from target port to handles of TCP proxies.
proxy_handles: Mutex<HashMap<u16, TcpProxyHandle>>,
}
impl TcpProxyControl {
pub fn new() -> Self {
TcpProxyControl { proxy_handles: Mutex::new(HashMap::new()) }
}
/// Serve `TcpProxyControlRequest`s recieved on the provided stream.
pub async fn serve_requests_from_stream(
&self,
mut stream: TcpProxyControlRequestStream,
) -> Result<(), Error> {
while let Some(req) = stream.try_next().await? {
let TcpProxyControlRequest::OpenProxy_ {
target_port,
proxy_port,
tcp_proxy,
responder,
} = req;
let open_port =
self.open_proxy(target_port, proxy_port, tcp_proxy).await.map_err(|e| {
warn!("Error opening proxy: {:?}", e);
e
})?;
responder.send(open_port)?;
}
Ok(())
}
async fn open_proxy(
&self,
target_port: u16,
proxy_port: u16,
tcp_proxy_token: ServerEnd<TcpProxy_Marker>,
) -> Result<u16, Error> {
let mut tcp_proxy_stream = tcp_proxy_token.into_stream()?;
let mut proxy_handles_lock = self.proxy_handles.lock().await;
if let Some(proxy_handle) = proxy_handles_lock.get(&target_port) {
match proxy_handle.register_client(tcp_proxy_stream) {
// proxy exists and will extend it's lifetime until the client drops
Ok(open_port) => return Ok(open_port),
// error indicates the proxy has stopped and should be recreated.
Err(RegisterError::ProxyClosed(stream)) => tcp_proxy_stream = stream,
}
}
let (tcp_proxy, tcp_proxy_handle) = TcpProxy::new(target_port, proxy_port)?;
let open_port = tcp_proxy_handle
.register_client(tcp_proxy_stream)
.map_err(|_| anyhow!("Error registering channel with new proxy"))?;
fasync::Task::spawn(async move {
info!("Forwarding port {:?} to {:?}", open_port, target_port);
tcp_proxy.serve_proxy_while_open_clients().await;
info!("Stopped forwarding to port {:?}", target_port);
})
.detach();
proxy_handles_lock.insert(target_port, tcp_proxy_handle);
Ok(open_port)
}
}
/// A proxy that forwards TCP requests made to an externally accessible port to a target port which
/// is only internally available.
struct TcpProxy {
/// Listener that accepts connections on the open port.
tcp_listener: fasync::net::TcpListener,
/// Target port on the local device to forward requests to.
target_port: u16,
/// Channel through which the `TcpProxy` is notified of new clients.
stream_receiver: mpsc::UnboundedReceiver<TcpProxy_RequestStream>,
}
struct TcpProxyHandle {
/// The port on which the `TcpProxy` is listening for connection requests.
open_port: u16,
/// Channel through which the handle notifies `TcpProxy` of new clients.
stream_sender: mpsc::UnboundedSender<TcpProxy_RequestStream>,
}
enum RegisterError {
ProxyClosed(TcpProxy_RequestStream),
}
impl TcpProxyHandle {
/// Register a client with the corresponding `TcpProxy`, extending it's lifetime until the
/// given token is closed. Returns the port the proxy listens on, or an error if the `TcpProxy`
/// has already terminated.
pub fn register_client(&self, token: TcpProxy_RequestStream) -> Result<u16, RegisterError> {
match self.stream_sender.unbounded_send(token) {
Ok(()) => Ok(self.open_port),
Err(e) => Err(RegisterError::ProxyClosed(e.into_inner())),
}
}
}
impl TcpProxy {
/// Creates a new `TcpProxy` and corresponding `TcpProxyHandle` with which to register clients.
pub fn new(target_port: u16, proxy_port: u16) -> Result<(Self, TcpProxyHandle), Error> {
let open_addr: SocketAddrV6 = format!("[::]:{}", proxy_port).parse()?;
let tcp_listener = fasync::net::TcpListener::bind(&open_addr.into())?;
let open_port = tcp_listener.local_addr()?.port();
let (sender, receiver) = mpsc::unbounded();
let proxy = TcpProxy { tcp_listener, target_port, stream_receiver: receiver };
let proxy_handle = TcpProxyHandle { open_port, stream_sender: sender };
Ok((proxy, proxy_handle))
}
/// Proxies requests received on the open port to the target port, until all the clients
/// registered via the corresponding `TcpProxyHandle` are closed or the proxy crashes.
async fn serve_proxy_while_open_clients(self) {
let TcpProxy { tcp_listener, target_port, mut stream_receiver } = self;
let clients_complete_fut = async move {
while let Some(Some(request_stream)) = stream_receiver.next().now_or_never() {
request_stream.collect::<Vec<_>>().await;
}
};
select(clients_complete_fut.boxed(), Self::serve_proxy(tcp_listener, target_port).boxed())
.await;
}
/// Proxies any requests recieved on |tcp_listener| to localhost:|target_port|.
async fn serve_proxy(tcp_listener: fasync::net::TcpListener, target_port: u16) {
tcp_listener
.accept_stream()
.try_for_each_concurrent(None, |(client_conn, _addr)| async move {
let v6_addr = SocketAddrV6::new(Ipv6Addr::LOCALHOST, target_port, 0, 0);
let v4_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, target_port);
let server_conn = match fasync::net::TcpStream::connect(v6_addr.into())?.await {
Ok(v6_conn) => v6_conn,
Err(_) => fasync::net::TcpStream::connect(v4_addr.into())?.await?,
};
Self::serve_single_connection(client_conn, server_conn)
.await
.unwrap_or_else(|e| warn!("Error serving tunnel: {:?}", e));
Ok(())
})
.await
.unwrap_or_else(|e| warn!("Error listening for tcp connections: {:?}", e));
}
/// Proxy a single connection between the provided `TcpStream`s.
async fn serve_single_connection(
client_conn: fasync::net::TcpStream,
server_conn: fasync::net::TcpStream,
) -> Result<(), Error> {
let (client_read, mut client_write) = client_conn.split();
let (server_read, mut server_write) = server_conn.split();
let client_to_server_fut = futures::io::copy(client_read, &mut server_write);
let server_to_client_fut = futures::io::copy(server_read, &mut client_write);
// If one side closes, the connection is broken so we can stop both sides.
select(client_to_server_fut.boxed(), server_to_client_fut.boxed()).await.factor_first().0?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use fidl::endpoints::{create_proxy, create_proxy_and_stream};
use fidl_fuchsia_testing_proxy::{TcpProxyControlMarker, TcpProxyControlProxy};
use fuchsia_async::DurationExt;
use fuchsia_zircon as zx;
use futures::future::TryFutureExt;
use hyper::server::{accept::from_stream, Server};
use hyper::{Body, Response, Uri};
use std::{convert::Infallible, net::SocketAddr};
fn launch_data_proxy_control() -> TcpProxyControlProxy {
let control = TcpProxyControl::new();
let (proxy, stream) = create_proxy_and_stream::<TcpProxyControlMarker>().unwrap();
fasync::Task::spawn(async move {
control.serve_requests_from_stream(stream).await.unwrap();
})
.detach();
proxy
}
const TEST_RESPONSE: &str = "success";
/// Assumed time for which serving a proxy completes after the last client drops.
const STOP_DURATION: zx::Duration = zx::Duration::from_millis(100);
/// Launches an http server that always responds with success. Returns the port the
/// server is listening on.
fn launch_test_server(addr: SocketAddr) -> u16 {
let tcp_listener = fasync::net::TcpListener::bind(&addr).unwrap();
let port = tcp_listener.local_addr().unwrap().port();
let connections = tcp_listener
.accept_stream()
.map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });
let make_svc = hyper::service::make_service_fn(move |_socket| async {
Ok::<_, Infallible>(hyper::service::service_fn(move |_req| async {
info!("HTTP server got request!");
Ok::<_, Infallible>(Response::new(Body::from(TEST_RESPONSE)))
}))
});
let server = Server::builder(from_stream(connections))
.executor(fuchsia_hyper::Executor)
.serve(make_svc)
.unwrap_or_else(|e| panic!("HTTP server failed! {:?}", e));
fasync::Task::spawn(server).detach();
port
}
fn launch_test_server_v6() -> u16 {
let addr: SocketAddrV6 = "[::1]:0".parse().unwrap();
launch_test_server(addr.into())
}
fn launch_test_server_v4() -> u16 {
let addr: SocketAddrV4 = "127.0.0.1:0".parse().unwrap();
launch_test_server(addr.into())
}
/// Asserts that an HTTP request to [::1]:port succeeds.
async fn assert_request(port: u16) {
let http_client = fuchsia_hyper::new_client();
let request_uri = Uri::builder()
.scheme("http")
.authority(format!("[::1]:{:?}", port).as_str())
.path_and_query("/")
.build()
.unwrap();
let response = http_client.get(request_uri).await.unwrap();
assert_eq!(response.status(), hyper::StatusCode::OK);
let resp_body = response
.into_body()
.try_fold(Vec::new(), |mut vec, b| async move {
vec.extend(b);
Ok(vec)
})
.await
.unwrap();
assert_eq!(String::from_utf8(resp_body).unwrap().as_str(), TEST_RESPONSE);
}
/// Asserts that an HTTP request to [::1]:port fails.
async fn assert_unreachable(port: u16) {
let http_client = fuchsia_hyper::new_client();
let request_uri = Uri::builder()
.scheme("http")
.authority(format!("[::1]:{:?}", port).as_str())
.path_and_query("/")
.build()
.unwrap();
assert!(http_client.get(request_uri).await.is_err());
}
#[fasync::run_singlethreaded(test)]
async fn test_tcp_proxy_lifetime() {
let test_port = launch_test_server_v6();
assert_request(test_port).await;
let (tcp_proxy, tcp_proxy_handle) = TcpProxy::new(test_port, 0).unwrap();
let (tcp_proxy_token, tcp_proxy_server_end) =
create_proxy_and_stream::<TcpProxy_Marker>().unwrap();
let proxy_port = tcp_proxy_handle
.register_client(tcp_proxy_server_end)
.map_err(|_| anyhow!("Error on register client"))
.unwrap();
let tcp_proxy_fut = tcp_proxy.serve_proxy_while_open_clients().shared();
fasync::Task::spawn(tcp_proxy_fut.clone()).detach();
// test server reachable while proxy is served
assert_request(proxy_port).await;
// write again
assert_request(proxy_port).await;
// after dropping the TcpProxy, serving proxy should complete
drop(tcp_proxy_token);
tcp_proxy_fut.await;
assert_unreachable(proxy_port).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_tcp_proxy_ipv4() {
let test_port = launch_test_server_v4();
let (tcp_proxy, tcp_proxy_handle) = TcpProxy::new(test_port, 0).unwrap();
let (tcp_proxy_token, tcp_proxy_server_end) =
create_proxy_and_stream::<TcpProxy_Marker>().unwrap();
let proxy_port = tcp_proxy_handle
.register_client(tcp_proxy_server_end)
.map_err(|_| anyhow!("Error on register client"))
.unwrap();
let tcp_proxy_fut = tcp_proxy.serve_proxy_while_open_clients().shared();
fasync::Task::spawn(tcp_proxy_fut.clone()).detach();
// test server reachable while proxy is served
assert_request(proxy_port).await;
// after dropping the TcpProxy, serving proxy should complete
drop(tcp_proxy_token);
tcp_proxy_fut.await;
assert_unreachable(proxy_port).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_tcp_proxy_multiple_clients_lifetime() {
let test_port = launch_test_server_v6();
assert_request(test_port).await;
let (tcp_proxy, tcp_proxy_handle) = TcpProxy::new(test_port, 0).unwrap();
let (tcp_proxy_token, tcp_proxy_server_end) =
create_proxy_and_stream::<TcpProxy_Marker>().unwrap();
let proxy_port = tcp_proxy_handle
.register_client(tcp_proxy_server_end)
.map_err(|_| anyhow!("Error on register client"))
.unwrap();
let tcp_proxy_fut = tcp_proxy.serve_proxy_while_open_clients().shared();
fasync::Task::spawn(tcp_proxy_fut.clone()).detach();
// create second client
let (tcp_proxy_token_2, tcp_proxy_server_end_2) =
create_proxy_and_stream::<TcpProxy_Marker>().unwrap();
tcp_proxy_handle
.register_client(tcp_proxy_server_end_2)
.map_err(|_| anyhow!("Error on register client"))
.unwrap();
assert_request(proxy_port).await;
// after dropping first handle, but not second, proxy is still accessible
drop(tcp_proxy_token);
assert_request(proxy_port).await;
// proxy completes after second handle dropped
drop(tcp_proxy_token_2);
tcp_proxy_fut.await;
assert_unreachable(proxy_port).await;
}
#[fasync::run_until_stalled(test)]
async fn test_proxy_control_reuses_proxy() {
let control_proxy = launch_data_proxy_control();
let target_port = 80;
let (_proxy_client, proxy_client_server_end) = create_proxy::<TcpProxy_Marker>().unwrap();
let open_port =
control_proxy.open_proxy_(target_port, 0, proxy_client_server_end).await.unwrap();
let (_proxy_client_2, proxy_client_server_end_2) =
create_proxy::<TcpProxy_Marker>().unwrap();
let open_port_2 =
control_proxy.open_proxy_(target_port, 0, proxy_client_server_end_2).await.unwrap();
assert_eq!(open_port, open_port_2);
}
#[fasync::run_singlethreaded(test)]
async fn test_proxy_control_recreate_proxy() {
let test_port = launch_test_server_v6();
assert_request(test_port).await;
let control_proxy = launch_data_proxy_control();
// echo server reachable through proxy
let (tcp_proxy, tcp_proxy_server_end) = create_proxy().unwrap();
let proxy_port =
control_proxy.open_proxy_(test_port, 0, tcp_proxy_server_end).await.unwrap();
assert_request(proxy_port).await;
// after dropping the TcpProxy, serving should stop
std::mem::drop(tcp_proxy);
fasync::Timer::new(STOP_DURATION.after_now()).await;
assert_unreachable(proxy_port).await;
// reachable after recreating proxy
let (_tcp_proxy, tcp_proxy_server_end) = create_proxy().unwrap();
let proxy_port =
control_proxy.open_proxy_(test_port, 0, tcp_proxy_server_end).await.unwrap();
assert_request(proxy_port).await;
}
}