blob: 555e840d3b1b2d3efb76f9b3e6fa7fd431207827 [file] [log] [blame]
// Copyright (C) 2018-2019, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//! 🥧 Savoury implementation of the QUIC transport protocol and HTTP/3.
//!
//! [quiche] is an implementation of the QUIC transport protocol and HTTP/3 as
//! specified by the [IETF]. It provides a low level API for processing QUIC
//! packets and handling connection state. The application is responsible for
//! providing I/O (e.g. sockets handling) as well as an event loop with support
//! for timers.
//!
//! [quiche]: https://github.com/cloudflare/quiche/
//! [ietf]: https://quicwg.org/
//!
//! ## Connection setup
//!
//! The first step in establishing a QUIC connection using quiche is creating a
//! configuration object:
//!
//! ```
//! let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! This is shared among multiple connections and can be used to configure a
//! QUIC endpoint.
//!
//! On the client-side the [`connect()`] utility function can be used to create
//! a new connection, while [`accept()`] is for servers:
//!
//! ```
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let server_name = "quic.tech";
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! // Client connection.
//! let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
//!
//! // Server connection.
//! let conn = quiche::accept(&scid, None, &mut config)?;
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## Handling incoming packets
//!
//! Using the connection's [`recv()`] method the application can process
//! incoming packets that belong to that connection from the network:
//!
//! ```no_run
//! # let mut buf = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! loop {
//! let read = socket.recv(&mut buf).unwrap();
//!
//! let read = match conn.recv(&mut buf[..read]) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done reading.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## Generating outgoing packets
//!
//! Outgoing packet are generated using the connection's [`send()`] method
//! instead:
//!
//! ```no_run
//! # let mut out = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! loop {
//! let write = match conn.send(&mut out) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done writing.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//!
//! socket.send(&out[..write]).unwrap();
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! When packets are sent, the application is responsible for maintaining a
//! timer to react to time-based connection events. The timer expiration can be
//! obtained using the connection's [`timeout()`] method.
//!
//! ```
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! let timeout = conn.timeout();
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application is responsible for providing a timer implementation, which
//! can be specific to the operating system or networking framework used. When
//! a timer expires, the connection's [`on_timeout()`] method should be called,
//! after which additional packets might need to be sent on the network:
//!
//! ```no_run
//! # let mut out = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! // Timeout expired, handle it.
//! conn.on_timeout();
//!
//! // Send more packets as needed after timeout.
//! loop {
//! let write = match conn.send(&mut out) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done writing.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//!
//! socket.send(&out[..write]).unwrap();
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## Sending and receiving stream data
//!
//! After some back and forth, the connection will complete its handshake and
//! will be ready for sending or receiving application data.
//!
//! Data can be sent on a stream by using the [`stream_send()`] method:
//!
//! ```no_run
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! if conn.is_established() {
//! // Handshake completed, send some data on stream 0.
//! conn.stream_send(0, b"hello", true)?;
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application can check whether there are any readable streams by using
//! the connection's [`readable()`] method, which returns an iterator over all
//! the streams that have outstanding data to read.
//!
//! The [`stream_recv()`] method can then be used to retrieve the application
//! data from the readable stream:
//!
//! ```no_run
//! # let mut buf = [0; 512];
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let mut conn = quiche::accept(&scid, None, &mut config)?;
//! if conn.is_established() {
//! // Iterate over readable streams.
//! for stream_id in conn.readable() {
//! // Stream is readable, read until there's no more data.
//! while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
//! println!("Got {} bytes on stream {}", read, stream_id);
//! }
//! }
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## HTTP/3
//!
//! The quiche [HTTP/3 module] provides a high level API for sending and
//! receiving HTTP requests and responses on top of the QUIC transport protocol.
//!
//! [`connect()`]: fn.connect.html
//! [`accept()`]: fn.accept.html
//! [`recv()`]: struct.Connection.html#method.recv
//! [`send()`]: struct.Connection.html#method.send
//! [`timeout()`]: struct.Connection.html#method.timeout
//! [`on_timeout()`]: struct.Connection.html#method.on_timeout
//! [`stream_send()`]: struct.Connection.html#method.stream_send
//! [`readable()`]: struct.Connection.html#method.readable
//! [`stream_recv()`]: struct.Connection.html#method.stream_recv
//! [HTTP/3 module]: h3/index.html
//!
//! ## Congestion Control
//!
//! The quiche library provides a high-level API for configuring which
//! congestion control algorithm to use throughout the QUIC connection.
//!
//! When a QUIC connection is created, the application can optionally choose
//! which CC algorithm to use. See [`CongestionControlAlgorithm`] for currently
//! available congestion control algorithms.
//!
//! For example:
//!
//! ```
//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
//! config.set_cc_algorithm(quiche::CongestionControlAlgorithm::Reno);
//! ```
//!
//! Alternatively, you can configure the congestion control algorithm to use
//! by its name.
//!
//! ```
//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
//! config.set_cc_algorithm_name("reno").unwrap();
//! ```
//!
//! Note that the CC algorithm should be configured before calling [`connect()`]
//! or [`accept()`]. Otherwise the connection will use a default CC algorithm.
//!
//! [`CongestionControlAlgorithm`]: enum.CongestionControlAlgorithm.html
#![allow(improper_ctypes)]
#![allow(clippy::suspicious_operation_groupings)]
#![warn(missing_docs)]
#[macro_use]
extern crate log;
use std::cmp;
use std::time;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Mutex;
/// The current QUIC wire version.
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_DRAFT29;
/// Supported QUIC versions.
///
/// Note that the older ones might not be fully supported.
const PROTOCOL_VERSION_DRAFT27: u32 = 0xff00_001b;
const PROTOCOL_VERSION_DRAFT28: u32 = 0xff00_001c;
const PROTOCOL_VERSION_DRAFT29: u32 = 0xff00_001d;
/// The maximum length of a connection ID.
pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
/// The minimum length of Initial packets sent by a client.
pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
#[cfg(not(feature = "fuzzing"))]
const PAYLOAD_MIN_LEN: usize = 4;
#[cfg(feature = "fuzzing")]
// Due to the fact that in fuzzing mode we use a zero-length AEAD tag (which
// would normally be 16 bytes), we need to adjust the minimum payload size to
// account for that.
const PAYLOAD_MIN_LEN: usize = 20;
const MAX_AMPLIFICATION_FACTOR: usize = 3;
// The maximum number of tracked packet number ranges that need to be acked.
//
// This represents more or less how many ack blocks can fit in a typical packet.
const MAX_ACK_RANGES: usize = 68;
// The highest possible stream ID allowed.
const MAX_STREAM_ID: u64 = 1 << 60;
// The default max_datagram_size used in congestion control.
const MAX_SEND_UDP_PAYLOAD_SIZE: usize = 1200;
// The default length of DATAGRAM queues.
const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;
// The DATAGRAM standard recommends either none or 65536 as maximum DATAGRAM
// frames size. We enforce the recommendation for forward compatibility.
const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
// The length of the payload length field.
const PAYLOAD_LENGTH_LEN: usize = 2;
/// A specialized [`Result`] type for quiche operations.
///
/// This type is used throughout quiche's public API for any operation that
/// can produce an error.
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
pub type Result<T> = std::result::Result<T, Error>;
/// A QUIC error.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Error {
/// There is no more work to do.
Done,
/// The provided buffer is too short.
BufferTooShort,
/// The provided packet cannot be parsed because its version is unknown.
UnknownVersion,
/// The provided packet cannot be parsed because it contains an invalid
/// frame.
InvalidFrame,
/// The provided packet cannot be parsed.
InvalidPacket,
/// The operation cannot be completed because the connection is in an
/// invalid state.
InvalidState,
/// The operation cannot be completed because the stream is in an
/// invalid state.
InvalidStreamState,
/// The peer's transport params cannot be parsed.
InvalidTransportParam,
/// A cryptographic operation failed.
CryptoFail,
/// The TLS handshake failed.
TlsFail,
/// The peer violated the local flow control limits.
FlowControl,
/// The peer violated the local stream limits.
StreamLimit,
/// The specified stream was stopped by the peer.
///
/// The error code sent as part of the `STOP_SENDING` frame is provided as
/// associated data.
StreamStopped(u64),
/// The received data exceeds the stream's final size.
FinalSize,
/// Error in congestion control.
CongestionControl,
}
impl Error {
fn to_wire(self) -> u64 {
match self {
Error::Done => 0x0,
Error::InvalidFrame => 0x7,
Error::InvalidStreamState => 0x5,
Error::InvalidTransportParam => 0x8,
Error::FlowControl => 0x3,
Error::StreamLimit => 0x4,
Error::FinalSize => 0x6,
_ => 0xa,
}
}
#[cfg(feature = "ffi")]
fn to_c(self) -> libc::ssize_t {
match self {
Error::Done => -1,
Error::BufferTooShort => -2,
Error::UnknownVersion => -3,
Error::InvalidFrame => -4,
Error::InvalidPacket => -5,
Error::InvalidState => -6,
Error::InvalidStreamState => -7,
Error::InvalidTransportParam => -8,
Error::CryptoFail => -9,
Error::TlsFail => -10,
Error::FlowControl => -11,
Error::StreamLimit => -12,
Error::FinalSize => -13,
Error::CongestionControl => -14,
Error::StreamStopped { .. } => -15,
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl std::convert::From<octets::BufferTooShortError> for Error {
fn from(_err: octets::BufferTooShortError) -> Self {
Error::BufferTooShort
}
}
/// The stream's side to shutdown.
///
/// This should be used when calling [`stream_shutdown()`].
///
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
#[repr(C)]
pub enum Shutdown {
/// Stop receiving stream data.
Read = 0,
/// Stop sending stream data.
Write = 1,
}
/// Stores configuration shared between multiple connections.
pub struct Config {
local_transport_params: TransportParams,
version: u32,
// BoringSSL's SSL_CTX structure is technically safe to share across threads
// but once shared, functions that modify it can't be used any more. We can't
// encode that in Rust, so just make it Send+Sync with a mutex to fulfill
// the Sync constraint.
tls_ctx: Mutex<tls::Context>,
application_protos: Vec<Vec<u8>>,
grease: bool,
cc_algorithm: CongestionControlAlgorithm,
hystart: bool,
dgram_recv_max_queue_len: usize,
dgram_send_max_queue_len: usize,
max_send_udp_payload_size: usize,
}
impl Config {
/// Creates a config object with the given version.
///
/// ## Examples:
///
/// ```
/// let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn new(version: u32) -> Result<Config> {
let tls_ctx = Mutex::new(tls::Context::new()?);
Ok(Config {
local_transport_params: TransportParams::default(),
version,
tls_ctx,
application_protos: Vec::new(),
grease: true,
cc_algorithm: CongestionControlAlgorithm::CUBIC,
hystart: true,
dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
max_send_udp_payload_size: MAX_SEND_UDP_PAYLOAD_SIZE,
})
}
/// Configures the given certificate chain.
///
/// The content of `file` is parsed as a PEM-encoded leaf certificate,
/// followed by optional intermediate certificates.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.load_cert_chain_from_pem_file("/path/to/cert.pem")?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.use_certificate_chain_file(file)
}
/// Configures the given private key.
///
/// The content of `file` is parsed as a PEM-encoded private key.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.load_priv_key_from_pem_file("/path/to/key.pem")?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx.lock().unwrap().use_privkey_file(file)
}
/// Specifies a file where trusted CA certificates are stored for the
/// purposes of certificate verification.
///
/// The content of `file` is parsed as a PEM-encoded certificate chain.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.load_verify_locations_from_file("/path/to/cert.pem")?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.load_verify_locations_from_file(file)
}
/// Specifies a directory where trusted CA certificates are stored for the
/// purposes of certificate verification.
///
/// The content of `dir` a set of PEM-encoded certificate chains.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.load_verify_locations_from_directory("/path/to/certs")?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn load_verify_locations_from_directory(
&mut self, dir: &str,
) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.load_verify_locations_from_directory(dir)
}
/// Configures whether to verify the peer's certificate.
///
/// The default value is `true` for client connections, and `false` for
/// server ones.
pub fn verify_peer(&mut self, verify: bool) {
self.tls_ctx.lock().unwrap().set_verify(verify);
}
/// Configures whether to send GREASE values.
///
/// The default value is `true`.
pub fn grease(&mut self, grease: bool) {
self.grease = grease;
}
/// Enables logging of secrets.
///
/// When logging is enabled, the [`set_keylog()`] method must be called on
/// the connection for its cryptographic secrets to be logged in the
/// [keylog] format to the specified writer.
///
/// [`set_keylog()`]: struct.Connection.html#method.set_keylog
/// [keylog]: https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format
pub fn log_keys(&mut self) {
self.tls_ctx.lock().unwrap().enable_keylog();
}
/// Enables sending or receiving early data.
pub fn enable_early_data(&mut self) {
self.tls_ctx.lock().unwrap().set_early_data_enabled(true);
}
/// Configures the list of supported application protocols.
///
/// The list of protocols `protos` must be in wire-format (i.e. a series
/// of non-empty, 8-bit length-prefixed strings).
///
/// On the client this configures the list of protocols to send to the
/// server as part of the ALPN extension.
///
/// On the server this configures the list of supported protocols to match
/// against the client-supplied list.
///
/// Applications must set a value, but no default is provided.
///
/// ## Examples:
///
/// ```
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.set_application_protos(b"\x08http/1.1\x08http/0.9")?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
let mut b = octets::Octets::with_slice(&protos);
let mut protos_list = Vec::new();
while let Ok(proto) = b.get_bytes_with_u8_length() {
protos_list.push(proto.to_vec());
}
self.application_protos = protos_list;
self.tls_ctx
.lock()
.unwrap()
.set_alpn(&self.application_protos)
}
/// Sets the `max_idle_timeout` transport parameter, in milliseconds.
///
/// The default value is infinite, that is, no timeout is used.
pub fn set_max_idle_timeout(&mut self, v: u64) {
self.local_transport_params.max_idle_timeout = v;
}
/// Sets the `max_udp_payload_size transport` parameter.
///
/// The default value is `65527`.
pub fn set_max_recv_udp_payload_size(&mut self, v: usize) {
self.local_transport_params.max_udp_payload_size = v as u64;
}
/// Sets the maximum outgoing UDP payload size.
///
/// The default and minimum value is `1200`.
pub fn set_max_send_udp_payload_size(&mut self, v: usize) {
self.max_send_udp_payload_size = cmp::max(v, MAX_SEND_UDP_PAYLOAD_SIZE);
}
/// Sets the `initial_max_data` transport parameter.
///
/// When set to a non-zero value quiche will only allow at most `v` bytes
/// of incoming stream data to be buffered for the whole connection (that
/// is, data that is not yet read by the application) and will allow more
/// data to be received as the buffer is consumed by the application.
///
/// The default value is `0`.
pub fn set_initial_max_data(&mut self, v: u64) {
self.local_transport_params.initial_max_data = v;
}
/// Sets the `initial_max_stream_data_bidi_local` transport parameter.
///
/// When set to a non-zero value quiche will only allow at most `v` bytes
/// of incoming stream data to be buffered for each locally-initiated
/// bidirectional stream (that is, data that is not yet read by the
/// application) and will allow more data to be received as the buffer is
/// consumed by the application.
///
/// The default value is `0`.
pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_local = v;
}
/// Sets the `initial_max_stream_data_bidi_remote` transport parameter.
///
/// When set to a non-zero value quiche will only allow at most `v` bytes
/// of incoming stream data to be buffered for each remotely-initiated
/// bidirectional stream (that is, data that is not yet read by the
/// application) and will allow more data to be received as the buffer is
/// consumed by the application.
///
/// The default value is `0`.
pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_remote = v;
}
/// Sets the `initial_max_stream_data_uni` transport parameter.
///
/// When set to a non-zero value quiche will only allow at most `v` bytes
/// of incoming stream data to be buffered for each unidirectional stream
/// (that is, data that is not yet read by the application) and will allow
/// more data to be received as the buffer is consumed by the application.
///
/// The default value is `0`.
pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_stream_data_uni = v;
}
/// Sets the `initial_max_streams_bidi` transport parameter.
///
/// When set to a non-zero value quiche will only allow `v` number of
/// concurrent remotely-initiated bidirectional streams to be open at any
/// given time and will increase the limit automatically as streams are
/// completed.
///
/// A bidirectional stream is considered completed when all incoming data
/// has been read by the application (up to the `fin` offset) or the
/// stream's read direction has been shutdown, and all outgoing data has
/// been acked by the peer (up to the `fin` offset) or the stream's write
/// direction has been shutdown.
///
/// The default value is `0`.
pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_bidi = v;
}
/// Sets the `initial_max_streams_uni` transport parameter.
///
/// When set to a non-zero value quiche will only allow `v` number of
/// concurrent remotely-initiated unidirectional streams to be open at any
/// given time and will increase the limit automatically as streams are
/// completed.
///
/// A unidirectional stream is considered completed when all incoming data
/// has been read by the application (up to the `fin` offset) or the
/// stream's read direction has been shutdown.
///
/// The default value is `0`.
pub fn set_initial_max_streams_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_uni = v;
}
/// Sets the `ack_delay_exponent` transport parameter.
///
/// The default value is `3`.
pub fn set_ack_delay_exponent(&mut self, v: u64) {
self.local_transport_params.ack_delay_exponent = v;
}
/// Sets the `max_ack_delay` transport parameter.
///
/// The default value is `25`.
pub fn set_max_ack_delay(&mut self, v: u64) {
self.local_transport_params.max_ack_delay = v;
}
/// Sets the `disable_active_migration` transport parameter.
///
/// The default value is `false`.
pub fn set_disable_active_migration(&mut self, v: bool) {
self.local_transport_params.disable_active_migration = v;
}
/// Sets the congestion control algorithm used by string.
///
/// The default value is `reno`. On error `Error::CongestionControl`
/// will be returned.
///
/// ## Examples:
///
/// ```
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// config.set_cc_algorithm_name("reno");
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;
Ok(())
}
/// Sets the congestion control algorithm used.
///
/// The default value is `CongestionControlAlgorithm::CUBIC`.
pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
self.cc_algorithm = algo;
}
/// Configures whether to enable HyStart++.
///
/// The default value is `true`.
pub fn enable_hystart(&mut self, v: bool) {
self.hystart = v;
}
/// Configures whether to enable receiving DATAGRAM frames.
///
/// When enabled, the `max_datagram_frame_size` transport parameter is set
/// to 65536 as recommended by draft-ietf-quic-datagram-01.
///
/// The default is `false`.
pub fn enable_dgram(
&mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
) {
self.local_transport_params.max_datagram_frame_size = if enabled {
Some(MAX_DGRAM_FRAME_SIZE)
} else {
None
};
self.dgram_recv_max_queue_len = recv_queue_len;
self.dgram_send_max_queue_len = send_queue_len;
}
}
/// A QUIC connection.
pub struct Connection {
/// QUIC wire version used for the connection.
version: u32,
/// Peer's connection ID.
dcid: ConnectionId<'static>,
/// Local connection ID.
scid: ConnectionId<'static>,
/// Unique opaque ID for the connection that can be used for logging.
trace_id: String,
/// Packet number spaces.
pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],
/// Peer's transport parameters.
peer_transport_params: TransportParams,
/// Local transport parameters.
local_transport_params: TransportParams,
/// TLS handshake state.
///
/// Due to the requirement for `Connection` to be Send+Sync, and the fact
/// that BoringSSL's SSL structure is not thread safe, we need to wrap the
/// handshake object in a mutex.
handshake: Mutex<tls::Handshake>,
/// Loss recovery and congestion control state.
recovery: recovery::Recovery,
/// List of supported application protocols.
application_protos: Vec<Vec<u8>>,
/// Total number of received packets.
recv_count: usize,
/// Total number of sent packets.
sent_count: usize,
/// Total number of bytes received from the peer.
rx_data: u64,
/// Local flow control limit for the connection.
max_rx_data: u64,
/// Updated local flow control limit for the connection. This is used to
/// trigger sending MAX_DATA frames after a certain threshold.
max_rx_data_next: u64,
/// Whether we send MAX_DATA frame.
almost_full: bool,
/// Number of stream data bytes that can be buffered.
tx_cap: usize,
/// Total number of bytes sent to the peer.
tx_data: u64,
/// Peer's flow control limit for the connection.
max_tx_data: u64,
/// Total number of bytes the server can send before the peer's address
/// is verified.
max_send_bytes: usize,
/// Streams map, indexed by stream ID.
streams: stream::StreamMap,
/// Peer's original destination connection ID. Used by the client to
/// validate the server's transport parameter.
odcid: Option<ConnectionId<'static>>,
/// Peer's retry source connection ID. Used by the client during stateless
/// retry to validate the server's transport parameter.
rscid: Option<ConnectionId<'static>>,
/// Received address verification token.
token: Option<Vec<u8>>,
/// Error code to be sent to the peer in CONNECTION_CLOSE.
error: Option<u64>,
/// Error code to be sent to the peer in APPLICATION_CLOSE.
app_error: Option<u64>,
/// Error reason to be sent to the peer in APPLICATION_CLOSE.
app_reason: Vec<u8>,
/// Received path challenge.
challenge: Option<Vec<u8>>,
/// The connection-level limit at which send blocking occurred.
blocked_limit: Option<u64>,
/// Idle timeout expiration time.
idle_timer: Option<time::Instant>,
/// Draining timeout expiration time.
draining_timer: Option<time::Instant>,
/// The negotiated ALPN protocol.
alpn: Vec<u8>,
/// Whether this is a server-side connection.
is_server: bool,
/// Whether the initial secrets have been derived.
derived_initial_secrets: bool,
/// Whether a version negotiation packet has already been received. Only
/// relevant for client connections.
did_version_negotiation: bool,
/// Whether stateless retry has been performed.
did_retry: bool,
/// Whether the peer already updated its connection ID.
got_peer_conn_id: bool,
/// Whether the peer's address has been verified.
verified_peer_address: bool,
/// Whether the peer has verified our address.
peer_verified_address: bool,
/// Whether the peer's transport parameters were parsed.
parsed_peer_transport_params: bool,
/// Whether the connection handshake has been completed.
handshake_completed: bool,
/// Whether the HANDSHAKE_DONE has been sent.
handshake_done_sent: bool,
/// Whether the connection handshake has been confirmed.
handshake_confirmed: bool,
/// Whether an ack-eliciting packet has been sent since last receiving a
/// packet.
ack_eliciting_sent: bool,
/// Whether the connection is closed.
closed: bool,
/// Whether to send GREASE.
grease: bool,
/// TLS keylog writer.
keylog: Option<Box<dyn std::io::Write + Send + Sync>>,
/// Qlog streaming output.
#[cfg(feature = "qlog")]
qlog_streamer: Option<qlog::QlogStreamer>,
/// Whether peer transport parameters were qlogged.
#[cfg(feature = "qlog")]
qlogged_peer_params: bool,
/// DATAGRAM queues.
dgram_recv_queue: dgram::DatagramQueue,
dgram_send_queue: dgram::DatagramQueue,
}
/// Creates a new server-side connection.
///
/// The `scid` parameter represents the server's source connection ID, while
/// the optional `odcid` parameter represents the original destination ID the
/// client sent before a stateless retry (this is only required when using
/// the [`retry()`] function).
///
/// [`retry()`]: fn.retry.html
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// let conn = quiche::accept(&scid, None, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn accept(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
let conn = Connection::new(scid, odcid, config, true)?;
Ok(conn)
}
/// Creates a new client-side connection.
///
/// The `scid` parameter is used as the connection's source connection ID,
/// while the optional `server_name` parameter is used to verify the peer's
/// certificate.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let server_name = "quic.tech";
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn connect(
server_name: Option<&str>, scid: &ConnectionId, config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
let conn = Connection::new(scid, None, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.lock().unwrap().set_host_name(server_name)?;
}
Ok(conn)
}
/// Writes a version negotiation packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet that advertises an unsupported version.
///
/// ## Examples:
///
/// ```no_run
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr =
/// quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// if hdr.version != quiche::PROTOCOL_VERSION {
/// let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)?;
/// socket.send_to(&out[..len], &src).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn negotiate_version(
scid: &ConnectionId, dcid: &ConnectionId, out: &mut [u8],
) -> Result<usize> {
packet::negotiate_version(scid, dcid, out)
}
/// Writes a stateless retry packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet, while `new_scid` is the server's new source connection ID and
/// `token` is the address validation token the client needs to echo back.
///
/// The application is responsible for generating the address validation
/// token to be sent to the client, and verifying tokens sent back by the
/// client. The generated token should include the `dcid` parameter, such
/// that it can be later extracted from the token and passed to the
/// [`accept()`] function as its `odcid` parameter.
///
/// [`accept()`]: fn.accept.html
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
/// # vec![]
/// # }
/// # fn validate_token<'a>(src: &std::net::SocketAddr, token: &'a [u8]) -> Option<quiche::ConnectionId<'a>> {
/// # None
/// # }
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr = quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// let token = hdr.token.as_ref().unwrap();
///
/// // No token sent by client, create a new one.
/// if token.is_empty() {
/// let new_token = mint_token(&hdr, &src);
///
/// let len = quiche::retry(
/// &hdr.scid, &hdr.dcid, &scid, &new_token, hdr.version, &mut out,
/// )?;
///
/// socket.send_to(&out[..len], &src).unwrap();
/// return Ok(());
/// }
///
/// // Client sent token, validate it.
/// let odcid = validate_token(&src, token);
///
/// if odcid.is_none() {
/// // Invalid address validation token.
/// return Ok(());
/// }
///
/// let conn = quiche::accept(&scid, odcid.as_ref(), &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn retry(
scid: &ConnectionId, dcid: &ConnectionId, new_scid: &ConnectionId,
token: &[u8], version: u32, out: &mut [u8],
) -> Result<usize> {
packet::retry(scid, dcid, new_scid, token, version, out)
}
/// Returns true if the given protocol version is supported.
#[inline]
pub fn version_is_supported(version: u32) -> bool {
matches!(
version,
PROTOCOL_VERSION_DRAFT27 |
PROTOCOL_VERSION_DRAFT28 |
PROTOCOL_VERSION_DRAFT29
)
}
/// Pushes a frame to the output packet if there is enough space.
///
/// Returns `true` on success, `false` otherwise. In case of failure it means
/// there is no room to add the frame in the packet. You may retry to add the
/// frame later.
macro_rules! push_frame_to_pkt {
($out:expr, $frames:expr, $frame:expr, $left:expr) => {{
if $frame.wire_len() <= $left {
$left -= $frame.wire_len();
$frame.to_bytes(&mut $out)?;
$frames.push($frame);
true
} else {
false
}
}};
}
/// Conditional qlog action.
///
/// Executes the provided body if the qlog feature is enabled and quiche
/// has been condifigured with a log writer.
macro_rules! qlog_with {
($qlog_streamer:expr, $qlog_streamer_ref:ident, $body:block) => {{
#[cfg(feature = "qlog")]
{
if let Some($qlog_streamer_ref) = &mut $qlog_streamer {
$body
}
}
}};
}
impl Connection {
fn new(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
is_server: bool,
) -> Result<Pin<Box<Connection>>> {
let tls = config.tls_ctx.lock().unwrap().new_handshake()?;
Connection::with_tls(scid, odcid, config, tls, is_server)
}
fn with_tls(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
tls: tls::Handshake, is_server: bool,
) -> Result<Pin<Box<Connection>>> {
let max_rx_data = config.local_transport_params.initial_max_data;
let scid_as_hex: Vec<String> =
scid.iter().map(|b| format!("{:02x}", b)).collect();
let mut conn = Box::pin(Connection {
version: config.version,
dcid: ConnectionId::default(),
scid: scid.to_vec().into(),
trace_id: scid_as_hex.join(""),
pkt_num_spaces: [
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
],
peer_transport_params: TransportParams::default(),
local_transport_params: config.local_transport_params.clone(),
handshake: Mutex::new(tls),
recovery: recovery::Recovery::new(&config),
application_protos: config.application_protos.clone(),
recv_count: 0,
sent_count: 0,
rx_data: 0,
max_rx_data,
max_rx_data_next: max_rx_data,
almost_full: false,
tx_cap: 0,
tx_data: 0,
max_tx_data: 0,
max_send_bytes: 0,
streams: stream::StreamMap::new(
config.local_transport_params.initial_max_streams_bidi,
config.local_transport_params.initial_max_streams_uni,
),
odcid: None,
rscid: None,
token: None,
error: None,
app_error: None,
app_reason: Vec::new(),
challenge: None,
blocked_limit: None,
idle_timer: None,
draining_timer: None,
alpn: Vec::new(),
is_server,
derived_initial_secrets: false,
did_version_negotiation: false,
did_retry: false,
got_peer_conn_id: false,
// If we did stateless retry assume the peer's address is verified.
verified_peer_address: odcid.is_some(),
// Assume clients validate the server's address implicitly.
peer_verified_address: is_server,
parsed_peer_transport_params: false,
handshake_completed: false,
handshake_done_sent: false,
handshake_confirmed: false,
ack_eliciting_sent: false,
closed: false,
grease: config.grease,
keylog: None,
#[cfg(feature = "qlog")]
qlog_streamer: None,
#[cfg(feature = "qlog")]
qlogged_peer_params: false,
dgram_recv_queue: dgram::DatagramQueue::new(
config.dgram_recv_max_queue_len,
),
dgram_send_queue: dgram::DatagramQueue::new(
config.dgram_send_max_queue_len,
),
});
if let Some(odcid) = odcid {
conn.local_transport_params
.original_destination_connection_id = Some(odcid.to_vec().into());
conn.local_transport_params.retry_source_connection_id =
Some(scid.to_vec().into());
conn.did_retry = true;
}
conn.local_transport_params.initial_source_connection_id =
Some(scid.to_vec().into());
conn.handshake.lock().unwrap().init(&conn)?;
conn.handshake.lock().unwrap().use_legacy_codepoint(true);
conn.encode_transport_params()?;
// Derive initial secrets for the client. We can do this here because
// we already generated the random destination connection ID.
if !is_server {
let mut dcid = [0; 16];
rand::rand_bytes(&mut dcid[..]);
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&dcid,
conn.version,
conn.is_server,
)?;
conn.dcid = dcid.to_vec().into();
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
conn.derived_initial_secrets = true;
}
Ok(conn)
}
/// Sets keylog output to the designated [`Writer`].
///
/// This needs to be called as soon as the connection is created, to avoid
/// missing some early logs.
///
/// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[inline]
pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send + Sync>) {
self.keylog = Some(writer);
}
/// Sets qlog output to the designated [`Writer`].
///
/// This needs to be called as soon as the connection is created, to avoid
/// missing some early logs.
///
/// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[cfg(feature = "qlog")]
pub fn set_qlog(
&mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
description: String,
) {
let vp = if self.is_server {
qlog::VantagePointType::Server
} else {
qlog::VantagePointType::Client
};
let trace = qlog::Trace::new(
qlog::VantagePoint {
name: None,
ty: vp,
flow: None,
},
Some(title.to_string()),
Some(description.to_string()),
Some(qlog::Configuration {
time_offset: Some("0".to_string()),
time_units: Some(qlog::TimeUnits::Ms),
original_uris: None,
}),
None,
);
let mut streamer = qlog::QlogStreamer::new(
qlog::QLOG_VERSION.to_string(),
Some(title),
Some(description),
None,
std::time::Instant::now(),
trace,
writer,
);
streamer.start_log().ok();
let handshake = self.handshake.lock().unwrap();
let ev = self.local_transport_params.to_qlog(
qlog::TransportOwner::Local,
self.version,
handshake.alpn_protocol(),
handshake.cipher(),
);
streamer.add_event(ev).ok();
self.qlog_streamer = Some(streamer);
}
/// Processes QUIC packets received from the peer.
///
/// On success the number of bytes processed from the input buffer is
/// returned. On error the connection will be closed by calling [`close()`]
/// with the appropriate error code.
///
/// Coalesced packets will be processed as necessary.
///
/// Note that the contents of the input buffer `buf` might be modified by
/// this function due to, for example, in-place decryption.
///
/// [`close()`]: struct.Connection.html#method.close
///
/// ## Examples:
///
/// ```no_run
/// # let mut buf = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let mut conn = quiche::accept(&scid, None, &mut config)?;
/// loop {
/// let read = socket.recv(&mut buf).unwrap();
///
/// let read = match conn.recv(&mut buf[..read]) {
/// Ok(v) => v,
///
/// Err(e) => {
/// // An error occurred, handle it.
/// break;
/// },
/// };
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
let len = buf.len();
if len == 0 {
return Err(Error::BufferTooShort);
}
// Keep track of how many bytes we received from the client, so we
// can limit bytes sent back before address validation, to a multiple
// of this. The limit needs to be increased early on, so that if there
// is an error there is enough credit to send a CONNECTION_CLOSE.
//
// It doesn't matter if the packets received were valid or not, we only
// need to track the total amount of bytes received.
if !self.verified_peer_address {
self.max_send_bytes += len * MAX_AMPLIFICATION_FACTOR;
}
let mut done = 0;
let mut left = len;
// Process coalesced packets.
while left > 0 {
let read = match self.recv_single(&mut buf[len - left..len]) {
Ok(v) => v,
Err(Error::Done) => left,
Err(e) => {
// In case of error processing the incoming packet, close
// the connection.
self.close(false, e.to_wire(), b"").ok();
return Err(e);
},
};
done += read;
left -= read;
}
Ok(done)
}
/// Processes a single QUIC packet received from the peer.
///
/// On success the number of bytes processed from the input buffer is
/// returned. When the [`Done`] error is returned, processing of the
/// remainder of the incoming UDP datagram should be interrupted.
///
/// On error, an error other than [`Done`] is returned.
///
/// [`Done`]: enum.Error.html#variant.Done
fn recv_single(&mut self, buf: &mut [u8]) -> Result<usize> {
let now = time::Instant::now();
if buf.is_empty() {
return Err(Error::Done);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
let is_closing = self.error.is_some() || self.app_error.is_some();
if is_closing {
return Err(Error::Done);
}
let mut b = octets::OctetsMut::with_slice(buf);
let mut hdr =
Header::from_bytes(&mut b, self.scid.len()).map_err(|e| {
drop_pkt_on_err(
e,
self.recv_count,
self.is_server,
&self.trace_id,
)
})?;
if hdr.ty == packet::Type::VersionNegotiation {
// Version negotiation packets can only be sent by the server.
if self.is_server {
return Err(Error::Done);
}
// Ignore duplicate version negotiation.
if self.did_version_negotiation {
return Err(Error::Done);
}
// Ignore version negotiation if any other packet has already been
// successfully processed.
if self.recv_count > 0 {
return Err(Error::Done);
}
if hdr.dcid != self.scid {
return Err(Error::Done);
}
if hdr.scid != self.dcid {
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
let versions = hdr.versions.ok_or(Error::Done)?;
// Ignore version negotiation if the version already selected is
// listed.
if versions.iter().any(|&v| v == self.version) {
return Err(Error::Done);
}
match versions.iter().filter(|&&v| version_is_supported(v)).max() {
Some(v) => self.version = *v,
None => {
// We don't support any of the versions offered.
//
// While a man-in-the-middle attacker might be able to
// inject a version negotiation packet that triggers this
// failure, the window of opportunity is very small and
// this error is quite useful for debugging, so don't just
// ignore the packet.
return Err(Error::UnknownVersion);
},
};
self.did_version_negotiation = true;
// Derive Initial secrets based on the new version.
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&self.dcid,
self.version,
self.is_server,
)?;
// Reset connection state to force sending another Initial packet.
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.got_peer_conn_id = false;
self.handshake.lock().unwrap().clear()?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
self.handshake.lock().unwrap().use_legacy_codepoint(true);
// Encode transport parameters again, as the new version might be
// using a different format.
self.encode_transport_params()?;
return Err(Error::Done);
}
if hdr.ty == packet::Type::Retry {
// Retry packets can only be sent by the server.
if self.is_server {
return Err(Error::Done);
}
// Ignore duplicate retry.
if self.did_retry {
return Err(Error::Done);
}
// Check if Retry packet is valid.
if packet::verify_retry_integrity(&b, &self.dcid, self.version)
.is_err()
{
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
self.token = hdr.token;
self.did_retry = true;
// Remember peer's new connection ID.
self.odcid = Some(self.dcid.clone());
self.dcid = hdr.scid.clone();
self.rscid = Some(self.dcid.clone());
// Derive Initial secrets using the new connection ID.
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.scid,
self.version,
self.is_server,
)?;
// Reset connection state to force sending another Initial packet.
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.got_peer_conn_id = false;
self.handshake.lock().unwrap().clear()?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
return Err(Error::Done);
}
if self.is_server && !self.did_version_negotiation {
if !version_is_supported(hdr.version) {
return Err(Error::UnknownVersion);
}
self.version = hdr.version;
self.did_version_negotiation = true;
// Encode transport parameters again, as the new version might be
// using a different format.
self.encode_transport_params()?;
}
if hdr.ty != packet::Type::Short && hdr.version != self.version {
// At this point version negotiation was already performed, so
// ignore packets that don't match the connection's version.
return Err(Error::Done);
}
// Long header packets have an explicit payload length, but short
// packets don't so just use the remaining capacity in the buffer.
let payload_len = if hdr.ty == packet::Type::Short {
b.cap()
} else {
b.get_varint().map_err(|e| {
drop_pkt_on_err(
e.into(),
self.recv_count,
self.is_server,
&self.trace_id,
)
})? as usize
};
// Derive initial secrets on the server.
if !self.derived_initial_secrets {
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.dcid,
self.version,
self.is_server,
)?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
self.derived_initial_secrets = true;
}
// Select packet number space epoch based on the received packet's type.
let epoch = hdr.ty.to_epoch()?;
// Select AEAD context used to open incoming packet.
#[allow(clippy::or_fun_call)]
let aead = (self.pkt_num_spaces[epoch].crypto_0rtt_open.as_ref())
// Only use 0-RTT key if incoming packet is 0-RTT.
.filter(|_| hdr.ty == packet::Type::ZeroRTT)
// Otherwise use the packet number space's main key.
.or(self.pkt_num_spaces[epoch].crypto_open.as_ref())
// Finally, discard packet if no usable key is available.
//
// TODO: buffer 0-RTT/1-RTT packets instead of discarding when the
// required key is not available yet, as an optimization.
.ok_or_else(|| {
drop_pkt_on_err(
Error::CryptoFail,
self.recv_count,
self.is_server,
&self.trace_id,
)
})?;
let aead_tag_len = aead.alg().tag_len();
packet::decrypt_hdr(&mut b, &mut hdr, &aead).map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
let pn = packet::decode_pkt_num(
self.pkt_num_spaces[epoch].largest_rx_pkt_num,
hdr.pkt_num,
hdr.pkt_num_len,
);
let pn_len = hdr.pkt_num_len;
trace!(
"{} rx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
qlog_with!(self.qlog_streamer, q, {
let packet_size = b.len();
let qlog_pkt_hdr = qlog::PacketHeader::with_type(
hdr.ty.to_qlog(),
pn,
Some(packet_size as u64),
Some(payload_len as u64),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
);
q.add_event(qlog::event::Event::packet_received(
hdr.ty.to_qlog(),
qlog_pkt_hdr,
Some(Vec::new()),
None,
None,
None,
))
.ok();
});
let mut payload = packet::decrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
&aead,
)
.map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
trace!("{} ignored duplicate packet {}", self.trace_id, pn);
return Err(Error::Done);
}
if !self.is_server && !self.got_peer_conn_id {
if self.odcid.is_none() {
self.odcid = Some(self.dcid.clone());
}
// Replace the randomly generated destination connection ID with
// the one supplied by the server.
self.dcid = hdr.scid.clone();
self.got_peer_conn_id = true;
}
if self.is_server && !self.got_peer_conn_id {
self.dcid = hdr.scid.clone();
if !self.did_retry && self.version >= PROTOCOL_VERSION_DRAFT28 {
self.local_transport_params
.original_destination_connection_id =
Some(hdr.dcid.to_vec().into());
self.encode_transport_params()?;
}
self.got_peer_conn_id = true;
}
// To avoid sending an ACK in response to an ACK-only packet, we need
// to keep track of whether this packet contains any frame other than
// ACK and PADDING.
let mut ack_elicited = false;
// Process packet payload.
while payload.cap() > 0 {
let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
qlog_with!(self.qlog_streamer, q, {
q.add_frame(frame.to_qlog(), false).ok();
});
if frame.ack_eliciting() {
ack_elicited = true;
}
if let Err(e) = self.process_frame(frame, epoch, now) {
qlog_with!(self.qlog_streamer, q, {
// Always conclude frame writing on error.
q.finish_frames().ok();
});
return Err(e);
}
}
qlog_with!(self.qlog_streamer, q, {
// Always conclude frame writing.
q.finish_frames().ok();
});
qlog_with!(self.qlog_streamer, q, {
let ev = self.recovery.to_qlog();
q.add_event(ev).ok();
});
// Only log the remote transport parameters once the connection is
// established (i.e. after frames have been fully parsed) and only
// once per connection.
if self.is_established() {
qlog_with!(self.qlog_streamer, q, {
if !self.qlogged_peer_params {
let handshake = self.handshake.lock().unwrap();
let ev = self.peer_transport_params.to_qlog(
qlog::TransportOwner::Remote,
self.version,
handshake.alpn_protocol(),
handshake.cipher(),
);
q.add_event(ev).ok();
self.qlogged_peer_params = true;
}
});
}
// Process acked frames.
for acked in self.recovery.acked[epoch].drain(..) {
match acked {
frame::Frame::ACK { ranges, .. } => {
// Stop acknowledging packets less than or equal to the
// largest acknowledged in the sent ACK frame that, in
// turn, got acked.
if let Some(largest_acked) = ranges.last() {
self.pkt_num_spaces[epoch]
.recv_pkt_need_ack
.remove_until(largest_acked);
}
},
frame::Frame::CryptoHeader { offset, length } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.ack_and_drop(offset, length);
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
..
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
stream.send.ack_and_drop(offset, length);
// Only collect the stream if it is complete and not
// readable. If it is readable, it will get collected when
// stream_recv() is used.
if stream.is_complete() && !stream.is_readable() {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
frame::Frame::ResetStream { stream_id, .. } => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
// Only collect the stream if it is complete and not
// readable. If it is readable, it will get collected when
// stream_recv() is used.
if stream.is_complete() && !stream.is_readable() {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
_ => (),
}
}
// We only record the time of arrival of the largest packet number
// that still needs to be acked, to be used for ACK delay calculation.
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
}
self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
self.pkt_num_spaces[epoch].ack_elicited =
cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
self.pkt_num_spaces[epoch].largest_rx_pkt_num =
cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
if let Some(idle_timeout) = self.idle_timeout() {
self.idle_timer = Some(now + idle_timeout);
}
// Update send capacity.
self.tx_cap = cmp::min(
self.recovery.cwnd_available() as u64,
self.max_tx_data - self.tx_data,
) as usize;
self.recv_count += 1;
let read = b.off() + aead_tag_len;
// An Handshake packet has been received from the client and has been
// successfully processed, so we can drop the initial state and consider
// the client's address to be verified.
if self.is_server && hdr.ty == packet::Type::Handshake {
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.verified_peer_address = true;
}
self.ack_eliciting_sent = false;
Ok(read)
}
/// Writes a single QUIC packet to be sent to the peer.
///
/// On success the number of bytes written to the output buffer is
/// returned, or [`Done`] if there was nothing to write.
///
/// The application should call `send()` multiple times until [`Done`] is
/// returned, indicating that there are no more packets to send. It is
/// recommended that `send()` be called in the following cases:
///
/// * When the application receives QUIC packets from the peer (that is,
/// any time [`recv()`] is also called).
///
/// * When the connection timer expires (that is, any time [`on_timeout()`]
/// is also called).
///
/// * When the application sends data to the peer (for examples, any time
/// [`stream_send()`] or [`stream_shutdown()`] are called).
///
/// [`Done`]: enum.Error.html#variant.Done
/// [`recv()`]: struct.Connection.html#method.recv
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
/// [`stream_send()`]: struct.Connection.html#method.stream_send
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
///
/// ## Examples:
///
/// ```no_run
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let mut conn = quiche::accept(&scid, None, &mut config)?;
/// loop {
/// let write = match conn.send(&mut out) {
/// Ok(v) => v,
///
/// Err(quiche::Error::Done) => {
/// // Done writing.
/// break;
/// },
///
/// Err(e) => {
/// // An error occurred, handle it.
/// break;
/// },
/// };
///
/// socket.send(&out[..write]).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}
let mut has_initial = false;
let mut done = 0;
// Limit output packet size to respect the sender and receiver's
// maximum UDP payload size limit.
let mut left = cmp::min(out.len(), self.max_send_udp_payload_len());
// Limit data sent by the server based on the amount of data received
// from the client before its address is validated.
if !self.verified_peer_address && self.is_server {
left = cmp::min(left, self.max_send_bytes);
}
// Generate coalesced packets.
while left > 0 {
let (ty, written) =
match self.send_single(&mut out[done..done + left]) {
Ok(v) => v,
Err(Error::BufferTooShort) | Err(Error::Done) => break,
Err(e) => return Err(e),
};
done += written;
left -= written;
match ty {
packet::Type::Initial => has_initial = true,
// No more packets can be coalesced after a 1-RTT.
packet::Type::Short => break,
_ => (),
};
}
if done == 0 {
return Err(Error::Done);
}
// Pad UDP datagram if it contains a QUIC Initial packet.
if has_initial && left > 0 && done < MIN_CLIENT_INITIAL_LEN {
let pad_len = cmp::min(left, MIN_CLIENT_INITIAL_LEN - done);
// Fill padding area with null bytes, to avoid leaking information
// in case the application reuses the packet buffer.
out[done..done + pad_len].fill(0);
done += pad_len;
}
Ok(done)
}
fn send_single(&mut self, out: &mut [u8]) -> Result<(packet::Type, usize)> {
let now = time::Instant::now();
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
// If the Initial secrets have not been derived yet, there's no point
// in trying to send a packet, so return early.
if !self.derived_initial_secrets {
return Err(Error::Done);
}
let is_closing = self.error.is_some() || self.app_error.is_some();
if !is_closing {
self.do_handshake()?;
}
let mut b = octets::OctetsMut::with_slice(out);
let epoch = self.write_epoch()?;
let pkt_type = packet::Type::from_epoch(epoch);
// Process lost frames.
for lost in self.recovery.lost[epoch].drain(..) {
match lost {
frame::Frame::CryptoHeader { offset, length } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.retransmit(offset, length);
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
fin,
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
let was_flushable = stream.is_flushable();
let empty_fin = length == 0 && fin;
stream.send.retransmit(offset, length);
// If the stream is now flushable push it to the flushable
// queue, but only if it wasn't already queued.
//
// Consider the stream flushable also when we are sending a
// zero-length frame that has the fin flag set.
if (stream.is_flushable() || empty_fin) && !was_flushable {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(
stream_id,
urgency,
incremental,
);
}
},
frame::Frame::ACK { .. } => {
self.pkt_num_spaces[epoch].ack_elicited = true;
},
frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
} =>
if self.streams.get(stream_id).is_some() {
self.streams
.mark_reset(stream_id, true, error_code, final_size);
},
frame::Frame::HandshakeDone => {
self.handshake_done_sent = false;
},
frame::Frame::MaxStreamData { stream_id, .. } => {
if self.streams.get(stream_id).is_some() {
self.streams.mark_almost_full(stream_id, true);
}
},
frame::Frame::MaxData { .. } => {
self.almost_full = true;
},
_ => (),
}
}
let mut left = b.cap();
// Limit output packet size by congestion window size.
left = cmp::min(left, self.recovery.cwnd_available());
let pn = self.pkt_num_spaces[epoch].next_pkt_num;
let pn_len = packet::pkt_num_len(pn)?;
// The AEAD overhead at the current encryption level.
let crypto_overhead = self.pkt_num_spaces[epoch]
.crypto_overhead()
.ok_or(Error::Done)?;
let hdr = Header {
ty: pkt_type,
version: self.version,
dcid: ConnectionId::from_ref(&self.dcid),
scid: ConnectionId::from_ref(&self.scid),
pkt_num: 0,
pkt_num_len: pn_len,
// Only clone token for Initial packets, as other packets don't have
// this field (Retry doesn't count, as it's not encoded as part of
// this code path).
token: if pkt_type == packet::Type::Initial {
self.token.clone()
} else {
None
},
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
// Calculate the space required for the packet, including the header
// the payload length, the packet number and the AEAD overhead.
let mut overhead = b.off() + pn_len + crypto_overhead;
// We assume that the payload length, which is only present in long
// header packets, can always be encoded with a 2-byte varint.
if pkt_type != packet::Type::Short {
overhead += 2;
}
// Make sure we have enough space left for the packet overhead.
match left.checked_sub(overhead) {
Some(v) => left = v,
None => {
// We can't send more because there isn't enough space available
// in the output buffer.
//
// This usually happens when we try to send a new packet but
// failed because cwnd is almost full. In such case app_limited
// is set to false here to make cwnd grow when ACK is received.
self.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
// Make sure there is enough space for the minimum payload length.
if left < PAYLOAD_MIN_LEN {
self.recovery.update_app_limited(false);
return Err(Error::Done);
}
let mut frames: Vec<frame::Frame> = Vec::new();
let mut ack_eliciting = false;
let mut in_flight = false;
let mut has_data = false;
let header_offset = b.off();
// Reserve space for payload length in advance. Since we don't yet know
// what the final length will be, we reserve 2 bytes in all cases.
//
// Only long header packets have an explicit length field.
if pkt_type != packet::Type::Short {
b.skip(PAYLOAD_LENGTH_LEN)?;
}
packet::encode_pkt_num(pn, &mut b)?;
let payload_offset = b.off();
// Create ACK frame.
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.len() > 0 &&
(self.pkt_num_spaces[epoch].ack_elicited ||
self.recovery.loss_probes[epoch] > 0) &&
!is_closing
{
let ack_delay =
self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
let ack_delay = ack_delay.as_micros() as u64 /
2_u64
.pow(self.local_transport_params.ack_delay_exponent as u32);
let frame = frame::Frame::ACK {
ack_delay,
ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.pkt_num_spaces[epoch].ack_elicited = false;
}
}
if pkt_type == packet::Type::Short && !is_closing {
// Create HANDSHAKE_DONE frame.
if self.is_established() &&
!self.handshake_done_sent &&
self.is_server
{
let frame = frame::Frame::HandshakeDone;
if push_frame_to_pkt!(b, frames, frame, left) {
self.handshake_done_sent = true;
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAMS_BIDI frame.
if self.streams.should_update_max_streams_bidi() {
let frame = frame::Frame::MaxStreamsBidi {
max: self.streams.max_streams_bidi_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_bidi();
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAMS_UNI frame.
if self.streams.should_update_max_streams_uni() {
let frame = frame::Frame::MaxStreamsUni {
max: self.streams.max_streams_uni_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_uni();
ack_eliciting = true;
in_flight = true;
}
}
// Create DATA_BLOCKED frame.
if let Some(limit) = self.blocked_limit {
let frame = frame::Frame::DataBlocked { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.blocked_limit = None;
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAM_DATA frames as needed.
for stream_id in self.streams.almost_full() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => {
// The stream doesn't exist anymore, so remove it from
// the almost full set.
self.streams.mark_almost_full(stream_id, false);
continue;
},
};
let frame = frame::Frame::MaxStreamData {
stream_id,
max: stream.recv.max_data_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
stream.recv.update_max_data();
self.streams.mark_almost_full(stream_id, false);
ack_eliciting = true;
in_flight = true;
// Also send MAX_DATA when MAX_STREAM_DATA is sent, to avoid a
// potential race condition.
self.almost_full = true;
}
}
// Create MAX_DATA frame as needed.
if self.almost_full && self.max_rx_data < self.max_rx_data_next {
let frame = frame::Frame::MaxData {
max: self.max_rx_data_next,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.almost_full = false;
// Commits the new max_rx_data limit.
self.max_rx_data = self.max_rx_data_next;
ack_eliciting = true;
in_flight = true;
}
}
// Create STOP_SENDING frames as needed.
for (stream_id, error_code) in self
.streams
.stopped()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StopSending {
stream_id,
error_code,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_stopped(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
// Create RESET_STREAM frames as needed.
for (stream_id, (error_code, final_size)) in self
.streams
.reset()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, (u64, u64))>>()
{
let frame = frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_reset(stream_id, false, 0, 0);
ack_eliciting = true;
in_flight = true;
}
}
// Create STREAM_DATA_BLOCKED frames as needed.
for (stream_id, limit) in self
.streams
.blocked()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_blocked(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
}
// Create CONNECTION_CLOSE frame.
if let Some(err) = self.error {
let frame = frame::Frame::ConnectionClose {
error_code: err,
frame_type: 0,
reason: Vec::new(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
// Create APPLICATION_CLOSE frame.
if let Some(err) = self.app_error {
if pkt_type == packet::Type::Short {
let frame = frame::Frame::ApplicationClose {
error_code: err,
reason: self.app_reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
}
// Create PATH_RESPONSE frame.
if let Some(ref challenge) = self.challenge {
let frame = frame::Frame::PathResponse {
data: challenge.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.challenge = None;
ack_eliciting = true;
in_flight = true;
}
}
// Create CRYPTO frame.
if self.pkt_num_spaces[epoch].crypto_stream.is_flushable() &&
left > frame::MAX_CRYPTO_OVERHEAD &&
!is_closing
{
let max_len = left - frame::MAX_CRYPTO_OVERHEAD;
let crypto_off =
self.pkt_num_spaces[epoch].crypto_stream.send.off_front();
// Encode the frame.
//
// Instead of creating a `frame::Frame` object, encode the frame
// directly into the packet buffer.
//
// First we reserve some space in the output buffer for writing the
// frame header (we assume the length field is always a 2-byte
// varint as we don't know the value yet).
//
// Then we emit the data from the crypto stream's send buffer.
//
// Finally we go back and encode the frame header with the now
// available information.
let hdr_off = b.off();
let hdr_len = 1 + // frame type
octets::varint_len(crypto_off) + // offset
2; // length, always encode as 2-byte varint
let (mut crypto_hdr, mut crypto_payload) =
b.split_at(hdr_off + hdr_len)?;
// Write stream data into the packet buffer.
let (len, _) = self.pkt_num_spaces[epoch]
.crypto_stream
.send
.emit(&mut crypto_payload.as_mut()[..max_len])?;
// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works, `crypto_hdr` starts
// from the initial offset of `b` (rather than the current offset),
// so it needs to be advanced to the initial frame offset.
crypto_hdr.skip(hdr_off)?;
frame::encode_crypto_header(crypto_off, len as u64, &mut crypto_hdr)?;
// Advance the packet buffer's offset.
b.skip(hdr_len + len)?;
let frame = frame::Frame::CryptoHeader {
offset: crypto_off,
length: len,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
}
// Create DATAGRAM frame.
if pkt_type == packet::Type::Short &&
left > frame::MAX_DGRAM_OVERHEAD &&
!is_closing
{
if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
while let Some(len) = self.dgram_send_queue.peek_front_len() {
if (len + frame::MAX_DGRAM_OVERHEAD) <= left {
// Front of the queue fits this packet, send it
match self.dgram_send_queue.pop() {
Some(data) => {
let frame = frame::Frame::Datagram { data };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
},
None => continue,
};
} else if len > max_dgram_payload {
// This dgram frame will never fit. Let's purge it.
self.dgram_send_queue.pop();
} else {
break;
}
}
}
}
// Create a single STREAM frame for the first stream that is flushable.
if pkt_type == packet::Type::Short &&
left > frame::MAX_STREAM_OVERHEAD &&
!is_closing
{
while let Some(stream_id) = self.streams.pop_flushable() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
// Avoid sending frames for streams that were already stopped.
//
// This might happen if stream data was buffered but not yet
// flushed on the wire when a STOP_SENDING frame is received.
if stream.send.is_stopped() {
continue;
}
let stream_off = stream.send.off_front();
// Encode the frame.
//
// Instead of creating a `frame::Frame` object, encode the frame
// directly into the packet buffer.
//
// First we reserve some space in the output buffer for writing
// the frame header (we assume the length field is
// always a 2-byte varint as we don't know the
// value yet).
//
// Then we emit the data from the stream's send buffer.
//
// Finally we go back and encode the frame header with the now
// available information.
let hdr_off = b.off();
let hdr_len = 1 + // frame type
octets::varint_len(stream_id) + // stream_id
octets::varint_len(stream_off) + // offset
2; // length, always encode as 2-byte varint
let max_len = match left.checked_sub(hdr_len) {
Some(v) => v,
None => continue,
};
let (mut stream_hdr, mut stream_payload) =
b.split_at(hdr_off + hdr_len)?;
// Write stream data into the packet buffer.
let (len, fin) =
stream.send.emit(&mut stream_payload.as_mut()[..max_len])?;
// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works, `stream_hdr` starts
// from the initial offset of `b` (rather than the current
// offset), so it needs to be advanced to the initial frame
// offset.
stream_hdr.skip(hdr_off)?;
frame::encode_stream_header(
stream_id,
stream_off,
len as u64,
fin,
&mut stream_hdr,
)?;
// Advance the packet buffer's offset.
b.skip(hdr_len + len)?;
let frame = frame::Frame::StreamHeader {
stream_id,
offset: stream_off,
length: len,
fin,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
// If the stream is still flushable, push it to the back of the
// queue again.
if stream.is_flushable() {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(stream_id, urgency, incremental);
}
// When fuzzing, try to coalesce multiple STREAM frames in the
// same packet, so it's easier to generate fuzz corpora.
if cfg!(feature = "fuzzing") && left > frame::MAX_STREAM_OVERHEAD
{
continue;
}
break;
}
}
// Create PING for PTO probe if no other ack-elicitng frame is sent.
if self.recovery.loss_probes[epoch] > 0 &&
!ack_eliciting &&
left >= 1 &&
!is_closing
{
let frame = frame::Frame::Ping;
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
}
if ack_eliciting {
self.recovery.loss_probes[epoch] =
self.recovery.loss_probes[epoch].saturating_sub(1);
}
if frames.is_empty() {
// When we reach this point we are not able to write more, so set
// app_limited to false.
self.recovery.update_app_limited(false);
return Err(Error::Done);
}
// Pad payload so that it's always at least 4 bytes.
if b.off() - payload_offset < PAYLOAD_MIN_LEN {
let payload_len = b.off() - payload_offset;
let frame = frame::Frame::Padding {
len: PAYLOAD_MIN_LEN - payload_len,
};
#[allow(unused_assignments)]
if push_frame_to_pkt!(b, frames, frame, left) {
in_flight = true;
}
}
let payload_len = b.off() - payload_offset;
let payload_len = payload_len + crypto_overhead;
// Fill in payload length.
if pkt_type != packet::Type::Short {
let len = pn_len + payload_len;
let (_, mut payload_with_len) = b.split_at(header_offset)?;
payload_with_len
.put_varint_with_len(len as u64, PAYLOAD_LENGTH_LEN)?;
}
trace!(
"{} tx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
qlog_with!(self.qlog_streamer, q, {
let qlog_pkt_hdr = qlog::PacketHeader::with_type(
hdr.ty.to_qlog(),
pn,
Some(payload_len as u64 + payload_offset as u64),
Some(payload_len as u64),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
);
let packet_sent_ev = qlog::event::Event::packet_sent_min(
hdr.ty.to_qlog(),
qlog_pkt_hdr,
Some(Vec::new()),
);
q.add_event(packet_sent_ev).ok();
});
for frame in &mut frames {
trace!("{} tx frm {:?}", self.trace_id, frame);
qlog_with!(self.qlog_streamer, q, {
q.add_frame(frame.to_qlog(), false).ok();
});
// Once frames have been serialized they are passed to the Recovery
// module which manages retransmission. However, some frames do not
// contain retransmittable data, so drop it here.
frame.shrink_for_retransmission();
}
qlog_with!(self.qlog_streamer, q, {
q.finish_frames().ok();
});
let aead = match self.pkt_num_spaces[epoch].crypto_seal {
Some(ref v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)?;
let sent_pkt = recovery::Sent {
pkt_num: pn,
frames,
time_sent: now,
time_acked: None,
time_lost: None,
size: if ack_eliciting { written } else { 0 },
ack_eliciting,
in_flight,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data,
};
self.recovery.on_packet_sent(
sent_pkt,
epoch,
self.handshake_status(),
now,
&self.trace_id,
);
qlog_with!(self.qlog_streamer, q, {
let ev = self.recovery.to_qlog();
q.add_event(ev).ok();
});
self.pkt_num_spaces[epoch].next_pkt_num += 1;
self.sent_count += 1;