blob: 79373119ff9fd04a971971f9cf2f40e045b0450b [file] [log] [blame]
// Copyright (C) 2020, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use crate::args::*;
use crate::common::*;
use std::net::ToSocketAddrs;
use std::io::prelude::*;
use std::rc::Rc;
use std::cell::RefCell;
use ring::rand::*;
const MAX_DATAGRAM_SIZE: usize = 1350;
#[derive(Debug)]
pub enum ClientError {
HandshakeFail,
HttpFail,
Other(String),
}
pub fn connect(
args: ClientArgs, conn_args: CommonArgs,
output_sink: impl FnMut(String) + 'static,
) -> Result<(), ClientError> {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let output_sink =
Rc::new(RefCell::new(output_sink)) as Rc<RefCell<dyn FnMut(_)>>;
// Setup the event loop.
let mut poll = mio::Poll::new().unwrap();
let mut events = mio::Events::with_capacity(1024);
// We'll only connect to the first server provided in URL list.
let connect_url = &args.urls[0];
// Resolve server address.
let peer_addr = if let Some(addr) = &args.connect_to {
addr.parse().expect("--connect-to is expected to be a string containing an IPv4 or IPv6 address with a port. E.g. 192.0.2.0:443")
} else {
connect_url.to_socket_addrs().unwrap().next().unwrap()
};
// Bind to INADDR_ANY or IN6ADDR_ANY depending on the IP family of the
// server address. This is needed on macOS and BSD variants that don't
// support binding to IN6ADDR_ANY for both v4 and v6.
let bind_addr = match peer_addr {
std::net::SocketAddr::V4(_) => format!("0.0.0.0:{}", args.source_port),
std::net::SocketAddr::V6(_) => format!("[::]:{}", args.source_port),
};
// Create the UDP socket backing the QUIC connection, and register it with
// the event loop.
let mut socket =
mio::net::UdpSocket::bind(bind_addr.parse().unwrap()).unwrap();
poll.registry()
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)
.unwrap();
let migrate_socket = if args.perform_migration {
let mut socket =
mio::net::UdpSocket::bind(bind_addr.parse().unwrap()).unwrap();
poll.registry()
.register(&mut socket, mio::Token(1), mio::Interest::READABLE)
.unwrap();
Some(socket)
} else {
None
};
// Create the configuration for the QUIC connection.
let mut config = quiche::Config::new(args.version).unwrap();
config.verify_peer(!args.no_verify);
config.set_application_protos(&conn_args.alpns).unwrap();
config.set_max_idle_timeout(conn_args.idle_timeout);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_initial_max_data(conn_args.max_data);
config.set_initial_max_stream_data_bidi_local(conn_args.max_stream_data);
config.set_initial_max_stream_data_bidi_remote(conn_args.max_stream_data);
config.set_initial_max_stream_data_uni(conn_args.max_stream_data);
config.set_initial_max_streams_bidi(conn_args.max_streams_bidi);
config.set_initial_max_streams_uni(conn_args.max_streams_uni);
config.set_disable_active_migration(!conn_args.enable_active_migration);
config.set_active_connection_id_limit(conn_args.max_active_cids);
config.set_max_connection_window(conn_args.max_window);
config.set_max_stream_window(conn_args.max_stream_window);
let mut keylog = None;
if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(keylog_path)
.unwrap();
keylog = Some(file);
config.log_keys();
}
if conn_args.no_grease {
config.grease(false);
}
if conn_args.early_data {
config.enable_early_data();
}
config
.set_cc_algorithm_name(&conn_args.cc_algorithm)
.unwrap();
if conn_args.disable_hystart {
config.enable_hystart(false);
}
if conn_args.dgrams_enabled {
config.enable_dgram(true, 1000, 1000);
}
let mut http_conn: Option<Box<dyn HttpConn>> = None;
let mut app_proto_selected = false;
// Generate a random source connection ID for the connection.
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
let rng = SystemRandom::new();
rng.fill(&mut scid[..]).unwrap();
let scid = quiche::ConnectionId::from_ref(&scid);
let local_addr = socket.local_addr().unwrap();
// Create a QUIC connection and initiate handshake.
let mut conn = quiche::connect(
connect_url.domain(),
&scid,
local_addr,
peer_addr,
&mut config,
)
.unwrap();
if let Some(keylog) = &mut keylog {
if let Ok(keylog) = keylog.try_clone() {
conn.set_keylog(Box::new(keylog));
}
}
// Only bother with qlog if the user specified it.
#[cfg(feature = "qlog")]
{
if let Some(dir) = std::env::var_os("QLOGDIR") {
let id = format!("{scid:?}");
let writer = make_qlog_writer(&dir, "client", &id);
conn.set_qlog(
std::boxed::Box::new(writer),
"quiche-client qlog".to_string(),
format!("{} id={}", "quiche-client qlog", id),
);
}
}
if let Some(session_file) = &args.session_file {
if let Ok(session) = std::fs::read(session_file) {
conn.set_session(&session).ok();
}
}
info!(
"connecting to {:} from {:} with scid {:?}",
peer_addr,
socket.local_addr().unwrap(),
scid,
);
let (write, send_info) = conn.send(&mut out).expect("initial send failed");
while let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!(
"{} -> {}: send() would block",
socket.local_addr().unwrap(),
send_info.to
);
continue;
}
return Err(ClientError::Other(format!("send() failed: {e:?}")));
}
trace!("written {}", write);
let app_data_start = std::time::Instant::now();
let mut pkt_count = 0;
let mut scid_sent = false;
let mut new_path_probed = false;
let mut migrated = false;
loop {
if !conn.is_in_early_data() || app_proto_selected {
poll.poll(&mut events, conn.timeout()).unwrap();
}
// If the event loop reported no events, it means that the timeout
// has expired, so handle it without attempting to read packets. We
// will then proceed with the send loop.
if events.is_empty() {
trace!("timed out");
conn.on_timeout();
}
// Read incoming UDP packets from the socket and feed them to quiche,
// until there are no more packets to read.
for event in &events {
let socket = match event.token() {
mio::Token(0) => &socket,
mio::Token(1) => migrate_socket.as_ref().unwrap(),
_ => unreachable!(),
};
let local_addr = socket.local_addr().unwrap();
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
// There are no more UDP packets to read on this socket.
// Process subsequent events.
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!("{}: recv() would block", local_addr);
break 'read;
}
return Err(ClientError::Other(format!(
"{local_addr}: recv() failed: {e:?}"
)));
},
};
trace!("{}: got {} bytes", local_addr, len);
if let Some(target_path) = conn_args.dump_packet_path.as_ref() {
let path = format!("{target_path}/{pkt_count}.pkt");
if let Ok(f) = std::fs::File::create(path) {
let mut f = std::io::BufWriter::new(f);
f.write_all(&buf[..len]).ok();
}
}
pkt_count += 1;
let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};
// Process potentially coalesced packets.
let read = match conn.recv(&mut buf[..len], recv_info) {
Ok(v) => v,
Err(e) => {
error!("{}: recv failed: {:?}", local_addr, e);
continue 'read;
},
};
trace!("{}: processed {} bytes", local_addr, read);
}
}
trace!("done reading");
if conn.is_closed() {
info!(
"connection closed, {:?} {:?}",
conn.stats(),
conn.path_stats().collect::<Vec<quiche::PathStats>>()
);
if !conn.is_established() {
error!(
"connection timed out after {:?}",
app_data_start.elapsed(),
);
return Err(ClientError::HandshakeFail);
}
if let Some(session_file) = &args.session_file {
if let Some(session) = conn.session() {
std::fs::write(session_file, session).ok();
}
}
if let Some(h_conn) = http_conn {
if h_conn.report_incomplete(&app_data_start) {
return Err(ClientError::HttpFail);
}
}
break;
}
// Create a new application protocol session once the QUIC connection is
// established.
if (conn.is_established() || conn.is_in_early_data()) &&
(!args.perform_migration || migrated) &&
!app_proto_selected
{
// At this stage the ALPN negotiation succeeded and selected a
// single application protocol name. We'll use this to construct
// the correct type of HttpConn but `application_proto()`
// returns a slice, so we have to convert it to a str in order
// to compare to our lists of protocols. We `unwrap()` because
// we need the value and if something fails at this stage, there
// is not much anyone can do to recover.
let app_proto = conn.application_proto();
if alpns::HTTP_09.contains(&app_proto) {
http_conn = Some(Http09Conn::with_urls(
&args.urls,
args.reqs_cardinal,
Rc::clone(&output_sink),
));
app_proto_selected = true;
} else if alpns::HTTP_3.contains(&app_proto) {
let dgram_sender = if conn_args.dgrams_enabled {
Some(Http3DgramSender::new(
conn_args.dgram_count,
conn_args.dgram_data.clone(),
0,
))
} else {
None
};
http_conn = Some(Http3Conn::with_urls(
&mut conn,
&args.urls,
args.reqs_cardinal,
&args.req_headers,
&args.body,
&args.method,
args.send_priority_update,
conn_args.max_field_section_size,
conn_args.qpack_max_table_capacity,
conn_args.qpack_blocked_streams,
args.dump_json,
dgram_sender,
Rc::clone(&output_sink),
));
app_proto_selected = true;
}
}
// If we have an HTTP connection, first issue the requests then
// process received data.
if let Some(h_conn) = http_conn.as_mut() {
h_conn.send_requests(&mut conn, &args.dump_response_path);
h_conn.handle_responses(&mut conn, &mut buf, &app_data_start);
}
// Handle path events.
while let Some(qe) = conn.path_event_next() {
match qe {
quiche::PathEvent::New(..) => unreachable!(),
quiche::PathEvent::Validated(local_addr, peer_addr) => {
info!(
"Path ({}, {}) is now validated",
local_addr, peer_addr
);
conn.migrate(local_addr, peer_addr).unwrap();
migrated = true;
},
quiche::PathEvent::FailedValidation(local_addr, peer_addr) => {
info!(
"Path ({}, {}) failed validation",
local_addr, peer_addr
);
},
quiche::PathEvent::Closed(local_addr, peer_addr) => {
info!(
"Path ({}, {}) is now closed and unusable",
local_addr, peer_addr
);
},
quiche::PathEvent::ReusedSourceConnectionId(
cid_seq,
old,
new,
) => {
info!(
"Peer reused cid seq {} (initially {:?}) on {:?}",
cid_seq, old, new
);
},
quiche::PathEvent::PeerMigrated(..) => unreachable!(),
}
}
// See whether source Connection IDs have been retired.
while let Some(retired_scid) = conn.retired_scid_next() {
info!("Retiring source CID {:?}", retired_scid);
}
// Provides as many CIDs as possible.
while conn.source_cids_left() > 0 {
let (scid, reset_token) = generate_cid_and_reset_token(&rng);
if conn.new_source_cid(&scid, reset_token, false).is_err() {
break;
}
scid_sent = true;
}
if args.perform_migration &&
!new_path_probed &&
scid_sent &&
conn.available_dcids() > 0
{
let additional_local_addr =
migrate_socket.as_ref().unwrap().local_addr().unwrap();
conn.probe_path(additional_local_addr, peer_addr).unwrap();
new_path_probed = true;
}
// Generate outgoing QUIC packets and send them on the UDP socket, until
// quiche reports that there are no more packets to be sent.
let mut sockets = vec![&socket];
if let Some(migrate_socket) = migrate_socket.as_ref() {
sockets.push(migrate_socket);
}
for socket in sockets {
let local_addr = socket.local_addr().unwrap();
for peer_addr in conn.paths_iter(local_addr) {
loop {
let (write, send_info) = match conn.send_on_path(
&mut out,
Some(local_addr),
Some(peer_addr),
) {
Ok(v) => v,
Err(quiche::Error::Done) => {
trace!(
"{} -> {}: done writing",
local_addr,
peer_addr
);
break;
},
Err(e) => {
error!(
"{} -> {}: send failed: {:?}",
local_addr, peer_addr, e
);
conn.close(false, 0x1, b"fail").ok();
break;
},
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
trace!(
"{} -> {}: send() would block",
local_addr,
send_info.to
);
break;
}
return Err(ClientError::Other(format!(
"{} -> {}: send() failed: {:?}",
local_addr, send_info.to, e
)));
}
trace!(
"{} -> {}: written {}",
local_addr,
send_info.to,
write
);
}
}
}
if conn.is_closed() {
info!(
"connection closed, {:?} {:?}",
conn.stats(),
conn.path_stats().collect::<Vec<quiche::PathStats>>()
);
if !conn.is_established() {
error!(
"connection timed out after {:?}",
app_data_start.elapsed(),
);
return Err(ClientError::HandshakeFail);
}
if let Some(session_file) = &args.session_file {
if let Some(session) = conn.session() {
std::fs::write(session_file, session).ok();
}
}
if let Some(h_conn) = http_conn {
if h_conn.report_incomplete(&app_data_start) {
return Err(ClientError::HttpFail);
}
}
break;
}
}
Ok(())
}