blob: e65e19a4cba418a582af4aa4a6cbe2ba8992abaf [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
futures::{
future::{Future, FutureExt},
io::{self, AsyncRead, AsyncWrite},
task::{Context, Poll},
},
hyper::{
client::{
connect::{Connected, Connection},
Client,
},
Body,
},
std::{
marker::PhantomData,
net::{
AddrParseError, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr, SocketAddrV4, SocketAddrV6,
},
num::ParseIntError,
pin::Pin,
},
tokio::io::ReadBuf,
};
#[cfg(not(target_os = "fuchsia"))]
use async_net as net;
#[cfg(target_os = "fuchsia")]
use fuchsia_async::net;
#[cfg(not(target_os = "fuchsia"))]
mod not_fuchsia;
#[cfg(not(target_os = "fuchsia"))]
pub use not_fuchsia::*;
#[cfg(target_os = "fuchsia")]
mod fuchsia;
#[cfg(target_os = "fuchsia")]
pub use crate::fuchsia::*;
mod session_cache;
pub use session_cache::C4CapableSessionCache;
#[cfg(target_os = "fuchsia")]
mod happy_eyeballs;
/// A Fuchsia-compatible hyper client configured for making HTTP requests.
pub type HttpClient = Client<HyperConnector, Body>;
/// A Fuchsia-compatible hyper client configured for making HTTP and HTTPS requests.
pub type HttpsClient = Client<hyper_rustls::HttpsConnector<HyperConnector>, Body>;
/// A trait to implement a builder for a Fuchsia compatible hyper client
/// configured for either only HTTP or both HTTP and HTTPS requests.
pub trait MakeClientBuilder: Sized {
fn builder() -> HttpClientBuilder<Self> {
HttpClientBuilder::default()
}
}
impl MakeClientBuilder for HttpClient {}
impl MakeClientBuilder for HttpsClient {}
/// A future that yields a hyper-compatible TCP stream.
#[must_use = "futures do nothing unless polled"]
pub struct HyperConnectorFuture {
// FIXME(https://github.com/rust-lang/rust/issues/63063): We should be able to remove this
// `Box` once rust allows impl Traits in type aliases.
fut: Pin<Box<dyn Future<Output = Result<TcpStream, io::Error>> + Send>>,
}
impl Future for HyperConnectorFuture {
type Output = Result<TcpStream, io::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.fut.as_mut().poll(cx)
}
}
pub struct TcpStream {
pub stream: net::TcpStream,
}
impl tokio::io::AsyncRead for TcpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf.initialize_unfilled()).map_ok(|sz| {
buf.advance(sz);
()
})
}
// TODO: override poll_read_buf and call readv on the underlying stream
}
impl tokio::io::AsyncWrite for TcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.get_mut().stream).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.get_mut().stream.shutdown(Shutdown::Write))
}
// TODO: override poll_write_buf and call writev on the underlying stream
}
impl Connection for TcpStream {
fn connected(&self) -> Connected {
Connected::new()
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// A container of TCP settings to be applied to the sockets created by the hyper client.
pub struct TcpOptions {
/// This sets TCP_KEEPIDLE and SO_KEEPALIVE.
pub keepalive_idle: Option<std::time::Duration>,
/// This sets TCP_KEEPINTVL and SO_KEEPALIVE.
pub keepalive_interval: Option<std::time::Duration>,
/// This sets TCP_KEEPCNT and SO_KEEPALIVE.
pub keepalive_count: Option<u32>,
}
impl TcpOptions {
/// keepalive_timeout returns a TCP keepalive policy that times out after the specified
/// duration. The keepalive policy returned waits for half of the supplied duration before
/// sending keepalive packets, and attempts to keep the connection alive three times for
/// the remaining period.
///
/// If the supplied duration does not contain at least one whole second, no TCP keepalive
/// policy is returned.
pub fn keepalive_timeout(dur: std::time::Duration) -> Self {
if dur.as_secs() == 0 {
return TcpOptions::default();
}
TcpOptions {
keepalive_idle: dur.checked_div(2),
keepalive_interval: dur.checked_div(6),
keepalive_count: Some(3),
}
}
pub(crate) fn apply<T: std::os::fd::AsFd>(&self, stream: &T) -> io::Result<()> {
let stream = socket2::SockRef::from(stream);
let mut any = false;
let mut keepalive = socket2::TcpKeepalive::new();
if let Some(idle) = self.keepalive_idle {
any = true;
keepalive = keepalive.with_time(idle);
};
if let Some(interval) = self.keepalive_interval {
any = true;
keepalive = keepalive.with_interval(interval);
}
if let Some(count) = self.keepalive_count {
any = true;
keepalive = keepalive.with_retries(count);
}
if any {
stream.set_tcp_keepalive(&keepalive)
} else {
Ok(())
}
}
}
/// Extra socket options to ensure that requests are made through a particular
/// device or over a particular IP domain.
#[derive(Clone, Debug, Default)]
pub struct SocketOptions {
/// Specifies, as a string, the device that the created socket should bind to.
pub bind_device: Option<String>,
}
#[derive(Clone)]
pub struct Executor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for Executor {
fn execute(&self, fut: F) {
fuchsia_async::Task::spawn(fut.map(|_| ())).detach()
}
}
#[derive(Clone)]
pub struct LocalExecutor;
/// Implements the Builder pattern for constructing HTTP or HTTPS clients.
#[derive(Clone)]
pub struct HttpClientBuilder<T> {
tcp_options: Option<TcpOptions>,
socket_options: Option<SocketOptions>,
tls: Option<rustls::ClientConfig>,
phantom: PhantomData<T>,
}
impl<T> Default for HttpClientBuilder<T> {
fn default() -> Self {
Self {
tcp_options: Default::default(),
socket_options: Default::default(),
tls: Default::default(),
phantom: Default::default(),
}
}
}
impl HttpClientBuilder<HttpClient> {
/// Constructs an HttpClient
pub fn build(mut self) -> HttpClient {
Client::builder().executor(Executor).build(self.connector())
}
}
impl HttpClientBuilder<HttpsClient> {
/// Constructs an HttpsClient
pub fn build(mut self) -> HttpsClient {
let https = hyper_rustls::HttpsConnector::from((
self.connector(),
self.tls.unwrap_or_else(|| {
let mut tls = new_rustls_client_config();
configure_cert_store(&mut tls);
tls
}),
));
Client::builder().executor(Executor).build(https)
}
/// Overrides the default tls `ClientConfig`
pub fn tls(self, tls: rustls::ClientConfig) -> Self {
Self { tls: Some(tls), ..self }
}
}
impl<T> HttpClientBuilder<T> {
fn connector(&mut self) -> HyperConnector {
HyperConnector::from((
self.tcp_options.take().unwrap_or_default(),
self.socket_options.take().unwrap_or_default(),
))
}
/// Sets the TCP options for the underlying HyperConnector
pub fn tcp_options(self, tcp_options: TcpOptions) -> Self {
Self { tcp_options: Some(tcp_options), ..self }
}
/// Sets the SocketOptions for the underlying HyperConnector
pub fn socket_options(self, socket_options: SocketOptions) -> Self {
Self { socket_options: Some(socket_options), ..self }
}
}
impl<F: Future + 'static> hyper::rt::Executor<F> for LocalExecutor {
fn execute(&self, fut: F) {
fuchsia_async::Task::local(fut.map(drop)).detach()
}
}
/// Returns a new Fuchsia-compatible hyper client for making HTTP requests.
pub fn new_client() -> HttpClient {
HttpClient::builder().build()
}
pub fn new_https_client_dangerous(
tls: rustls::ClientConfig,
tcp_options: TcpOptions,
) -> HttpsClient {
HttpsClient::builder().tls(tls).tcp_options(tcp_options).build()
}
/// Returns a new Fuchsia-compatible hyper client for making HTTP and HTTPS requests.
pub fn new_https_client_from_tcp_options(tcp_options: TcpOptions) -> HttpsClient {
HttpsClient::builder().tcp_options(tcp_options).build()
}
/// Returns a new Fuchsia-compatible hyper client for making HTTP and HTTPS requests.
pub fn new_https_client() -> HttpsClient {
HttpsClient::builder().build()
}
/// Returns a rustls::ClientConfig for further construction with improved session cache and without
/// a configured certificate store.
pub fn new_rustls_client_config() -> rustls::ClientConfig {
let mut config = rustls::ClientConfig::new();
// The default depth for the ClientSessionMemoryCache in the default ClientConfig is 32; this
// value is assumed to be a sufficient default here as well.
config.set_persistence(session_cache::C4CapableSessionCache::new(32));
config
}
pub(crate) async fn parse_ip_addr<'a, F, Fut>(
host: &'a str,
port: u16,
interface_name_to_index: F,
) -> Result<Option<SocketAddr>, io::Error>
where
F: Fn(&'a str) -> Fut + 'a,
Fut: Future<Output = Result<u32, io::Error>> + 'a,
{
match host.parse::<Ipv4Addr>() {
Ok(addr) => {
return Ok(Some(SocketAddr::V4(SocketAddrV4::new(addr, port))));
}
Err(AddrParseError { .. }) => {}
}
// IPv6 literals are always enclosed in [].
if !host.starts_with("[") || !host.ends_with(']') {
return Ok(None);
}
let host = &host[1..host.len() - 1];
// IPv6 addresses with zones always contain "%25", which is "%" URL encoded.
let (host, zone_id) = if let Some((host, zone_id)) = host.split_once("%25") {
(host, Some(zone_id))
} else {
(host, None)
};
let addr = match host.parse::<Ipv6Addr>() {
Ok(addr) => addr,
Err(AddrParseError { .. }) => {
return Ok(None);
}
};
let scope_id = if let Some(zone_id) = zone_id {
// rfc6874 section 4 states:
//
// The security considerations from the URI syntax specification
// [RFC3986] and the IPv6 Scoped Address Architecture specification
// [RFC4007] apply. In particular, this URI format creates a specific
// pathway by which a deceitful zone index might be communicated, as
// mentioned in the final security consideration of the Scoped Address
// Architecture specification. It is emphasised that the format is
// intended only for debugging purposes, but of course this intention
// does not prevent misuse.
//
// To limit this risk, implementations MUST NOT allow use of this format
// except for well-defined usages, such as sending to link-local
// addresses under prefix fe80::/10. At the time of writing, this is
// the only well-defined usage known.
//
// Since the only known use-case of IPv6 Zone Identifiers on Fuchsia is to communicate
// with link-local devices, restrict addresses to link-local zone identifiers.
//
// TODO: use Ipv6Addr::is_unicast_link_local_strict when available in stable rust.
if addr.segments()[..4] != [0xfe80, 0, 0, 0] {
return Err(io::Error::new(
io::ErrorKind::Other,
"zone_id is only usable with link local addresses",
));
}
// TODO: validate that the value matches rfc6874 grammar `ZoneID = 1*( unreserved / pct-encoded )`.
match zone_id.parse::<u32>() {
Ok(scope_id) => scope_id,
Err(ParseIntError { .. }) => interface_name_to_index(zone_id).await?,
}
} else {
0
};
Ok(Some(SocketAddr::V6(SocketAddrV6::new(addr, port, 0, scope_id))))
}
#[cfg(target_os = "fuchsia")]
pub(crate) fn connect_and_bind_device<D: AsRef<[u8]>>(
addr: SocketAddr,
bind_device: Option<D>,
) -> io::Result<net::TcpConnector> {
let socket = socket2::Socket::new(
match addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
},
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
if let Some(bind_device) = bind_device {
socket.bind_device(Some(bind_device.as_ref()))?;
}
net::TcpStream::connect_from_raw(socket, addr)
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
fuchsia_async::{self as fasync},
};
async fn unsupported(_name: &str) -> Result<u32, io::Error> {
panic!("should not have happened")
}
#[fasync::run_singlethreaded(test)]
async fn test_parse_ipv4_addr() {
let expected = "1.2.3.4:8080".parse::<SocketAddr>().unwrap();
assert_matches!(
parse_ip_addr("1.2.3.4", 8080, unsupported).await,
Ok(Some(addr)) if addr == expected);
}
#[fasync::run_singlethreaded(test)]
async fn test_parse_invalid_addresses() {
assert_matches!(parse_ip_addr("1.2.3", 8080, unsupported).await, Ok(None));
assert_matches!(parse_ip_addr("1.2.3.4.5", 8080, unsupported).await, Ok(None));
assert_matches!(parse_ip_addr("localhost", 8080, unsupported).await, Ok(None));
assert_matches!(parse_ip_addr("[fe80::1:2:3:4", 8080, unsupported).await, Ok(None));
assert_matches!(parse_ip_addr("[[fe80::1:2:3:4]", 8080, unsupported).await, Ok(None));
assert_matches!(parse_ip_addr("[]", 8080, unsupported).await, Ok(None));
}
#[fasync::run_singlethreaded(test)]
async fn test_parse_ipv6_addr() {
let expected = "[fe80::1:2:3:4]:8080".parse::<SocketAddr>().unwrap();
assert_matches!(
parse_ip_addr("[fe80::1:2:3:4]", 8080, unsupported).await,
Ok(Some(addr)) if addr == expected
);
}
#[fasync::run_singlethreaded(test)]
async fn test_parse_ipv6_addr_with_zone_must_be_local() {
assert_matches!(
parse_ip_addr("[fe81::1:2:3:4%252]", 8080, unsupported).await,
Err(err) if err.kind() == io::ErrorKind::Other
);
}
}