| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| connect_and_bind_device, |
| happy_eyeballs::{self, RealSocketConnector}, |
| parse_ip_addr, HyperConnectorFuture, SocketOptions, TcpOptions, TcpStream, |
| }, |
| fidl_connector::{Connect, ServiceReconnector}, |
| fidl_fuchsia_net_name::{LookupIpOptions, LookupMarker, LookupProxy, LookupResult}, |
| fidl_fuchsia_posix_socket::{ProviderMarker, ProviderProxy}, |
| fuchsia_async::net, |
| fuchsia_zircon as zx, |
| futures::{ |
| future::{Future, FutureExt}, |
| io, |
| task::{Context, Poll}, |
| }, |
| http::uri::{Scheme, Uri}, |
| hyper::service::Service, |
| rustls::ClientConfig, |
| std::{convert::TryFrom as _, net::SocketAddr, num::TryFromIntError}, |
| }; |
| |
| pub(crate) fn configure_cert_store(tls: &mut ClientConfig) { |
| tls.root_store.add_server_trust_anchors(&webpki_roots_fuchsia::TLS_SERVER_ROOTS); |
| } |
| |
| /// A Fuchsia-compatible implementation of hyper's `Connect` trait which allows |
| /// creating a TcpStream to a particular destination. |
| #[derive(Clone)] |
| pub struct HyperConnector { |
| tcp_options: TcpOptions, |
| socket_options: SocketOptions, |
| provider: RealServiceConnector, |
| } |
| |
| impl From<(TcpOptions, SocketOptions)> for HyperConnector { |
| fn from((tcp_options, socket_options): (TcpOptions, SocketOptions)) -> Self { |
| Self { tcp_options, socket_options, provider: RealServiceConnector::new() } |
| } |
| } |
| |
| impl HyperConnector { |
| pub fn new() -> Self { |
| Self::from_tcp_options(TcpOptions::default()) |
| } |
| |
| pub fn from_tcp_options(tcp_options: TcpOptions) -> Self { |
| Self { |
| tcp_options, |
| socket_options: SocketOptions::default(), |
| provider: RealServiceConnector::new(), |
| } |
| } |
| } |
| |
| impl Service<Uri> for HyperConnector { |
| type Response = TcpStream; |
| type Error = std::io::Error; |
| type Future = HyperConnectorFuture; |
| |
| fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| // This connector is always ready, but others might not be. |
| Poll::Ready(Ok(())) |
| } |
| |
| fn call(&mut self, dst: Uri) -> Self::Future { |
| let self_ = self.clone(); |
| HyperConnectorFuture { fut: Box::pin(async move { self_.call_async(dst).await }) } |
| } |
| } |
| |
| impl HyperConnector { |
| async fn call_async(&self, dst: Uri) -> Result<TcpStream, io::Error> { |
| let host = dst |
| .host() |
| .ok_or(io::Error::new(io::ErrorKind::Other, "destination host is unspecified"))?; |
| let port = match dst.port() { |
| Some(port) => port.as_u16(), |
| None => { |
| if dst.scheme() == Some(&Scheme::HTTPS) { |
| 443 |
| } else { |
| 80 |
| } |
| } |
| }; |
| |
| let stream = |
| connect_to_addr(&self.provider, host, port, self.socket_options.bind_device.as_deref()) |
| .await?; |
| let () = self.tcp_options.apply(stream.std())?; |
| |
| Ok(TcpStream { stream }) |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct Executor; |
| |
| impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor { |
| fn execute(&self, fut: F) { |
| fuchsia_async::Task::spawn(fut.map(|_| ())).detach() |
| } |
| } |
| |
| #[derive(Clone)] |
| pub struct LocalExecutor; |
| |
| impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor { |
| fn execute(&self, fut: F) { |
| fuchsia_async::Task::local(fut.map(drop)).detach() |
| } |
| } |
| |
| trait ProviderConnector { |
| fn connect(&self) -> Result<ProviderProxy, io::Error>; |
| } |
| |
| trait LookupConnector { |
| fn connect(&self) -> Result<LookupProxy, io::Error>; |
| } |
| |
| #[derive(Clone)] |
| struct RealServiceConnector { |
| socket_provider_connector: ServiceReconnector<ProviderMarker>, |
| name_lookup_connector: ServiceReconnector<LookupMarker>, |
| } |
| |
| impl RealServiceConnector { |
| fn new() -> Self { |
| RealServiceConnector { |
| socket_provider_connector: ServiceReconnector::<ProviderMarker>::new(), |
| name_lookup_connector: ServiceReconnector::<LookupMarker>::new(), |
| } |
| } |
| } |
| |
| impl ProviderConnector for RealServiceConnector { |
| fn connect(&self) -> Result<ProviderProxy, io::Error> { |
| self.socket_provider_connector.connect().map_err(|err| { |
| io::Error::new( |
| io::ErrorKind::Other, |
| format!("failed to connect to socket provider service: {}", err), |
| ) |
| }) |
| } |
| } |
| |
| impl LookupConnector for RealServiceConnector { |
| fn connect(&self) -> Result<LookupProxy, io::Error> { |
| self.name_lookup_connector.connect().map_err(|err| { |
| io::Error::new( |
| io::ErrorKind::Other, |
| format!("failed to connect to name lookup service: {}", err), |
| ) |
| }) |
| } |
| } |
| |
| async fn connect_to_addr<T: ProviderConnector + LookupConnector>( |
| provider: &T, |
| host: &str, |
| port: u16, |
| bind_device: Option<&str>, |
| ) -> Result<net::TcpStream, io::Error> { |
| if let Some(addr) = parse_ip_addr_with_provider(provider, host, port).await? { |
| return connect_and_bind_device(addr, bind_device)?.await; |
| } |
| |
| happy_eyeballs::happy_eyeballs( |
| resolve_ip_addr(provider, host, port).await?, |
| RealSocketConnector, |
| happy_eyeballs::RECOMMENDED_MIN_CONN_ATT_DELAY, |
| happy_eyeballs::RECOMMENDED_CONN_ATT_DELAY, |
| bind_device, |
| ) |
| .await |
| } |
| |
| async fn resolve_ip_addr( |
| name_lookup: &impl LookupConnector, |
| host: &str, |
| port: u16, |
| ) -> Result<impl Iterator<Item = SocketAddr>, io::Error> { |
| let proxy = name_lookup.connect()?; |
| let LookupResult { addresses, .. } = proxy |
| .lookup_ip( |
| host, |
| &LookupIpOptions { |
| ipv4_lookup: Some(true), |
| ipv6_lookup: Some(true), |
| sort_addresses: Some(true), |
| ..Default::default() |
| }, |
| ) |
| .await |
| .map_err(|err| { |
| io::Error::new( |
| io::ErrorKind::Other, |
| format!("failed to call NameProvider.LookupIp: {}", err), |
| ) |
| })? |
| .map_err(|err| { |
| // Match stdlib's behavior, which maps all GAI errors but EAI_SYSTEM |
| // to io::ErrorKind::Other. |
| io::Error::new( |
| io::ErrorKind::Other, |
| format!("NameProvider.LookupIp failure: {:?}", err), |
| ) |
| })?; |
| |
| Ok(addresses |
| .ok_or_else(|| { |
| io::Error::new(io::ErrorKind::Other, "addresses not provided in NameProvider response") |
| })? |
| .into_iter() |
| .map(move |addr| { |
| let fidl_fuchsia_net_ext::IpAddress(addr) = addr.into(); |
| SocketAddr::new(addr, port) |
| })) |
| } |
| |
| async fn parse_ip_addr_with_provider( |
| provider: &impl ProviderConnector, |
| host: &str, |
| port: u16, |
| ) -> Result<Option<SocketAddr>, io::Error> { |
| parse_ip_addr(host, port, |zone_id| async { |
| let proxy = provider.connect()?; |
| let id = proxy |
| .interface_name_to_index(zone_id) |
| .await |
| .map_err(|err| { |
| io::Error::new( |
| io::ErrorKind::Other, |
| format!("failed to get interface index from socket provider: {}", err), |
| ) |
| })? |
| .map_err(|status| zx::Status::from_raw(status).into_io_error())?; |
| |
| // SocketAddrV6 only works with 32 bit scope ids. |
| u32::try_from(id).map_err(|TryFromIntError { .. }| { |
| io::Error::new(io::ErrorKind::Other, "interface index too large to convert to scope_id") |
| }) |
| }) |
| .await |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use { |
| super::*, |
| crate::*, |
| assert_matches::assert_matches, |
| fidl::endpoints::create_proxy_and_stream, |
| fidl_fuchsia_net_name::{LookupError, LookupRequest}, |
| fidl_fuchsia_posix_socket::ProviderRequest, |
| fuchsia_async::{self as fasync, net::TcpListener, LocalExecutor}, |
| futures::prelude::*, |
| std::cell::RefCell, |
| }; |
| |
| struct PanicConnector; |
| |
| impl ProviderConnector for PanicConnector { |
| fn connect(&self) -> Result<ProviderProxy, io::Error> { |
| panic!("should not be trying to talk to the Provider service") |
| } |
| } |
| |
| #[test] |
| fn can_create_client() { |
| let _exec = LocalExecutor::new(); |
| let _client = new_client(); |
| } |
| |
| #[test] |
| fn can_create_https_client() { |
| let _exec = LocalExecutor::new(); |
| let _client = new_https_client(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn hyper_connector_sets_tcp_options() { |
| let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); |
| let listener = TcpListener::bind(&addr).unwrap(); |
| let addr = listener.local_addr().unwrap(); |
| |
| let idle = std::time::Duration::from_secs(36); |
| let interval = std::time::Duration::from_secs(47); |
| let count = 58; |
| let uri = format!("https://{}", addr).parse::<hyper::Uri>().unwrap(); |
| let (TcpStream { stream }, _server) = future::try_join( |
| HyperConnector::from_tcp_options(TcpOptions { |
| keepalive_idle: Some(idle), |
| keepalive_interval: Some(interval), |
| keepalive_count: Some(count), |
| ..Default::default() |
| }) |
| .call(uri), |
| listener.accept_stream().try_next(), |
| ) |
| .await |
| .unwrap(); |
| |
| let stream = socket2::SockRef::from(stream.std()); |
| |
| assert_matches!(stream.keepalive(), Ok(v) if v); |
| assert_matches!(stream.keepalive_time(), Ok(v) if v == idle); |
| assert_matches!(stream.keepalive_interval(), Ok(v) if v == interval); |
| assert_matches!(stream.keepalive_retries(), Ok(v) if v == count); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_parse_ipv6_addr_with_provider() { |
| let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap(); |
| |
| assert_matches!( |
| parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%250]", 8080).await, |
| Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 0)) |
| ); |
| |
| assert_matches!( |
| parse_ip_addr_with_provider(&PanicConnector, "[fe80::1:2:3:4%252]", 8080).await, |
| Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 2)) |
| ); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_parse_ipv6_addr_with_provider_supports_interface_names() { |
| let connector = RealServiceConnector::new(); |
| let expected = "fe80::1:2:3:4".parse::<Ipv6Addr>().unwrap(); |
| |
| assert_matches!( |
| parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080).await, |
| Ok(Some(addr)) if addr == SocketAddr::V6(SocketAddrV6::new(expected, 8080, 0, 1)) |
| ); |
| |
| assert_matches!( |
| parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25]", 8080).await, |
| Err(err) if err.kind() == io::ErrorKind::NotFound |
| ); |
| |
| assert_matches!( |
| parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25unknownif]", 8080).await, |
| Err(err) if err.kind() == io::ErrorKind::NotFound |
| ); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_parse_ipv6_addr_handles_connection_errors() { |
| struct ErrorConnector; |
| |
| impl ProviderConnector for ErrorConnector { |
| fn connect(&self) -> Result<ProviderProxy, io::Error> { |
| Err(io::Error::new(io::ErrorKind::Other, "something bad happened")) |
| } |
| } |
| |
| assert_matches!(parse_ip_addr_with_provider(&ErrorConnector, "[fe80::1:2:3:4%25lo]", 8080).await, |
| Err(err) if err.kind() == io::ErrorKind::Other); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_parse_ipv6_addr_handles_large_interface_indices() { |
| let (proxy, mut stream) = create_proxy_and_stream::<ProviderMarker>().unwrap(); |
| |
| let provider_fut = async move { |
| while let Some(req) = stream.try_next().await.unwrap_or(None) { |
| match req { |
| ProviderRequest::InterfaceNameToIndex { name: _, responder } => { |
| responder.send(Ok(u64::MAX)).unwrap() |
| } |
| _ => panic!("unexpected request"), |
| } |
| } |
| }; |
| |
| struct ErrorConnector { |
| proxy: RefCell<Option<ProviderProxy>>, |
| } |
| |
| impl ProviderConnector for ErrorConnector { |
| fn connect(&self) -> Result<ProviderProxy, io::Error> { |
| let proxy = self.proxy.borrow_mut().take().unwrap(); |
| Ok(proxy) |
| } |
| } |
| |
| let connector = ErrorConnector { proxy: RefCell::new(Some(proxy)) }; |
| |
| let parse_ip_fut = parse_ip_addr_with_provider(&connector, "[fe80::1:2:3:4%25lo]", 8080); |
| |
| // Join the two futures to make sure they both complete. |
| let ((), res) = future::join(provider_fut, parse_ip_fut).await; |
| |
| assert_matches!(res, Err(err) if err.kind() == io::ErrorKind::Other); |
| } |
| |
| struct ProxyConnector<T> { |
| proxy: T, |
| } |
| |
| impl LookupConnector for ProxyConnector<LookupProxy> { |
| fn connect(&self) -> Result<LookupProxy, io::Error> { |
| Ok(self.proxy.clone()) |
| } |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_resolve_ip_addr() { |
| let (sender, receiver) = |
| futures::channel::mpsc::unbounded::<Result<LookupResult, LookupError>>(); |
| let (proxy, stream) = create_proxy_and_stream::<LookupMarker>() |
| .expect("failed to create Lookup proxy and stream"); |
| const TEST_HOSTNAME: &'static str = "foobar.com"; |
| let name_lookup_fut = stream.zip(receiver).for_each(|(req, rsp)| match req { |
| Ok(LookupRequest::LookupIp { hostname, options, responder }) => { |
| assert_eq!(hostname.as_str(), TEST_HOSTNAME); |
| assert_eq!( |
| options, |
| LookupIpOptions { |
| ipv4_lookup: Some(true), |
| ipv6_lookup: Some(true), |
| sort_addresses: Some(true), |
| ..Default::default() |
| } |
| ); |
| let rsp = rsp.as_ref().map_err(|e| *e); |
| futures::future::ready(responder.send(rsp).expect("failed to send FIDL response")) |
| } |
| req => panic!("unexpected item in request stream {:?}", req), |
| }); |
| |
| let connector = ProxyConnector { proxy }; |
| |
| let ip_v4 = Ipv4Addr::LOCALHOST.into(); |
| let ip_v6 = Ipv6Addr::LOCALHOST.into(); |
| const PORT1: u16 = 1234; |
| const PORT2: u16 = 4321; |
| |
| let test_fut = async move { |
| // Test expectation's error variant is a tuple of the lookup error |
| // to inject and the expected io error kind returned. |
| type Expectation = Result<Vec<std::net::IpAddr>, (LookupError, io::ErrorKind)>; |
| let test_resolve = |port, expect: Expectation| { |
| let fidl_response = expect |
| .clone() |
| .map(|addrs| LookupResult { |
| addresses: Some( |
| addrs |
| .into_iter() |
| .map(|std| fidl_fuchsia_net_ext::IpAddress(std).into()) |
| .collect(), |
| ), |
| ..Default::default() |
| }) |
| .map_err(|(fidl_err, _io_err)| fidl_err); |
| let expect = expect |
| .map(|addrs| { |
| addrs.into_iter().map(|addr| SocketAddr::new(addr, port)).collect() |
| }) |
| .map_err(|(_fidl_err, io_err)| io_err); |
| let () = sender.unbounded_send(fidl_response).expect("failed to send expectation"); |
| resolve_ip_addr(&connector, TEST_HOSTNAME, port) |
| .map_ok(Iterator::collect::<Vec<_>>) |
| // Map IO error to kind so we can do equality. |
| .map_err(|err| err.kind()) |
| .map(move |result| { |
| assert_eq!(result, expect); |
| }) |
| }; |
| let () = test_resolve(PORT1, Ok(vec![ip_v4])).await; |
| let () = test_resolve(PORT2, Ok(vec![ip_v6])).await; |
| let () = test_resolve(PORT1, Ok(vec![ip_v4, ip_v6])).await; |
| let () = test_resolve(PORT1, Err((LookupError::NotFound, io::ErrorKind::Other))).await; |
| }; |
| |
| let ((), ()) = futures::future::join(name_lookup_fut, test_fut).await; |
| } |
| } |