blob: 9bcf2c91bb183be95ba367bba975096f0fc6b959 [file] [log] [blame] [edit]
// Copyright (C) 2018-2019, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//! 🥧 Savoury implementation of the QUIC transport protocol and HTTP/3.
//!
//! [quiche] is an implementation of the QUIC transport protocol and HTTP/3 as
//! specified by the [IETF]. It provides a low level API for processing QUIC
//! packets and handling connection state. The application is responsible for
//! providing I/O (e.g. sockets handling) as well as an event loop with support
//! for timers.
//!
//! [quiche]: https://github.com/cloudflare/quiche/
//! [ietf]: https://quicwg.org/
//!
//! ## Connection setup
//!
//! The first step in establishing a QUIC connection using quiche is creating a
//! configuration object:
//!
//! ```
//! let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! This is shared among multiple connections and can be used to configure a
//! QUIC endpoint.
//!
//! On the client-side the [`connect()`] utility function can be used to create
//! a new connection, while [`accept()`] is for servers:
//!
//! ```
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let server_name = "quic.tech";
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let to = "127.0.0.1:1234".parse().unwrap();
//! // Client connection.
//! let conn = quiche::connect(Some(&server_name), &scid, to, &mut config)?;
//!
//! // Server connection.
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! let conn = quiche::accept(&scid, None, from, &mut config)?;
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! In both cases, the application is responsible for generating a new source
//! connection ID that will be used to identify the new connection.
//!
//! The application also needs to pass the address of the remote peer of the
//! connection: in the case of a client that would be the address of the server
//! it is trying to connect to, and for a server that is the address of the
//! client that initiated the connection.
//!
//! ## Handling incoming packets
//!
//! Using the connection's [`recv()`] method the application can process
//! incoming packets that belong to that connection from the network:
//!
//! ```no_run
//! # let mut buf = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! loop {
//! let (read, from) = socket.recv_from(&mut buf).unwrap();
//!
//! let recv_info = quiche::RecvInfo { from };
//!
//! let read = match conn.recv(&mut buf[..read], recv_info) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done reading.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application has to pass a [`RecvInfo`] structure in order to provide
//! additional information about the received packet (such as the address it
//! was received from).
//!
//! ## Generating outgoing packets
//!
//! Outgoing packets are generated using the connection's [`send()`] method
//! instead:
//!
//! ```no_run
//! # let mut out = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! loop {
//! let (write, send_info) = match conn.send(&mut out) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done writing.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//!
//! socket.send_to(&out[..write], &send_info.to).unwrap();
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application will be provided with a [`SendInfo`] structure providing
//! additional information about the newly created packet (such as the address
//! the packet should be sent to).
//!
//! When packets are sent, the application is responsible for maintaining a
//! timer to react to time-based connection events. The timer expiration can be
//! obtained using the connection's [`timeout()`] method.
//!
//! ```
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! let timeout = conn.timeout();
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application is responsible for providing a timer implementation, which
//! can be specific to the operating system or networking framework used. When
//! a timer expires, the connection's [`on_timeout()`] method should be called,
//! after which additional packets might need to be sent on the network:
//!
//! ```no_run
//! # let mut out = [0; 512];
//! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! // Timeout expired, handle it.
//! conn.on_timeout();
//!
//! // Send more packets as needed after timeout.
//! loop {
//! let (write, send_info) = match conn.send(&mut out) {
//! Ok(v) => v,
//!
//! Err(quiche::Error::Done) => {
//! // Done writing.
//! break;
//! },
//!
//! Err(e) => {
//! // An error occurred, handle it.
//! break;
//! },
//! };
//!
//! socket.send_to(&out[..write], &send_info.to).unwrap();
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## Sending and receiving stream data
//!
//! After some back and forth, the connection will complete its handshake and
//! will be ready for sending or receiving application data.
//!
//! Data can be sent on a stream by using the [`stream_send()`] method:
//!
//! ```no_run
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! if conn.is_established() {
//! // Handshake completed, send some data on stream 0.
//! conn.stream_send(0, b"hello", true)?;
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! The application can check whether there are any readable streams by using
//! the connection's [`readable()`] method, which returns an iterator over all
//! the streams that have outstanding data to read.
//!
//! The [`stream_recv()`] method can then be used to retrieve the application
//! data from the readable stream:
//!
//! ```no_run
//! # let mut buf = [0; 512];
//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
//! # let from = "127.0.0.1:1234".parse().unwrap();
//! # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
//! if conn.is_established() {
//! // Iterate over readable streams.
//! for stream_id in conn.readable() {
//! // Stream is readable, read until there's no more data.
//! while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
//! println!("Got {} bytes on stream {}", read, stream_id);
//! }
//! }
//! }
//! # Ok::<(), quiche::Error>(())
//! ```
//!
//! ## HTTP/3
//!
//! The quiche [HTTP/3 module] provides a high level API for sending and
//! receiving HTTP requests and responses on top of the QUIC transport protocol.
//!
//! [`connect()`]: fn.connect.html
//! [`accept()`]: fn.accept.html
//! [`recv()`]: struct.Connection.html#method.recv
//! [`RecvInfo`]: struct.RecvInfo.html
//! [`send()`]: struct.Connection.html#method.send
//! [`SendInfo`]: struct.SendInfo.html
//! [`timeout()`]: struct.Connection.html#method.timeout
//! [`on_timeout()`]: struct.Connection.html#method.on_timeout
//! [`stream_send()`]: struct.Connection.html#method.stream_send
//! [`readable()`]: struct.Connection.html#method.readable
//! [`stream_recv()`]: struct.Connection.html#method.stream_recv
//! [HTTP/3 module]: h3/index.html
//!
//! ## Congestion Control
//!
//! The quiche library provides a high-level API for configuring which
//! congestion control algorithm to use throughout the QUIC connection.
//!
//! When a QUIC connection is created, the application can optionally choose
//! which CC algorithm to use. See [`CongestionControlAlgorithm`] for currently
//! available congestion control algorithms.
//!
//! For example:
//!
//! ```
//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
//! config.set_cc_algorithm(quiche::CongestionControlAlgorithm::Reno);
//! ```
//!
//! Alternatively, you can configure the congestion control algorithm to use
//! by its name.
//!
//! ```
//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
//! config.set_cc_algorithm_name("reno").unwrap();
//! ```
//!
//! Note that the CC algorithm should be configured before calling [`connect()`]
//! or [`accept()`]. Otherwise the connection will use a default CC algorithm.
//!
//! [`CongestionControlAlgorithm`]: enum.CongestionControlAlgorithm.html
#![allow(improper_ctypes)]
#![allow(clippy::suspicious_operation_groupings)]
#![allow(clippy::upper_case_acronyms)]
#![warn(missing_docs)]
#[macro_use]
extern crate log;
use std::cmp;
use std::time;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str::FromStr;
use std::collections::VecDeque;
/// The current QUIC wire version.
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_V1;
/// Supported QUIC versions.
///
/// Note that the older ones might not be fully supported.
const PROTOCOL_VERSION_V1: u32 = 0x0000_0001;
// Version number of IETF draft 27 (0x1b == 27).
const PROTOCOL_VERSION_DRAFT27: u32 = 0xff00_001b;
// Version number of IETF draft 28 (0x1c == 28).
const PROTOCOL_VERSION_DRAFT28: u32 = 0xff00_001c;
// Version number of IETF draft 29 (0x1d == 29).
const PROTOCOL_VERSION_DRAFT29: u32 = 0xff00_001d;
/// The maximum length of a connection ID.
pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
/// The minimum length of Initial packets sent by a client.
pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
// The minimum payload size used by regular (non-fuzzing) builds.
#[cfg(not(feature = "fuzzing"))]
const PAYLOAD_MIN_LEN: usize = 4;
#[cfg(feature = "fuzzing")]
// Due to the fact that in fuzzing mode we use a zero-length AEAD tag (which
// would normally be 16 bytes), we need to adjust the minimum payload size to
// account for that.
const PAYLOAD_MIN_LEN: usize = 20;
// NOTE(review): presumably the multiple of the received bytes that may be sent
// to a peer whose address has not been verified yet -- confirm against the
// code that uses it.
const MAX_AMPLIFICATION_FACTOR: usize = 3;
// The maximum number of tracked packet number ranges that need to be acked.
//
// This represents more or less how many ack blocks can fit in a typical packet.
const MAX_ACK_RANGES: usize = 68;
// The highest possible stream ID allowed.
const MAX_STREAM_ID: u64 = 1 << 60;
// The default max_datagram_size used in congestion control.
const MAX_SEND_UDP_PAYLOAD_SIZE: usize = 1200;
// The default length of DATAGRAM queues.
const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;
// The DATAGRAM standard recommends either none or 65536 as maximum DATAGRAM
// frames size. We enforce the recommendation for forward compatibility.
const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
// The length of the payload length field.
const PAYLOAD_LENGTH_LEN: usize = 2;
// The number of undecryptable packets that can be buffered.
const MAX_UNDECRYPTABLE_PACKETS: usize = 10;
// Bit mask used by `is_reserved_version()` to recognize reserved versions.
const RESERVED_VERSION_MASK: u32 = 0xfafafafa;
/// A specialized [`Result`] type for quiche operations.
///
/// This type is used throughout quiche's public API for any operation that
/// can produce an error.
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
pub type Result<T> = std::result::Result<T, Error>;
/// A QUIC error.
///
/// Equality between errors is total (every payload is a plain integer), so
/// the type implements `Eq` in addition to `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no more work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// The provided packet cannot be parsed because its version is unknown.
    UnknownVersion,

    /// The provided packet cannot be parsed because it contains an invalid
    /// frame.
    InvalidFrame,

    /// The provided packet cannot be parsed.
    InvalidPacket,

    /// The operation cannot be completed because the connection is in an
    /// invalid state.
    InvalidState,

    /// The operation cannot be completed because the stream is in an
    /// invalid state.
    ///
    /// The stream ID is provided as associated data.
    InvalidStreamState(u64),

    /// The peer's transport params cannot be parsed.
    InvalidTransportParam,

    /// A cryptographic operation failed.
    CryptoFail,

    /// The TLS handshake failed.
    TlsFail,

    /// The peer violated the local flow control limits.
    FlowControl,

    /// The peer violated the local stream limits.
    StreamLimit,

    /// The specified stream was stopped by the peer.
    ///
    /// The error code sent as part of the `STOP_SENDING` frame is provided as
    /// associated data.
    StreamStopped(u64),

    /// The specified stream was reset by the peer.
    ///
    /// The error code sent as part of the `RESET_STREAM` frame is provided as
    /// associated data.
    StreamReset(u64),

    /// The received data exceeds the stream's final size.
    FinalSize,

    /// Error in congestion control.
    CongestionControl,
}
impl Error {
fn to_wire(self) -> u64 {
match self {
Error::Done => 0x0,
Error::InvalidFrame => 0x7,
Error::InvalidStreamState(..) => 0x5,
Error::InvalidTransportParam => 0x8,
Error::FlowControl => 0x3,
Error::StreamLimit => 0x4,
Error::FinalSize => 0x6,
_ => 0xa,
}
}
#[cfg(feature = "ffi")]
fn to_c(self) -> libc::ssize_t {
match self {
Error::Done => -1,
Error::BufferTooShort => -2,
Error::UnknownVersion => -3,
Error::InvalidFrame => -4,
Error::InvalidPacket => -5,
Error::InvalidState => -6,
Error::InvalidStreamState(_) => -7,
Error::InvalidTransportParam => -8,
Error::CryptoFail => -9,
Error::TlsFail => -10,
Error::FlowControl => -11,
Error::StreamLimit => -12,
Error::FinalSize => -13,
Error::CongestionControl => -14,
Error::StreamStopped { .. } => -15,
Error::StreamReset { .. } => -16,
}
}
}
impl std::fmt::Display for Error {
    /// Writes the error using the text of its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl std::convert::From<octets::BufferTooShortError> for Error {
fn from(_err: octets::BufferTooShortError) -> Self {
Error::BufferTooShort
}
}
/// Ancillary information about incoming packets.
///
/// Passed to the connection's `recv()` method together with the packet data.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecvInfo {
    /// The address the packet was received from.
    pub from: SocketAddr,
}
/// Ancillary information about outgoing packets.
///
/// Returned by the connection's `send()` method together with the packet
/// data.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendInfo {
    /// The address the packet should be sent to.
    pub to: SocketAddr,

    /// The time to send the packet out.
    pub at: time::Instant,
}
/// Represents information carried by `CONNECTION_CLOSE` frames.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionError {
    /// Whether the error came from the application or the transport layer.
    pub is_app: bool,

    /// The error code carried by the `CONNECTION_CLOSE` frame.
    pub error_code: u64,

    /// The reason carried by the `CONNECTION_CLOSE` frame.
    pub reason: Vec<u8>,
}
/// The stream's side to shutdown.
///
/// This should be used when calling [`stream_shutdown()`].
///
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Shutdown {
    /// Stop receiving stream data.
    Read = 0,

    /// Stop sending stream data.
    Write = 1,
}
/// Qlog logging level.
///
/// Each level includes the events of the levels listed before it.
#[repr(C)]
#[cfg(feature = "qlog")]
pub enum QlogLevel {
/// Logs any events of Core importance.
Core = 0,
/// Logs any events of Core and Base importance.
Base = 1,
/// Logs any events of Core, Base and Extra importance.
Extra = 2,
}
/// Stores configuration shared between multiple connections.
///
/// A value is created with `Config::new()` and then adjusted through the
/// setter methods before it is handed to `connect()` or `accept()`.
pub struct Config {
/// Local transport parameters, modified by the `set_*()` methods.
local_transport_params: TransportParams,
/// QUIC version passed to `Config::new()`.
version: u32,
/// TLS context that the certificate, key, verification, keylog, ticket
/// key, early data and ALPN settings are applied to.
tls_ctx: tls::Context,
/// Application protocols parsed by `set_application_protos()`.
application_protos: Vec<Vec<u8>>,
/// Whether to send GREASE values (`true` by default).
grease: bool,
/// Selected congestion control algorithm (CUBIC by default).
cc_algorithm: CongestionControlAlgorithm,
/// Whether HyStart++ is enabled (`true` by default).
hystart: bool,
/// Length of the DATAGRAM receive queue, set by `enable_dgram()`.
dgram_recv_max_queue_len: usize,
/// Length of the DATAGRAM send queue, set by `enable_dgram()`.
dgram_send_max_queue_len: usize,
/// Maximum outgoing UDP payload size (never below 1200).
max_send_udp_payload_size: usize,
}
// Returns whether `version` is one of the versions reserved for exercising
// version negotiation, that is, a version following the `0x?a?a?a?a` pattern
// (the low four bits of every byte are `1010`).
//
// Checking `version & RESERVED_VERSION_MASK == version` is not enough, as it
// also accepts values such as `0x0000_0000` or `0x0000_0002` whose low nibbles
// are only a subset of `1010`.
//
// See https://quicwg.org/base-drafts/rfc9000.html#section-15
fn is_reserved_version(version: u32) -> bool {
    // Selects the low four bits of every byte of the version.
    const LOW_NIBBLES: u32 = 0x0f0f_0f0f;

    // The low nibbles every reserved version must have (0x0a0a0a0a).
    const RESERVED_PATTERN: u32 = RESERVED_VERSION_MASK & LOW_NIBBLES;

    version & LOW_NIBBLES == RESERVED_PATTERN
}
impl Config {
    /// Creates a config object with the given version.
    ///
    /// [`Error::UnknownVersion`] is returned when `version` is neither a
    /// reserved version nor one of the supported ones. Errors from creating
    /// the TLS context are returned as well.
    ///
    /// ## Examples:
    ///
    /// ```
    /// let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn new(version: u32) -> Result<Config> {
        if !is_reserved_version(version) && !version_is_supported(version) {
            return Err(Error::UnknownVersion);
        }

        let tls_ctx = tls::Context::new()?;

        Ok(Config {
            local_transport_params: TransportParams::default(),
            version,
            tls_ctx,
            application_protos: Vec::new(),
            grease: true,
            cc_algorithm: CongestionControlAlgorithm::CUBIC,
            hystart: true,

            dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
            dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,

            max_send_udp_payload_size: MAX_SEND_UDP_PAYLOAD_SIZE,
        })
    }

    /// Configures the given certificate chain.
    ///
    /// The content of `file` is parsed as a PEM-encoded leaf certificate,
    /// followed by optional intermediate certificates.
    ///
    /// ## Examples:
    ///
    /// ```no_run
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.load_cert_chain_from_pem_file("/path/to/cert.pem")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
        self.tls_ctx.use_certificate_chain_file(file)
    }

    /// Configures the given private key.
    ///
    /// The content of `file` is parsed as a PEM-encoded private key.
    ///
    /// ## Examples:
    ///
    /// ```no_run
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.load_priv_key_from_pem_file("/path/to/key.pem")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
        self.tls_ctx.use_privkey_file(file)
    }

    /// Specifies a file where trusted CA certificates are stored for the
    /// purposes of certificate verification.
    ///
    /// The content of `file` is parsed as a PEM-encoded certificate chain.
    ///
    /// ## Examples:
    ///
    /// ```no_run
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.load_verify_locations_from_file("/path/to/cert.pem")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
        self.tls_ctx.load_verify_locations_from_file(file)
    }

    /// Specifies a directory where trusted CA certificates are stored for the
    /// purposes of certificate verification.
    ///
    /// The content of `dir` is a set of PEM-encoded certificate chains.
    ///
    /// ## Examples:
    ///
    /// ```no_run
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.load_verify_locations_from_directory("/path/to/certs")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn load_verify_locations_from_directory(
        &mut self, dir: &str,
    ) -> Result<()> {
        self.tls_ctx.load_verify_locations_from_directory(dir)
    }

    /// Configures whether to verify the peer's certificate.
    ///
    /// The default value is `true` for client connections, and `false` for
    /// server ones.
    pub fn verify_peer(&mut self, verify: bool) {
        self.tls_ctx.set_verify(verify);
    }

    /// Configures whether to send GREASE values.
    ///
    /// The default value is `true`.
    pub fn grease(&mut self, grease: bool) {
        self.grease = grease;
    }

    /// Enables logging of secrets.
    ///
    /// When logging is enabled, the [`set_keylog()`] method must be called on
    /// the connection for its cryptographic secrets to be logged in the
    /// [keylog] format to the specified writer.
    ///
    /// [`set_keylog()`]: struct.Connection.html#method.set_keylog
    /// [keylog]: https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format
    pub fn log_keys(&mut self) {
        self.tls_ctx.enable_keylog();
    }

    /// Configures the session ticket key material.
    ///
    /// On the server this key will be used to encrypt and decrypt session
    /// tickets, used to perform session resumption without server-side state.
    ///
    /// By default a key is generated internally, and rotated regularly, so
    /// applications don't need to call this unless they need to use a
    /// specific key (e.g. in order to support resumption across multiple
    /// servers), in which case the application is also responsible for
    /// rotating the key to provide forward secrecy.
    pub fn set_ticket_key(&mut self, key: &[u8]) -> Result<()> {
        self.tls_ctx.set_ticket_key(key)
    }

    /// Enables sending or receiving early data.
    pub fn enable_early_data(&mut self) {
        self.tls_ctx.set_early_data_enabled(true);
    }

    /// Configures the list of supported application protocols.
    ///
    /// The list of protocols `protos` must be in wire-format (i.e. a series
    /// of non-empty, 8-bit length-prefixed strings).
    ///
    /// On the client this configures the list of protocols to send to the
    /// server as part of the ALPN extension.
    ///
    /// On the server this configures the list of supported protocols to match
    /// against the client-supplied list.
    ///
    /// Applications must set a value, but no default is provided.
    ///
    /// If `protos` ends with a truncated entry (a length prefix announcing
    /// more bytes than are available), [`Error::BufferTooShort`] is returned
    /// and the previously configured list is left untouched.
    ///
    /// ## Examples:
    ///
    /// ```
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.set_application_protos(b"\x08http/1.1\x08http/0.9")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
        let mut b = octets::Octets::with_slice(protos);

        let mut protos_list = Vec::new();

        // Parse until the whole input is consumed, so that a malformed entry
        // is reported instead of silently dropping the rest of the list.
        while b.cap() > 0 {
            let proto = b.get_bytes_with_u8_length()?;
            protos_list.push(proto.to_vec());
        }

        self.application_protos = protos_list;

        self.tls_ctx.set_alpn(&self.application_protos)
    }

    /// Sets the `max_idle_timeout` transport parameter, in milliseconds.
    ///
    /// The default value is infinite, that is, no timeout is used.
    pub fn set_max_idle_timeout(&mut self, v: u64) {
        self.local_transport_params.max_idle_timeout = v;
    }

    /// Sets the `max_udp_payload_size` transport parameter.
    ///
    /// The default value is `65527`.
    pub fn set_max_recv_udp_payload_size(&mut self, v: usize) {
        self.local_transport_params.max_udp_payload_size = v as u64;
    }

    /// Sets the maximum outgoing UDP payload size.
    ///
    /// The default and minimum value is `1200`.
    pub fn set_max_send_udp_payload_size(&mut self, v: usize) {
        // Values below the minimum are raised to it.
        self.max_send_udp_payload_size = cmp::max(v, MAX_SEND_UDP_PAYLOAD_SIZE);
    }

    /// Sets the `initial_max_data` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow at most `v` bytes
    /// of incoming stream data to be buffered for the whole connection (that
    /// is, data that is not yet read by the application) and will allow more
    /// data to be received as the buffer is consumed by the application.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_data(&mut self, v: u64) {
        self.local_transport_params.initial_max_data = v;
    }

    /// Sets the `initial_max_stream_data_bidi_local` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow at most `v` bytes
    /// of incoming stream data to be buffered for each locally-initiated
    /// bidirectional stream (that is, data that is not yet read by the
    /// application) and will allow more data to be received as the buffer is
    /// consumed by the application.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
        self.local_transport_params
            .initial_max_stream_data_bidi_local = v;
    }

    /// Sets the `initial_max_stream_data_bidi_remote` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow at most `v` bytes
    /// of incoming stream data to be buffered for each remotely-initiated
    /// bidirectional stream (that is, data that is not yet read by the
    /// application) and will allow more data to be received as the buffer is
    /// consumed by the application.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
        self.local_transport_params
            .initial_max_stream_data_bidi_remote = v;
    }

    /// Sets the `initial_max_stream_data_uni` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow at most `v` bytes
    /// of incoming stream data to be buffered for each unidirectional stream
    /// (that is, data that is not yet read by the application) and will allow
    /// more data to be received as the buffer is consumed by the application.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
        self.local_transport_params.initial_max_stream_data_uni = v;
    }

    /// Sets the `initial_max_streams_bidi` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow `v` number of
    /// concurrent remotely-initiated bidirectional streams to be open at any
    /// given time and will increase the limit automatically as streams are
    /// completed.
    ///
    /// A bidirectional stream is considered completed when all incoming data
    /// has been read by the application (up to the `fin` offset) or the
    /// stream's read direction has been shutdown, and all outgoing data has
    /// been acked by the peer (up to the `fin` offset) or the stream's write
    /// direction has been shutdown.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
        self.local_transport_params.initial_max_streams_bidi = v;
    }

    /// Sets the `initial_max_streams_uni` transport parameter.
    ///
    /// When set to a non-zero value quiche will only allow `v` number of
    /// concurrent remotely-initiated unidirectional streams to be open at any
    /// given time and will increase the limit automatically as streams are
    /// completed.
    ///
    /// A unidirectional stream is considered completed when all incoming data
    /// has been read by the application (up to the `fin` offset) or the
    /// stream's read direction has been shutdown.
    ///
    /// The default value is `0`.
    pub fn set_initial_max_streams_uni(&mut self, v: u64) {
        self.local_transport_params.initial_max_streams_uni = v;
    }

    /// Sets the `ack_delay_exponent` transport parameter.
    ///
    /// The default value is `3`.
    pub fn set_ack_delay_exponent(&mut self, v: u64) {
        self.local_transport_params.ack_delay_exponent = v;
    }

    /// Sets the `max_ack_delay` transport parameter.
    ///
    /// The default value is `25`.
    pub fn set_max_ack_delay(&mut self, v: u64) {
        self.local_transport_params.max_ack_delay = v;
    }

    /// Sets the `disable_active_migration` transport parameter.
    ///
    /// The default value is `false`.
    pub fn set_disable_active_migration(&mut self, v: bool) {
        self.local_transport_params.disable_active_migration = v;
    }

    /// Sets the congestion control algorithm used by string.
    ///
    /// The default value is `cubic`. On error `Error::CongestionControl`
    /// will be returned.
    ///
    /// ## Examples:
    ///
    /// ```
    /// # let mut config = quiche::Config::new(0xbabababa)?;
    /// config.set_cc_algorithm_name("reno")?;
    /// # Ok::<(), quiche::Error>(())
    /// ```
    pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
        self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;

        Ok(())
    }

    /// Sets the congestion control algorithm used.
    ///
    /// The default value is `CongestionControlAlgorithm::CUBIC`.
    pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
        self.cc_algorithm = algo;
    }

    /// Configures whether to enable HyStart++.
    ///
    /// The default value is `true`.
    pub fn enable_hystart(&mut self, v: bool) {
        self.hystart = v;
    }

    /// Configures whether to enable receiving DATAGRAM frames.
    ///
    /// When enabled, the `max_datagram_frame_size` transport parameter is set
    /// to 65536 as recommended by draft-ietf-quic-datagram-01.
    ///
    /// `recv_queue_len` and `send_queue_len` set the lengths of the DATAGRAM
    /// receive and send queues; they are stored whether or not `enabled` is
    /// `true`.
    ///
    /// The default is `false`.
    pub fn enable_dgram(
        &mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
    ) {
        self.local_transport_params.max_datagram_frame_size = if enabled {
            Some(MAX_DGRAM_FRAME_SIZE)
        } else {
            None
        };
        self.dgram_recv_max_queue_len = recv_queue_len;
        self.dgram_send_max_queue_len = send_queue_len;
    }
}
/// A QUIC connection.
pub struct Connection {
/// QUIC wire version used for the connection.
version: u32,
/// Peer's connection ID.
dcid: ConnectionId<'static>,
/// Local connection ID.
scid: ConnectionId<'static>,
/// Unique opaque ID for the connection that can be used for logging.
trace_id: String,
/// Packet number spaces.
pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],
/// Peer's transport parameters.
peer_transport_params: TransportParams,
/// Local transport parameters.
local_transport_params: TransportParams,
/// TLS handshake state.
handshake: tls::Handshake,
/// Serialized TLS session buffer.
///
/// This field is populated when a new session ticket is processed on the
/// client. On the server this is empty.
session: Option<Vec<u8>>,
/// Loss recovery and congestion control state.
recovery: recovery::Recovery,
peer_addr: SocketAddr,
/// List of supported application protocols.
application_protos: Vec<Vec<u8>>,
/// Total number of received packets.
recv_count: usize,
/// Total number of sent packets.
sent_count: usize,
/// Total number of packets sent with data retransmitted.
retrans_count: usize,
/// Total number of bytes received from the peer.
rx_data: u64,
/// Local flow control limit for the connection.
max_rx_data: u64,
/// Updated local flow control limit for the connection. This is used to
/// trigger sending MAX_DATA frames after a certain threshold.
max_rx_data_next: u64,
/// Whether we send MAX_DATA frame.
almost_full: bool,
/// Number of stream data bytes that can be buffered.
tx_cap: usize,
/// Total number of bytes sent to the peer.
tx_data: u64,
/// Peer's flow control limit for the connection.
max_tx_data: u64,
/// Total number of bytes the server can send before the peer's address
/// is verified.
max_send_bytes: usize,
/// Total number of bytes retransmitted over the connection.
/// This counts only STREAM and CRYPTO data.
stream_retrans_bytes: u64,
/// Total number of bytes sent over the connection.
sent_bytes: u64,
/// Total number of bytes received over the connection.
recv_bytes: u64,
/// Streams map, indexed by stream ID.
streams: stream::StreamMap,
/// Peer's original destination connection ID. Used by the client to
/// validate the server's transport parameter.
odcid: Option<ConnectionId<'static>>,
/// Peer's retry source connection ID. Used by the client during stateless
/// retry to validate the server's transport parameter.
rscid: Option<ConnectionId<'static>>,
/// Received address verification token.
token: Option<Vec<u8>>,
/// Error code and reason to be sent to the peer in a CONNECTION_CLOSE
/// frame.
local_error: Option<ConnectionError>,
/// Error code and reason received from the peer in a CONNECTION_CLOSE
/// frame.
peer_error: Option<ConnectionError>,
/// Received path challenge.
challenge: Option<Vec<u8>>,
/// The connection-level limit at which send blocking occurred.
blocked_limit: Option<u64>,
/// Idle timeout expiration time.
idle_timer: Option<time::Instant>,
/// Draining timeout expiration time.
draining_timer: Option<time::Instant>,
/// List of raw packets that were received before they could be decrypted.
undecryptable_pkts: VecDeque<(Vec<u8>, RecvInfo)>,
/// The negotiated ALPN protocol.
alpn: Vec<u8>,
/// Whether this is a server-side connection.
is_server: bool,
/// Whether the initial secrets have been derived.
derived_initial_secrets: bool,
/// Whether a version negotiation packet has already been received. Only
/// relevant for client connections.
did_version_negotiation: bool,
/// Whether stateless retry has been performed.
did_retry: bool,
/// Whether the peer already updated its connection ID.
got_peer_conn_id: bool,
/// Whether the peer's address has been verified.
verified_peer_address: bool,
/// Whether the peer has verified our address.
peer_verified_address: bool,
/// Whether the peer's transport parameters were parsed.
parsed_peer_transport_params: bool,
/// Whether the connection handshake has been completed.
handshake_completed: bool,
/// Whether the HANDSHAKE_DONE frame has been sent.
handshake_done_sent: bool,
/// Whether the HANDSHAKE_DONE frame has been acked.
handshake_done_acked: bool,
/// Whether the connection handshake has been confirmed.
handshake_confirmed: bool,
/// Whether an ack-eliciting packet has been sent since last receiving a
/// packet.
ack_eliciting_sent: bool,
/// Whether the connection is closed.
closed: bool,
/// Whether the connection was timed out.
timed_out: bool,
/// Whether to send GREASE.
grease: bool,
/// TLS keylog writer.
keylog: Option<Box<dyn std::io::Write + Send + Sync>>,
#[cfg(feature = "qlog")]
qlog: QlogInfo,
/// DATAGRAM queues.
dgram_recv_queue: dgram::DatagramQueue,
dgram_send_queue: dgram::DatagramQueue,
/// Whether to emit DATAGRAM frames in the next packet.
emit_dgram: bool,
}
/// Creates a new server-side connection.
///
/// `scid` is the source connection ID the server will use for this
/// connection and `from` is the address of the peer. The optional `odcid`
/// is the original destination connection ID the client sent before a
/// stateless retry; it only needs to be provided when the [`retry()`]
/// function was used, in which case the peer's address is treated as
/// already verified.
///
/// [`retry()`]: fn.retry.html
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let from = "127.0.0.1:1234".parse().unwrap();
/// let conn = quiche::accept(&scid, None, from, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn accept(
    scid: &ConnectionId, odcid: Option<&ConnectionId>, from: SocketAddr,
    config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
    // The trailing `true` selects the server role.
    Connection::new(scid, odcid, from, config, true)
}
/// Creates a new client-side connection.
///
/// `scid` becomes the connection's source connection ID and `to` is the
/// address of the server. When `server_name` is provided it is handed to the
/// TLS handshake as the host name, which is used to verify the peer's
/// certificate.
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let server_name = "quic.tech";
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let to = "127.0.0.1:1234".parse().unwrap();
/// let conn = quiche::connect(Some(&server_name), &scid, to, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn connect(
    server_name: Option<&str>, scid: &ConnectionId, to: SocketAddr,
    config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
    // Clients never start with an original destination connection ID, and
    // the trailing `false` selects the client role.
    let mut client = Connection::new(scid, None, to, config, false)?;

    if let Some(host) = server_name {
        client.handshake.set_host_name(host)?;
    }

    Ok(client)
}
/// Writes a version negotiation packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet that advertises an unsupported version.
///
/// The packet is written into `out`. As the example below shows, the value
/// returned on success is the length of the packet to send back to the
/// client.
///
/// ## Examples:
///
/// ```no_run
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr =
///     quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// if !quiche::version_is_supported(hdr.version) {
///     let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)?;
///     socket.send_to(&out[..len], &src).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn negotiate_version(
scid: &ConnectionId, dcid: &ConnectionId, out: &mut [u8],
) -> Result<usize> {
// Thin public wrapper: the packet is built by the `packet` module.
packet::negotiate_version(scid, dcid, out)
}
/// Writes a stateless retry packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet, while `new_scid` is the server's new source connection ID and
/// `token` is the address validation token the client needs to echo back.
///
/// `version` is the protocol version to use for the packet (the example
/// below passes the version found in the client's Initial header). The
/// packet is written into `out` and, as the example shows, the value
/// returned on success is the length of the packet to send.
///
/// The application is responsible for generating the address validation
/// token to be sent to the client, and verifying tokens sent back by the
/// client. The generated token should include the `dcid` parameter, such
/// that it can be later extracted from the token and passed to the
/// [`accept()`] function as its `odcid` parameter.
///
/// [`accept()`]: fn.accept.html
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
/// #     vec![]
/// # }
/// # fn validate_token<'a>(src: &std::net::SocketAddr, token: &'a [u8]) -> Option<quiche::ConnectionId<'a>> {
/// #     None
/// # }
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr = quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// let token = hdr.token.as_ref().unwrap();
///
/// // No token sent by client, create a new one.
/// if token.is_empty() {
///     let new_token = mint_token(&hdr, &src);
///
///     let len = quiche::retry(
///         &hdr.scid, &hdr.dcid, &scid, &new_token, hdr.version, &mut out,
///     )?;
///
///     socket.send_to(&out[..len], &src).unwrap();
///     return Ok(());
/// }
///
/// // Client sent token, validate it.
/// let odcid = validate_token(&src, token);
///
/// if odcid.is_none() {
///     // Invalid address validation token.
///     return Ok(());
/// }
///
/// let conn = quiche::accept(&scid, odcid.as_ref(), src, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
#[inline]
pub fn retry(
scid: &ConnectionId, dcid: &ConnectionId, new_scid: &ConnectionId,
token: &[u8], version: u32, out: &mut [u8],
) -> Result<usize> {
// Thin public wrapper: the packet is built by the `packet` module.
packet::retry(scid, dcid, new_scid, token, version, out)
}
/// Returns true if the given protocol version is supported.
///
/// The supported set is the final QUIC version plus the draft-27, draft-28
/// and draft-29 versions; any other value yields `false`.
#[inline]
pub fn version_is_supported(version: u32) -> bool {
    match version {
        PROTOCOL_VERSION_DRAFT29 |
        PROTOCOL_VERSION_DRAFT28 |
        PROTOCOL_VERSION_DRAFT27 |
        PROTOCOL_VERSION_V1 => true,

        _ => false,
    }
}
/// Appends a frame to the packet being built, provided it still fits.
///
/// `$left` is the number of bytes still available in the packet. When the
/// frame's wire length does not exceed it, `$left` is decreased by that
/// length, the frame is serialized into `$out`, moved into `$frames`, and the
/// macro evaluates to `true`.
///
/// Otherwise nothing is written and the macro evaluates to `false`: there is
/// no room for the frame in this packet, and the caller may retry later.
///
/// Serialization errors are propagated with `?`, so the macro can only be
/// used inside functions returning a compatible `Result`.
macro_rules! push_frame_to_pkt {
    ($out:expr, $frames:expr, $frame:expr, $left:expr) => {{
        let needed = $frame.wire_len();

        if needed > $left {
            false
        } else {
            $left -= needed;

            $frame.to_bytes(&mut $out)?;
            $frames.push($frame);

            true
        }
    }};
}
/// Conditional qlog actions.
///
/// Runs `$body` with `$qlog_streamer_ref` bound to a mutable reference to the
/// qlog streamer, but only when the `qlog` feature is enabled and a streamer
/// has been installed in `$qlog`. Without the feature the macro expands to
/// an empty block.
macro_rules! qlog_with {
    ($qlog:expr, $qlog_streamer_ref:ident, $body:block) => {{
        #[cfg(feature = "qlog")]
        {
            if let Some($qlog_streamer_ref) = $qlog.streamer.as_mut() {
                $body
            }
        }
    }};
}
/// Conditional qlog actions, filtered by event importance.
///
/// Runs `$body` with `$qlog_streamer_ref` bound to a mutable reference to the
/// qlog streamer when all of the following hold: the `qlog` feature is
/// enabled, the importance of the event type `$ty` is within the configured
/// level, and a streamer has been installed in `$qlog`. Without the feature
/// the macro expands to an empty block.
macro_rules! qlog_with_type {
    ($ty:expr, $qlog:expr, $qlog_streamer_ref:ident, $body:block) => {{
        #[cfg(feature = "qlog")]
        {
            let within_level = qlog::EventImportance::from($ty)
                .is_contained_in(&$qlog.level);

            if within_level {
                if let Some($qlog_streamer_ref) = $qlog.streamer.as_mut() {
                    $body
                }
            }
        }
    }};
}
// Shorthands for the qlog event types, passed as the first argument of
// `qlog_with_type!` to decide whether an event is within the configured
// importance level. They only exist when the `qlog` feature is enabled.

/// qlog transport `ParametersSet` event type.
#[cfg(feature = "qlog")]
const QLOG_PARAMS_SET: qlog::EventType =
qlog::EventType::TransportEventType(qlog::TransportEventType::ParametersSet);
/// qlog transport `PacketReceived` event type.
#[cfg(feature = "qlog")]
const QLOG_PACKET_RX: qlog::EventType =
qlog::EventType::TransportEventType(qlog::TransportEventType::PacketReceived);
/// qlog transport `PacketSent` event type.
#[cfg(feature = "qlog")]
const QLOG_PACKET_TX: qlog::EventType =
qlog::EventType::TransportEventType(qlog::TransportEventType::PacketSent);
/// qlog transport `DataMoved` event type.
#[cfg(feature = "qlog")]
const QLOG_DATA_MV: qlog::EventType =
qlog::EventType::TransportEventType(qlog::TransportEventType::DataMoved);
/// qlog recovery `MetricsUpdated` event type.
#[cfg(feature = "qlog")]
const QLOG_METRICS: qlog::EventType =
qlog::EventType::RecoveryEventType(qlog::RecoveryEventType::MetricsUpdated);
/// Per-connection qlog state. Only present when the `qlog` feature is
/// enabled.
#[cfg(feature = "qlog")]
struct QlogInfo {
/// Streaming qlog writer. `None` until a writer is installed through
/// `set_qlog()` / `set_qlog_with_level()`.
streamer: Option<qlog::QlogStreamer>,
/// Whether the peer's transport parameters have already been logged, so
/// that they are only logged once per connection.
logged_peer_params: bool,
/// Importance level used to filter which events get logged.
level: qlog::EventImportance,
}
#[cfg(feature = "qlog")]
impl Default for QlogInfo {
    /// Returns the initial qlog state: no streamer installed, peer transport
    /// parameters not logged yet, and the `Base` importance level.
    fn default() -> Self {
        Self {
            level: qlog::EventImportance::Base,
            logged_peer_params: false,
            streamer: None,
        }
    }
}
impl Connection {
/// Creates a connection using a fresh TLS handshake obtained from the
/// configuration's TLS context.
///
/// All parameters are forwarded unchanged to `with_tls()`.
fn new(
    scid: &ConnectionId, odcid: Option<&ConnectionId>, peer: SocketAddr,
    config: &mut Config, is_server: bool,
) -> Result<Pin<Box<Connection>>> {
    let handshake = config.tls_ctx.new_handshake()?;

    Connection::with_tls(scid, odcid, peer, config, handshake, is_server)
}
/// Creates a connection around the given TLS handshake object.
///
/// `scid` is the local source connection ID (its hex encoding is also used
/// as the trace ID), `odcid` is the original destination connection ID when
/// a stateless retry was performed, and `peer` is the peer's address.
///
/// Besides initializing all the connection state, this initializes the TLS
/// handshake, encodes the local transport parameters and, on the client,
/// generates a random destination connection ID and derives the Initial
/// secrets from it.
fn with_tls(
    scid: &ConnectionId, odcid: Option<&ConnectionId>, peer: SocketAddr,
    config: &mut Config, tls: tls::Handshake, is_server: bool,
) -> Result<Pin<Box<Connection>>> {
    let max_rx_data = config.local_transport_params.initial_max_data;

    // The trace ID is the lowercase hex encoding of the source connection ID.
    let trace_id: String =
        scid.iter().map(|b| format!("{:02x}", b)).collect();

    let mut conn = Box::pin(Connection {
        version: config.version,

        dcid: ConnectionId::default(),
        scid: scid.to_vec().into(),

        trace_id,

        pkt_num_spaces: [
            packet::PktNumSpace::new(),
            packet::PktNumSpace::new(),
            packet::PktNumSpace::new(),
        ],

        peer_transport_params: TransportParams::default(),

        local_transport_params: config.local_transport_params.clone(),

        handshake: tls,

        session: None,

        recovery: recovery::Recovery::new(config),

        peer_addr: peer,

        application_protos: config.application_protos.clone(),

        recv_count: 0,
        sent_count: 0,
        retrans_count: 0,
        sent_bytes: 0,
        recv_bytes: 0,

        rx_data: 0,
        max_rx_data,
        max_rx_data_next: max_rx_data,
        almost_full: false,

        tx_cap: 0,

        tx_data: 0,
        max_tx_data: 0,

        stream_retrans_bytes: 0,

        max_send_bytes: 0,

        streams: stream::StreamMap::new(
            config.local_transport_params.initial_max_streams_bidi,
            config.local_transport_params.initial_max_streams_uni,
        ),

        odcid: None,

        rscid: None,

        token: None,

        local_error: None,

        peer_error: None,

        challenge: None,

        blocked_limit: None,

        idle_timer: None,

        draining_timer: None,

        undecryptable_pkts: VecDeque::new(),

        alpn: Vec::new(),

        is_server,

        derived_initial_secrets: false,

        did_version_negotiation: false,

        did_retry: false,

        got_peer_conn_id: false,

        // If we did stateless retry assume the peer's address is verified.
        verified_peer_address: odcid.is_some(),

        // Assume clients validate the server's address implicitly.
        peer_verified_address: is_server,

        parsed_peer_transport_params: false,

        handshake_completed: false,

        handshake_done_sent: false,
        handshake_done_acked: false,

        handshake_confirmed: false,

        ack_eliciting_sent: false,

        closed: false,

        timed_out: false,

        grease: config.grease,

        keylog: None,

        #[cfg(feature = "qlog")]
        qlog: Default::default(),

        dgram_recv_queue: dgram::DatagramQueue::new(
            config.dgram_recv_max_queue_len,
        ),

        dgram_send_queue: dgram::DatagramQueue::new(
            config.dgram_send_max_queue_len,
        ),

        emit_dgram: true,
    });

    // A stateless retry was performed: advertise both the original
    // destination connection ID and the retry source connection ID.
    if let Some(odcid) = odcid {
        let params = &mut conn.local_transport_params;

        params.original_destination_connection_id =
            Some(odcid.to_vec().into());
        params.retry_source_connection_id = Some(scid.to_vec().into());

        conn.did_retry = true;
    }

    conn.local_transport_params.initial_source_connection_id =
        Some(scid.to_vec().into());

    // The handshake object is given a raw pointer back to the connection.
    let conn_ptr: *const Connection = &*conn;
    conn.handshake.init(conn_ptr, is_server)?;

    conn.handshake
        .use_legacy_codepoint(config.version != PROTOCOL_VERSION_V1);

    conn.encode_transport_params()?;

    // Derive initial secrets for the client. This can be done right away
    // because the random destination connection ID the secrets depend on is
    // generated here.
    if !is_server {
        let mut random_dcid = [0; 16];
        rand::rand_bytes(&mut random_dcid[..]);

        let (aead_open, aead_seal) = crypto::derive_initial_key_material(
            &random_dcid,
            conn.version,
            conn.is_server,
        )?;

        conn.dcid = random_dcid.to_vec().into();

        let initial_space = &mut conn.pkt_num_spaces[packet::EPOCH_INITIAL];
        initial_space.crypto_open = Some(aead_open);
        initial_space.crypto_seal = Some(aead_seal);

        conn.derived_initial_secrets = true;
    }

    Ok(conn)
}
/// Sets keylog output to the designated [`Writer`].
///
/// Any writer set by a previous call is replaced.
///
/// This needs to be called as soon as the connection is created, to avoid
/// missing some early logs.
///
/// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[inline]
pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send + Sync>) {
self.keylog = Some(writer);
}
/// Sets qlog output to the designated [`Writer`].
///
/// Only events included in `QlogLevel::Base` are written. This is
/// equivalent to calling `set_qlog_with_level()` with `QlogLevel::Base`.
///
/// This needs to be called as soon as the connection is created, to avoid
/// missing some early logs.
///
/// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[cfg(feature = "qlog")]
pub fn set_qlog(
&mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
description: String,
) {
self.set_qlog_with_level(writer, title, description, QlogLevel::Base)
}
/// Sets qlog output to the designated [`Writer`].
///
/// Only qlog events included in the specified `QlogLevel` are written.
///
/// The log is started immediately and the local transport parameters are
/// logged as its first event. Errors from starting the log or writing that
/// first event are ignored.
///
/// This needs to be called as soon as the connection is created, to avoid
/// missing some early logs.
///
/// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
#[cfg(feature = "qlog")]
pub fn set_qlog_with_level(
    &mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
    description: String, qlog_level: QlogLevel,
) {
    // Map the public level onto qlog's event importance.
    self.qlog.level = match qlog_level {
        QlogLevel::Core => qlog::EventImportance::Core,
        QlogLevel::Base => qlog::EventImportance::Base,
        QlogLevel::Extra => qlog::EventImportance::Extra,
    };

    let vantage_point = qlog::VantagePoint {
        name: None,
        ty: if self.is_server {
            qlog::VantagePointType::Server
        } else {
            qlog::VantagePointType::Client
        },
        flow: None,
    };

    let configuration = qlog::Configuration {
        time_offset: Some(0.0),
        original_uris: None,
    };

    // The title and description are recorded both in the trace and in the
    // streamer itself, hence the copies.
    let trace = qlog::Trace::new(
        vantage_point,
        Some(title.clone()),
        Some(description.clone()),
        Some(configuration),
        None,
    );

    let mut streamer = qlog::QlogStreamer::new(
        qlog::QLOG_VERSION.to_string(),
        Some(title),
        Some(description),
        None,
        time::Instant::now(),
        trace,
        self.qlog.level.clone(),
        writer,
    );

    streamer.start_log().ok();

    let local_params_event = self
        .local_transport_params
        .to_qlog(qlog::TransportOwner::Local, self.handshake.cipher());

    // This event occurs very early, so just mark the relative time as 0.0.
    streamer
        .add_event(qlog::Event::with_time(0.0, local_params_event))
        .ok();

    self.qlog.streamer = Some(streamer);
}
/// Configures the given session for resumption.
///
/// On the client, this can be used to offer the given serialized session,
/// as returned by [`session()`], for resumption.
///
/// The serialized session is made of two length-prefixed parts, each
/// preceded by its length as a 64-bit integer: the TLS session, followed by
/// the peer's raw transport parameters. The former is handed to the TLS
/// handshake, the latter is decoded and processed as the peer's transport
/// parameters. An error is returned if the buffer is truncated or if any of
/// the parts is rejected.
///
/// This must only be called immediately after creating a connection, that
/// is, before any packet is sent or received.
///
/// [`session()`]: struct.Connection.html#method.session
#[inline]
pub fn set_session(&mut self, session: &[u8]) -> Result<()> {
    let mut reader = octets::Octets::with_slice(session);

    // First part: the TLS session.
    let tls_session_len = reader.get_u64()? as usize;
    let tls_session = reader.get_bytes(tls_session_len)?;

    self.handshake.set_session(tls_session.as_ref())?;

    // Second part: the peer's transport parameters.
    let params_len = reader.get_u64()? as usize;
    let params = reader.get_bytes(params_len)?;

    let peer_params =
        TransportParams::decode(params.as_ref(), self.is_server)?;

    self.process_peer_transport_params(peer_params);

    Ok(())
}
/// Processes QUIC packets received from the peer.
///
/// On success the number of bytes processed from the input buffer is
/// returned. On error the connection will be closed by calling [`close()`]
/// with the appropriate error code. An empty input buffer is rejected with
/// the `BufferTooShort` error.
///
/// Coalesced packets will be processed as necessary.
///
/// Note that the contents of the input buffer `buf` might be modified by
/// this function due to, for example, in-place decryption.
///
/// [`close()`]: struct.Connection.html#method.close
///
/// ## Examples:
///
/// ```no_run
/// # let mut buf = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let from = "127.0.0.1:1234".parse().unwrap();
/// # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
/// loop {
///     let (read, from) = socket.recv_from(&mut buf).unwrap();
///
///     let recv_info = quiche::RecvInfo { from };
///
///     let read = match conn.recv(&mut buf[..read], recv_info) {
///         Ok(v) => v,
///
///         Err(e) => {
///             // An error occurred, handle it.
///             break;
///         },
///     };
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn recv(&mut self, buf: &mut [u8], info: RecvInfo) -> Result<usize> {
    let len = buf.len();

    if buf.is_empty() {
        return Err(Error::BufferTooShort);
    }

    // Until the peer's address is verified, the amount of data we are
    // allowed to send back is limited to a multiple of the number of bytes
    // received. The credit is granted up front, so that even if processing
    // fails there is enough of it to send a CONNECTION_CLOSE.
    //
    // Whether the received bytes turn out to be valid packets or not is
    // irrelevant here, only their total amount matters.
    if !self.verified_peer_address {
        self.max_send_bytes += len * MAX_AMPLIFICATION_FACTOR;
    }

    // `done` bytes of the buffer have been consumed so far, `left` bytes
    // are still to be processed.
    let mut done = 0;
    let mut left = len;

    // Process coalesced packets, one at a time.
    while left > 0 {
        let read = match self.recv_single(&mut buf[done..], &info) {
            Ok(v) => v,

            // The rest of the datagram must be skipped.
            Err(Error::Done) => left,

            Err(e) => {
                // In case of error processing the incoming packet, close
                // the connection.
                self.close(false, e.to_wire(), b"").ok();
                return Err(e);
            },
        };

        done += read;
        left -= read;
    }

    // Process previously undecryptable 0-RTT packets if the decryption key
    // is now available.
    let zero_rtt_key_available = self.pkt_num_spaces
        [packet::EPOCH_APPLICATION]
        .crypto_0rtt_open
        .is_some();

    if zero_rtt_key_available {
        while let Some((mut pkt, info)) = self.undecryptable_pkts.pop_front()
        {
            if let Err(e) = self.recv(&mut pkt, info) {
                self.undecryptable_pkts.clear();

                // Even though the packet was previously "accepted", it
                // should be safe to forward the error, as it also comes
                // from the `recv()` method.
                return Err(e);
            }
        }
    }

    Ok(done)
}
/// Processes a single QUIC packet received from the peer.
///
/// On success the number of bytes processed from the input buffer is
/// returned. When the [`Done`] error is returned, processing of the
/// remainder of the incoming UDP datagram should be interrupted.
///
/// On error, an error other than [`Done`] is returned.
///
/// Version Negotiation and Retry packets are fully handled here as well:
/// they update the connection state (version, connection IDs, Initial keys)
/// so that a new Initial packet gets sent, and then [`Done`] is returned.
///
/// 0-RTT packets received before the corresponding key is available are
/// buffered (up to a limit) instead of being processed, and are reported as
/// consumed.
///
/// [`Done`]: enum.Error.html#variant.Done
fn recv_single(&mut self, buf: &mut [u8], info: &RecvInfo) -> Result<usize> {
    let now = time::Instant::now();

    if buf.is_empty() {
        return Err(Error::Done);
    }

    if self.is_closed() || self.is_draining() {
        return Err(Error::Done);
    }

    // Once a local error is pending, incoming packets are not processed
    // anymore.
    let is_closing = self.local_error.is_some();

    if is_closing {
        return Err(Error::Done);
    }

    let mut b = octets::OctetsMut::with_slice(buf);

    let mut hdr =
        Header::from_bytes(&mut b, self.scid.len()).map_err(|e| {
            drop_pkt_on_err(
                e,
                self.recv_count,
                self.is_server,
                &self.trace_id,
            )
        })?;

    if hdr.ty == packet::Type::VersionNegotiation {
        // Version negotiation packets can only be sent by the server.
        if self.is_server {
            return Err(Error::Done);
        }

        // Ignore duplicate version negotiation.
        if self.did_version_negotiation {
            return Err(Error::Done);
        }

        // Ignore version negotiation if any other packet has already been
        // successfully processed.
        if self.recv_count > 0 {
            return Err(Error::Done);
        }

        if hdr.dcid != self.scid {
            return Err(Error::Done);
        }

        if hdr.scid != self.dcid {
            return Err(Error::Done);
        }

        trace!("{} rx pkt {:?}", self.trace_id, hdr);

        let versions = hdr.versions.ok_or(Error::Done)?;

        // Ignore version negotiation if the version already selected is
        // listed.
        if versions.iter().any(|&v| v == self.version) {
            return Err(Error::Done);
        }

        let supported_versions =
            versions.iter().filter(|&&v| version_is_supported(v));

        // Pick the new version only among the versions offered by the
        // server that we also support. The currently selected version is
        // known not to be in the server's list (see the check above), so it
        // must not take part in the comparison: otherwise a client
        // configured with a version numerically greater than all the
        // offered ones would keep using the version the server rejected.
        let mut negotiated_version: Option<u32> = None;

        for &v in supported_versions {
            // The final version takes precedence over draft ones.
            if v == PROTOCOL_VERSION_V1 {
                negotiated_version = Some(v);
                break;
            }

            // Among draft versions, prefer the highest one.
            negotiated_version = Some(
                negotiated_version.map_or(v, |best| cmp::max(best, v)),
            );
        }

        self.version = match negotiated_version {
            Some(v) => v,

            None => {
                // We don't support any of the versions offered.
                //
                // While a man-in-the-middle attacker might be able to
                // inject a version negotiation packet that triggers this
                // failure, the window of opportunity is very small and
                // this error is quite useful for debugging, so don't just
                // ignore the packet.
                return Err(Error::UnknownVersion);
            },
        };

        self.did_version_negotiation = true;

        // Derive Initial secrets based on the new version.
        let (aead_open, aead_seal) = crypto::derive_initial_key_material(
            &self.dcid,
            self.version,
            self.is_server,
        )?;

        // Reset connection state to force sending another Initial packet.
        self.drop_epoch_state(packet::EPOCH_INITIAL, now);
        self.got_peer_conn_id = false;
        self.handshake.clear()?;

        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
            Some(aead_open);
        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
            Some(aead_seal);

        self.handshake
            .use_legacy_codepoint(self.version != PROTOCOL_VERSION_V1);

        // Encode transport parameters again, as the new version might be
        // using a different format.
        self.encode_transport_params()?;

        return Err(Error::Done);
    }

    if hdr.ty == packet::Type::Retry {
        // Retry packets can only be sent by the server.
        if self.is_server {
            return Err(Error::Done);
        }

        // Ignore duplicate retry.
        if self.did_retry {
            return Err(Error::Done);
        }

        // Check if Retry packet is valid.
        if packet::verify_retry_integrity(&b, &self.dcid, self.version)
            .is_err()
        {
            return Err(Error::Done);
        }

        trace!("{} rx pkt {:?}", self.trace_id, hdr);

        self.token = hdr.token;
        self.did_retry = true;

        // Remember peer's new connection ID.
        self.odcid = Some(self.dcid.clone());

        self.dcid = hdr.scid.clone();

        self.rscid = Some(self.dcid.clone());

        // Derive Initial secrets using the new connection ID.
        let (aead_open, aead_seal) = crypto::derive_initial_key_material(
            &hdr.scid,
            self.version,
            self.is_server,
        )?;

        // Reset connection state to force sending another Initial packet.
        self.drop_epoch_state(packet::EPOCH_INITIAL, now);
        self.got_peer_conn_id = false;
        self.handshake.clear()?;

        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
            Some(aead_open);
        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
            Some(aead_seal);

        return Err(Error::Done);
    }

    // On the server, the version of the connection is the one found in the
    // first packet processed, provided it is supported.
    if self.is_server && !self.did_version_negotiation {
        if !version_is_supported(hdr.version) {
            return Err(Error::UnknownVersion);
        }

        self.version = hdr.version;
        self.did_version_negotiation = true;

        self.handshake
            .use_legacy_codepoint(self.version != PROTOCOL_VERSION_V1);

        // Encode transport parameters again, as the new version might be
        // using a different format.
        self.encode_transport_params()?;
    }

    if hdr.ty != packet::Type::Short && hdr.version != self.version {
        // At this point version negotiation was already performed, so
        // ignore packets that don't match the connection's version.
        return Err(Error::Done);
    }

    // Long header packets have an explicit payload length, but short
    // packets don't so just use the remaining capacity in the buffer.
    let payload_len = if hdr.ty == packet::Type::Short {
        b.cap()
    } else {
        b.get_varint().map_err(|e| {
            drop_pkt_on_err(
                e.into(),
                self.recv_count,
                self.is_server,
                &self.trace_id,
            )
        })? as usize
    };

    // Make sure the buffer is same or larger than an explicit
    // payload length.
    if payload_len > b.cap() {
        return Err(drop_pkt_on_err(
            Error::InvalidPacket,
            self.recv_count,
            self.is_server,
            &self.trace_id,
        ));
    }

    // Derive initial secrets on the server.
    if !self.derived_initial_secrets {
        let (aead_open, aead_seal) = crypto::derive_initial_key_material(
            &hdr.dcid,
            self.version,
            self.is_server,
        )?;

        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
            Some(aead_open);
        self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
            Some(aead_seal);

        self.derived_initial_secrets = true;
    }

    // Select packet number space epoch based on the received packet's type.
    let epoch = hdr.ty.to_epoch()?;

    // Select AEAD context used to open incoming packet.
    let aead = if hdr.ty == packet::Type::ZeroRTT {
        // Only use 0-RTT key if incoming packet is 0-RTT.
        self.pkt_num_spaces[epoch].crypto_0rtt_open.as_ref()
    } else {
        // Otherwise use the packet number space's main key.
        self.pkt_num_spaces[epoch].crypto_open.as_ref()
    };

    // Finally, discard packet if no usable key is available.
    let aead = match aead {
        Some(v) => v,

        None => {
            if hdr.ty == packet::Type::ZeroRTT &&
                self.undecryptable_pkts.len() < MAX_UNDECRYPTABLE_PACKETS &&
                !self.is_established()
            {
                // Buffer 0-RTT packets when the required read key is not
                // available yet, and process them later.
                //
                // TODO: in the future we might want to buffer other types
                // of undecryptable packets as well.
                let pkt_len = b.off() + payload_len;
                let pkt = (b.buf()[..pkt_len]).to_vec();

                self.undecryptable_pkts.push_back((pkt, *info));
                return Ok(pkt_len);
            }

            let e = drop_pkt_on_err(
                Error::CryptoFail,
                self.recv_count,
                self.is_server,
                &self.trace_id,
            );

            return Err(e);
        },
    };

    let aead_tag_len = aead.alg().tag_len();

    packet::decrypt_hdr(&mut b, &mut hdr, aead).map_err(|e| {
        drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
    })?;

    // Reconstruct the full packet number from its truncated encoding, based
    // on the largest packet number received so far in this space.
    let pn = packet::decode_pkt_num(
        self.pkt_num_spaces[epoch].largest_rx_pkt_num,
        hdr.pkt_num,
        hdr.pkt_num_len,
    );

    let pn_len = hdr.pkt_num_len;

    trace!(
        "{} rx pkt {:?} len={} pn={}",
        self.trace_id,
        hdr,
        payload_len,
        pn
    );

    qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
        let packet_size = b.len();

        let qlog_pkt_hdr = qlog::PacketHeader::with_type(
            hdr.ty.to_qlog(),
            pn,
            Some(hdr.version),
            Some(&hdr.scid),
            Some(&hdr.dcid),
        );

        let qlog_raw_info = qlog::RawInfo {
            length: Some(packet_size as u64),
            payload_length: Some(payload_len as u64),
            data: None,
        };

        let ev_data = qlog::EventData::PacketReceived {
            header: qlog_pkt_hdr,
            frames: Some(vec![]),
            is_coalesced: None,
            retry_token: None,
            stateless_reset_token: None,
            supported_versions: None,
            raw: Some(qlog_raw_info),
            datagram_id: None,
        };

        q.add_event_data_with_instant(ev_data, now).ok();
    });

    let mut payload = packet::decrypt_pkt(
        &mut b,
        pn,
        pn_len,
        payload_len,
        aead,
    )
    .map_err(|e| {
        drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
    })?;

    if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
        trace!("{} ignored duplicate packet {}", self.trace_id, pn);
        return Err(Error::Done);
    }

    // Packets with no frames are invalid.
    if payload.cap() == 0 {
        return Err(Error::InvalidPacket);
    }

    if !self.is_server && !self.got_peer_conn_id {
        if self.odcid.is_none() {
            self.odcid = Some(self.dcid.clone());
        }

        // Replace the randomly generated destination connection ID with
        // the one supplied by the server.
        self.dcid = hdr.scid.clone();

        self.got_peer_conn_id = true;
    }

    if self.is_server && !self.got_peer_conn_id {
        self.dcid = hdr.scid.clone();

        // Without a stateless retry, the original destination connection ID
        // advertised to the client is the one it used in this packet.
        if !self.did_retry &&
            (self.version >= PROTOCOL_VERSION_DRAFT28 ||
                self.version == PROTOCOL_VERSION_V1)
        {
            self.local_transport_params
                .original_destination_connection_id =
                Some(hdr.dcid.to_vec().into());

            self.encode_transport_params()?;
        }

        self.got_peer_conn_id = true;
    }

    // To avoid sending an ACK in response to an ACK-only packet, we need
    // to keep track of whether this packet contains any frame other than
    // ACK and PADDING.
    let mut ack_elicited = false;

    // Process packet payload.
    while payload.cap() > 0 {
        let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;

        qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
            q.add_frame(frame.to_qlog(), false).ok();
        });

        if frame.ack_eliciting() {
            ack_elicited = true;
        }

        if let Err(e) = self.process_frame(frame, epoch, now) {
            qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
                // Always conclude frame writing on error.
                q.finish_frames().ok();
            });

            return Err(e);
        }
    }

    qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
        // Always conclude frame writing.
        q.finish_frames().ok();
    });

    qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
        if let Some(ev_data) = self.recovery.maybe_qlog() {
            q.add_event_data_with_instant(ev_data, now).ok();
        }
    });

    // Only log the remote transport parameters once the connection is
    // established (i.e. after frames have been fully parsed) and only
    // once per connection.
    if self.is_established() {
        qlog_with_type!(QLOG_PARAMS_SET, self.qlog, q, {
            if !self.qlog.logged_peer_params {
                let ev_data = self.peer_transport_params.to_qlog(
                    qlog::TransportOwner::Remote,
                    self.handshake.cipher(),
                );

                q.add_event_data_with_instant(ev_data, now).ok();

                self.qlog.logged_peer_params = true;
            }
        });
    }

    // Process acked frames.
    for acked in self.recovery.acked[epoch].drain(..) {
        match acked {
            frame::Frame::ACK { ranges, .. } => {
                // Stop acknowledging packets less than or equal to the
                // largest acknowledged in the sent ACK frame that, in
                // turn, got acked.
                if let Some(largest_acked) = ranges.last() {
                    self.pkt_num_spaces[epoch]
                        .recv_pkt_need_ack
                        .remove_until(largest_acked);
                }
            },

            frame::Frame::CryptoHeader { offset, length } => {
                self.pkt_num_spaces[epoch]
                    .crypto_stream
                    .send
                    .ack_and_drop(offset, length);
            },

            frame::Frame::StreamHeader {
                stream_id,
                offset,
                length,
                ..
            } => {
                let stream = match self.streams.get_mut(stream_id) {
                    Some(v) => v,

                    None => continue,
                };

                stream.send.ack_and_drop(offset, length);

                // Only collect the stream if it is complete and not
                // readable. If it is readable, it will get collected when
                // stream_recv() is used.
                if stream.is_complete() && !stream.is_readable() {
                    let local = stream.local;
                    self.streams.collect(stream_id, local);
                }
            },

            frame::Frame::HandshakeDone => {
                // Explicitly set this to true, so that if the frame was
                // already scheduled for retransmission, it is aborted.
                self.handshake_done_sent = true;

                self.handshake_done_acked = true;
            },

            frame::Frame::ResetStream { stream_id, .. } => {
                let stream = match self.streams.get_mut(stream_id) {
                    Some(v) => v,

                    None => continue,
                };

                // Only collect the stream if it is complete and not
                // readable. If it is readable, it will get collected when
                // stream_recv() is used.
                if stream.is_complete() && !stream.is_readable() {
                    let local = stream.local;
                    self.streams.collect(stream_id, local);
                }
            },

            _ => (),
        }
    }

    // We only record the time of arrival of the largest packet number
    // that still needs to be acked, to be used for ACK delay calculation.
    if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
        self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
    }

    self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);

    self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);

    self.pkt_num_spaces[epoch].ack_elicited =
        cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);

    self.pkt_num_spaces[epoch].largest_rx_pkt_num =
        cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);

    // Restart the idle timer now that a packet was successfully processed.
    if let Some(idle_timeout) = self.idle_timeout() {
        self.idle_timer = Some(now + idle_timeout);
    }

    // Update send capacity.
    self.update_tx_cap();

    self.recv_count += 1;

    // Total size of the packet just processed, AEAD tag included.
    let read = b.off() + aead_tag_len;

    self.recv_bytes += read as u64;

    // An Handshake packet has been received from the client and has been
    // successfully processed, so we can drop the initial state and consider
    // the client's address to be verified.
    if self.is_server && hdr.ty == packet::Type::Handshake {
        self.drop_epoch_state(packet::EPOCH_INITIAL, now);

        self.verified_peer_address = true;
    }

    self.ack_eliciting_sent = false;

    Ok(read)
}
/// Fills `out` with the next UDP datagram payload destined for the peer.
///
/// On success, returns the number of bytes written to `out` together with
/// a [`SendInfo`] carrying the destination address and the time at which
/// the datagram should be sent. [`Done`] is returned when there is nothing
/// to write, and [`BufferTooShort`] when `out` is empty.
///
/// Several QUIC packets may be coalesced into the same datagram, and a
/// datagram containing an Initial packet is zero-padded as far as the
/// available space allows.
///
/// The application should keep calling `send()` until [`Done`] is
/// returned, which signals that no more packets are pending. Calling
/// `send()` is recommended:
///
/// * After QUIC packets from the peer have been processed (that is,
///   whenever [`recv()`] is called).
///
/// * After the connection timer has fired (that is, whenever
///   [`on_timeout()`] is called).
///
/// * After the application has queued data for the peer (for example,
///   whenever [`stream_send()`] or [`stream_shutdown()`] are called).
///
/// [`Done`]: enum.Error.html#variant.Done
/// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
/// [`SendInfo`]: struct.SendInfo.html
/// [`recv()`]: struct.Connection.html#method.recv
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
/// [`stream_send()`]: struct.Connection.html#method.stream_send
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
///
/// ## Examples:
///
/// ```no_run
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let from = "127.0.0.1:1234".parse().unwrap();
/// # let mut conn = quiche::accept(&scid, None, from, &mut config)?;
/// loop {
///     let (write, send_info) = match conn.send(&mut out) {
///         Ok(v) => v,
///
///         Err(quiche::Error::Done) => {
///             // Done writing.
///             break;
///         },
///
///         Err(e) => {
///             // An error occurred, handle it.
///             break;
///         },
///     };
///
///     socket.send_to(&out[..write], &send_info.to).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn send(&mut self, out: &mut [u8]) -> Result<(usize, SendInfo)> {
    // A zero-length buffer can never hold a packet.
    if out.is_empty() {
        return Err(Error::BufferTooShort);
    }

    // Nothing is ever sent once the connection is closed or draining.
    let finished = self.is_closed() || self.is_draining();
    if finished {
        return Err(Error::Done);
    }

    // Keep driving the handshake unless a local error is already pending.
    if self.local_error.is_none() {
        self.do_handshake()?;
    }

    // Once the 0-RTT decryption key is available, replay the packets that
    // had to be buffered because they could not be decrypted earlier.
    let zero_rtt_key_ready = self.pkt_num_spaces[packet::EPOCH_APPLICATION]
        .crypto_0rtt_open
        .is_some();

    if zero_rtt_key_ready {
        loop {
            let (mut pkt, info) = match self.undecryptable_pkts.pop_front() {
                Some(entry) => entry,
                None => break,
            };

            if self.recv(&mut pkt, info).is_err() {
                // The `recv()` error is deliberately not surfaced:
                // applications may not expect it from `send()`. The
                // remaining buffered packets are dropped and the regular
                // send path below takes care of terminating the
                // connection as needed.
                self.undecryptable_pkts.clear();
                break;
            }
        }
    }

    // Without Initial secrets no packet can be produced, so stop here.
    if !self.derived_initial_secrets {
        return Err(Error::Done);
    }

    // Space still available in this datagram: bounded by the caller's
    // buffer and by the sender and receiver's maximum UDP payload size.
    let mut budget = cmp::min(out.len(), self.max_send_udp_payload_size());

    // A server that has not validated the client's address yet is further
    // limited by the amount of data received from that client.
    if self.is_server && !self.verified_peer_address {
        budget = cmp::min(budget, self.max_send_bytes);
    }

    let mut total = 0;
    let mut has_initial = false;

    // Coalesce as many packets as possible into the datagram.
    while budget > 0 {
        let window = &mut out[total..total + budget];

        let (ty, written) = match self.send_single(window, has_initial) {
            Ok(v) => v,

            // Either nothing more to send or no room for it: the datagram
            // is complete.
            Err(Error::BufferTooShort) | Err(Error::Done) => break,

            Err(e) => return Err(e),
        };

        total += written;
        budget -= written;

        if ty == packet::Type::Initial {
            has_initial = true;
        } else if ty == packet::Type::Short {
            // No more packets can be coalesced after a 1-RTT packet.
            break;
        }

        // While PTO probes are still pending for this epoch, stop
        // coalescing so that each probe goes out on its own UDP datagram.
        let probes_pending = ty
            .to_epoch()
            .map_or(false, |epoch| self.recovery.loss_probes[epoch] > 0);

        if probes_pending {
            break;
        }
    }

    if total == 0 {
        return Err(Error::Done);
    }

    // A datagram carrying an Initial packet is padded up to the minimum
    // client Initial length, as far as the remaining budget allows.
    if has_initial && budget > 0 && total < MIN_CLIENT_INITIAL_LEN {
        let pad_end = total + cmp::min(budget, MIN_CLIENT_INITIAL_LEN - total);

        // Zero the padding so that stale bytes are not leaked if the
        // application reuses the packet buffer.
        out[total..pad_end].fill(0);

        total = pad_end;
    }

    // Use the send time provided by recovery, or "now" if there is none.
    let at = self
        .recovery
        .get_packet_send_time()
        .unwrap_or_else(time::Instant::now);

    Ok((total, SendInfo {
        to: self.peer_addr,
        at,
    }))
}
fn send_single(
&mut self, out: &mut [u8], has_initial: bool,
) -> Result<(packet::Type, usize)> {
let now = time::Instant::now();
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.is_draining() {
return Err(Error::Done);
}
let is_closing = self.local_error.is_some();
let mut b = octets::OctetsMut::with_slice(out);
let pkt_type = self.write_pkt_type()?;
let epoch = pkt_type.to_epoch()?;
let stream_retrans_bytes = self.stream_retrans_bytes;
// Process lost frames.
for lost in self.recovery.lost[epoch].drain(..) {
match lost {
frame::Frame::CryptoHeader { offset, length } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.retransmit(offset, length);
self.stream_retrans_bytes += length as u64;
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
fin,
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
let was_flushable = stream.is_flushable();
let empty_fin = length == 0 && fin;
stream.send.retransmit(offset, length);
// If the stream is now flushable push it to the flushable
// queue, but only if it wasn't already queued.
//
// Consider the stream flushable also when we are sending a
// zero-length frame that has the fin flag set.
if (stream.is_flushable() || empty_fin) && !was_flushable {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(
stream_id,
urgency,
incremental,
);
}
self.stream_retrans_bytes += length as u64;
},
frame::Frame::ACK { .. } => {
self.pkt_num_spaces[epoch].ack_elicited = true;
},
frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
} =>
if self.streams.get(stream_id).is_some() {
self.streams
.mark_reset(stream_id, true, error_code, final_size);
},
// Retransmit HANDSHAKE_DONE only if it hasn't been acked at
// least once already.
frame::Frame::HandshakeDone if !self.handshake_done_acked => {
self.handshake_done_sent = false;
},
frame::Frame::MaxStreamData { stream_id, .. } => {
if self.streams.get(stream_id).is_some() {
self.streams.mark_almost_full(stream_id, true);
}
},
frame::Frame::MaxData { .. } => {
self.almost_full = true;
},
_ => (),
}
}
if stream_retrans_bytes > self.stream_retrans_bytes {
self.retrans_count += 1;
}
let mut left = b.cap();
// Limit output packet size by congestion window size.
left = cmp::min(left, self.recovery.cwnd_available());
let pn = self.pkt_num_spaces[epoch].next_pkt_num;
let pn_len = packet::pkt_num_len(pn)?;
// The AEAD overhead at the current encryption level.
let crypto_overhead = self.pkt_num_spaces[epoch]
.crypto_overhead()
.ok_or(Error::Done)?;
let hdr = Header {
ty: pkt_type,
version: self.version,
dcid: ConnectionId::from_ref(&self.dcid),
scid: ConnectionId::from_ref(&self.scid),
pkt_num: 0,
pkt_num_len: pn_len,
// Only clone token for Initial packets, as other packets don't have
// this field (Retry doesn't count, as it's not encoded as part of
// this code path).
token: if pkt_type == packet::Type::Initial {
self.token.clone()
} else {
None
},
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
// Calculate the space required for the packet, including the header
// the payload length, the packet number and the AEAD overhead.
let mut overhead = b.off() + pn_len + crypto_overhead;
// We assume that the payload length, which is only present in long
// header packets, can always be encoded with a 2-byte varint.
if pkt_type != packet::Type::Short {
overhead += PAYLOAD_LENGTH_LEN;
}
// Make sure we have enough space left for the packet overhead.
match left.checked_sub(overhead) {
Some(v) => left = v,
None => {
// We can't send more because there isn't enough space available
// in the output buffer.
//
// This usually happens when we try to send a new packet but
// failed because cwnd is almost full. In such case app_limited
// is set to false here to make cwnd grow when ACK is received.
self.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
// Make sure there is enough space for the minimum payload length.
if left < PAYLOAD_MIN_LEN {
self.recovery.update_app_limited(false);
return Err(Error::Done);
}
let mut frames: Vec<frame::Frame> = Vec::new();
let mut ack_eliciting = false;
let mut in_flight = false;
let mut has_data = false;
let header_offset = b.off();
// Reserve space for payload length in advance. Since we don't yet know
// what the final length will be, we reserve 2 bytes in all cases.
//
// Only long header packets have an explicit length field.
if pkt_type != packet::Type::Short {
b.skip(PAYLOAD_LENGTH_LEN)?;
}
packet::encode_pkt_num(pn, &mut b)?;
let payload_offset = b.off();
// Create ACK frame.
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.len() > 0 &&
(self.pkt_num_spaces[epoch].ack_elicited ||
self.recovery.loss_probes[epoch] > 0) &&
!is_closing
{
let ack_delay =
self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
let ack_delay = ack_delay.as_micros() as u64 /
2_u64
.pow(self.local_transport_params.ack_delay_exponent as u32);
let frame = frame::Frame::ACK {
ack_delay,
ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
ecn_counts: None, // sending ECN is not supported at this time
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.pkt_num_spaces[epoch].ack_elicited = false;
}
}
if pkt_type == packet::Type::Short && !is_closing {
// Create HANDSHAKE_DONE frame.
if self.should_send_handshake_done() {
let frame = frame::Frame::HandshakeDone;
if push_frame_to_pkt!(b, frames, frame, left) {
self.handshake_done_sent = true;
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAMS_BIDI frame.
if self.streams.should_update_max_streams_bidi() {
let frame = frame::Frame::MaxStreamsBidi {
max: self.streams.max_streams_bidi_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_bidi();
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAMS_UNI frame.
if self.streams.should_update_max_streams_uni() {
let frame = frame::Frame::MaxStreamsUni {
max: self.streams.max_streams_uni_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_uni();
ack_eliciting = true;
in_flight = true;
}
}
// Create DATA_BLOCKED frame.
if let Some(limit) = self.blocked_limit {
let frame = frame::Frame::DataBlocked { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.blocked_limit = None;
ack_eliciting = true;
in_flight = true;
}
}
// Create MAX_STREAM_DATA frames as needed.
for stream_id in self.streams.almost_full() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => {
// The stream doesn't exist anymore, so remove it from
// the almost full set.
self.streams.mark_almost_full(stream_id, false);
continue;
},
};
let frame = frame::Frame::MaxStreamData {
stream_id,
max: stream.recv.max_data_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
stream.recv.update_max_data();
self.streams.mark_almost_full(stream_id, false);
ack_eliciting = true;
in_flight = true;
// Also send MAX_DATA when MAX_STREAM_DATA is sent, to avoid a
// potential race condition.
self.almost_full = true;
}
}
// Create MAX_DATA frame as needed.
if self.almost_full && self.max_rx_data < self.max_rx_data_next {
let frame = frame::Frame::MaxData {
max: self.max_rx_data_next,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.almost_full = false;
// Commits the new max_rx_data limit.
self.max_rx_data = self.max_rx_data_next;
ack_eliciting = true;
in_flight = true;
}
}
// Create STOP_SENDING frames as needed.
for (stream_id, error_code) in self
.streams
.stopped()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StopSending {
stream_id,
error_code,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_stopped(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
// Create RESET_STREAM frames as needed.
for (stream_id, (error_code, final_size)) in self
.streams
.reset()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, (u64, u64))>>()
{
let frame = frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_reset(stream_id, false, 0, 0);
ack_eliciting = true;
in_flight = true;
}
}
// Create STREAM_DATA_BLOCKED frames as needed.
for (stream_id, limit) in self
.streams
.blocked()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_blocked(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
}
// Create CONNECTION_CLOSE frame.
if let Some(conn_err) = self.local_error.as_ref() {
if conn_err.is_app {
// Create ApplicationClose frame.
if pkt_type == packet::Type::Short {
let frame = frame::Frame::ApplicationClose {
error_code: conn_err.error_code,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer =
Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
} else {
// Create ConnectionClose frame.
let frame = frame::Frame::ConnectionClose {
error_code: conn_err.error_code,
frame_type: 0,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
}
// Create PATH_RESPONSE frame.
if let Some(ref challenge) = self.challenge {
let frame = frame::Frame::PathResponse {
data: challenge.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.challenge = None;
ack_eliciting = true;
in_flight = true;
}
}
// Create CRYPTO frame.
if self.pkt_num_spaces[epoch].crypto_stream.is_flushable() &&
left > frame::MAX_CRYPTO_OVERHEAD &&
!is_closing
{
let crypto_off =
self.pkt_num_spaces[epoch].crypto_stream.send.off_front();
// Encode the frame.
//
// Instead of creating a `frame::Frame` object, encode the frame
// directly into the packet buffer.
//
// First we reserve some space in the output buffer for writing the
// frame header (we assume the length field is always a 2-byte
// varint as we don't know the value yet).
//
// Then we emit the data from the crypto stream's send buffer.
//
// Finally we go back and encode the frame header with the now
// available information.
let hdr_off = b.off();
let hdr_len = 1 + // frame type
octets::varint_len(crypto_off) + // offset
2; // length, always encode as 2-byte varint
if let Some(max_len) = left.checked_sub(hdr_len) {
let (mut crypto_hdr, mut crypto_payload) =
b.split_at(hdr_off + hdr_len)?;
// Write stream data into the packet buffer.
let (len, _) = self.pkt_num_spaces[epoch]
.crypto_stream
.send
.emit(&mut crypto_payload.as_mut()[..max_len])?;
// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works, `crypto_hdr` starts
// from the initial offset of `b` (rather than the current
// offset), so it needs to be advanced to the
// initial frame offset.
crypto_hdr.skip(hdr_off)?;
frame::encode_crypto_header(
crypto_off,
len as u64,
&mut crypto_hdr,
)?;
// Advance the packet buffer's offset.
b.skip(hdr_len + len)?;
let frame = frame::Frame::CryptoHeader {
offset: crypto_off,
length: len,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
}
}
// The preference of data-bearing frame to include in a packet
// is managed by `self.emit_dgram`. However, whether any frames
// can be sent depends on the state of their buffers. In the case
// where one type is preferred but its buffer is empty, fall back
// to the other type in order not to waste this function call.
let mut dgram_emitted = false;
let dgrams_to_emit = self.dgram_max_writable_len().is_some();
let stream_to_emit = self.streams.has_flushable();
let mut do_dgram = self.emit_dgram && dgrams_to_emit;
let do_stream = !self.emit_dgram && stream_to_emit;
if !do_stream && dgrams_to_emit {
do_dgram = true;
}
// Create DATAGRAM frame.
if (pkt_type == packet::Type::Short || pkt_type == packet::Type::ZeroRTT) &&
left > frame::MAX_DGRAM_OVERHEAD &&
!is_closing &&
do_dgram
{
if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
while let Some(len) = self.dgram_send_queue.peek_front_len() {
let hdr_off = b.off();
let hdr_len = 1 + // frame type
2; // length, always encode as 2-byte varint
if (hdr_len + len) <= left {
// Front of the queue fits this packet, send it.
match self.dgram_send_queue.pop() {
Some(data) => {
// Encode the frame.
//
// Instead of creating a `frame::Frame` object,
// encode the frame directly into the packet
// buffer.
//
// First we reserve some space in the output
// buffer for writing the frame header (we
// assume the length field is always a 2-byte
// varint as we don't know the value yet).
//
// Then we emit the data from the DATAGRAM's
// buffer.
//
// Finally we go back and encode the frame
// header with the now available information.
let (mut dgram_hdr, mut dgram_payload) =
b.split_at(hdr_off + hdr_len)?;
dgram_payload.as_mut()[..len]
.copy_from_slice(&data);
// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works,
// `dgram_hdr` starts from the initial offset
// of `b` (rather than the current offset), so
// it needs to be advanced to the initial frame
// offset.
dgram_hdr.skip(hdr_off)?;
frame::encode_dgram_header(
len as u64,
&mut dgram_hdr,
)?;
// Advance the packet buffer's offset.
b.skip(hdr_len + len)?;
let frame =
frame::Frame::DatagramHeader { length: len };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
dgram_emitted = true;
}
},
None => continue,
};
} else if len > max_dgram_payload {
// This dgram frame will never fit. Let's purge it.
self.dgram_send_queue.pop();
} else {
break;
}
}
}
}
// Create a single STREAM frame for the first stream that is flushable.
if (pkt_type == packet::Type::Short || pkt_type == packet::Type::ZeroRTT) &&
left > frame::MAX_STREAM_OVERHEAD &&
!is_closing &&
!dgram_emitted
{
while let Some(stream_id) = self.streams.pop_flushable() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
// Avoid sending frames for streams that were already stopped.
//
// This might happen if stream data was buffered but not yet
// flushed on the wire when a STOP_SENDING frame is received.
if stream.send.is_stopped() {
continue;
}
let stream_off = stream.send.off_front();
// Encode the frame.
//
// Instead of creating a `frame::Frame` object, encode the frame
// directly into the packet buffer.
//
// First we reserve some space in the output buffer for writing
// the frame header (we assume the length field is always a
// 2-byte varint as we don't know the value yet).
//
// Then we emit the data from the stream's send buffer.
//
// Finally we go back and encode the frame header with the now
// available information.
let hdr_off = b.off();
let hdr_len = 1 + // frame type
octets::varint_len(stream_id) + // stream_id
octets::varint_len(stream_off) + // offset
2; // length, always encode as 2-byte varint
let max_len = match left.checked_sub(hdr_len) {
Some(v) => v,
None => continue,
};
let (mut stream_hdr, mut stream_payload) =
b.split_at(hdr_off + hdr_len)?;
// Write stream data into the packet buffer.
let (len, fin) =
stream.send.emit(&mut stream_payload.as_mut()[..max_len])?;
// Encode the frame's header.
//
// Due to how `OctetsMut::split_at()` works, `stream_hdr` starts
// from the initial offset of `b` (rather than the current
// offset), so it needs to be advanced to the initial frame
// offset.
stream_hdr.skip(hdr_off)?;
frame::encode_stream_header(
stream_id,
stream_off,
len as u64,
fin,
&mut stream_hdr,
)?;
// Advance the packet buffer's offset.
b.skip(hdr_len + len)?;
let frame = frame::Frame::StreamHeader {
stream_id,
offset: stream_off,
length: len,
fin,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
// If the stream is still flushable, push it to the back of the
// queue again.
if stream.is_flushable() {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(stream_id, urgency, incremental);
}
// When fuzzing, try to coalesce multiple STREAM frames in the
// same packet, so it's easier to generate fuzz corpora.
if cfg!(feature = "fuzzing") && left > frame::MAX_STREAM_OVERHEAD
{
continue;
}
break;
}
}
// Alternate trying to send DATAGRAMs next time.
self.emit_dgram = !dgram_emitted;
// Create PING for PTO probe if no other ack-eliciting frame is sent.
if self.recovery.loss_probes[epoch] > 0 &&
!ack_eliciting &&
left >= 1 &&
!is_closing
{
let frame = frame::Frame::Ping;
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
}
if ack_eliciting {
self.recovery.loss_probes[epoch] =