blob: 6f4fc83ca56acd64b32beb49576f56e33a686750 [file] [log] [blame] [edit]
// Copyright (C) 2020, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//! Quiche application utilities.
//!
//! This module provides some utility functions that are common to quiche
//! applications.
use std::io::prelude::*;
use std::collections::HashMap;
#[cfg(feature = "sfv")]
use std::convert::TryFrom;
use std::fmt::Write as _;
use std::rc::Rc;
use std::cell::RefCell;
use std::path;
use ring::rand::SecureRandom;
use quiche::ConnectionId;
use quiche::h3::NameValue;
use quiche::h3::Priority;
/// Default output sink: prints `out` to standard output as-is, without
/// appending a newline.
pub fn stdout_sink(out: String) {
    print!("{}", out);
}
/// HTTP/3 error code used by this file when a received request is malformed
/// (the `H3_MESSAGE_ERROR` code, 0x010e, from RFC 9114).
const H3_MESSAGE_ERROR: u64 = 0x10E;
/// ALPN helpers.
///
/// This module contains constants and functions for working with ALPN.
pub mod alpns {
/// ALPN protocol identifiers offered for HTTP/0.9 over QUIC.
pub const HTTP_09: [&[u8]; 2] = [b"hq-interop", b"http/0.9"];
/// ALPN protocol identifier offered for HTTP/3.
pub const HTTP_3: [&[u8]; 1] = [b"h3"];
}
/// An HTTP/0.9 request that has been received only in part, i.e. whose bytes
/// so far do not end with CRLF.
pub struct PartialRequest {
/// Request bytes accumulated so far.
pub req: Vec<u8>,
}
/// A response that could not be written to its stream in one go and is
/// completed later from `handle_writable`.
pub struct PartialResponse {
/// Response headers that still have to be sent, if any.
pub headers: Option<Vec<quiche::h3::Header>>,
/// Priority to send the pending headers with, if any.
pub priority: Option<quiche::h3::Priority>,
/// The complete response body.
pub body: Vec<u8>,
/// Number of bytes of `body` already written to the stream.
pub written: usize,
}
/// Numeric identifier of a client; used as the key of [`ClientMap`].
pub type ClientId = u64;
/// State kept for a single client connection.
///
/// None of these fields is read or written in this file; the notes below
/// marked "presumably" are inferred from names and types only.
pub struct Client {
/// The QUIC connection to the client.
pub conn: quiche::Connection,
/// Application-protocol handler for the connection, if any.
pub http_conn: Option<Box<dyn HttpConn>>,
/// Identifier of this client.
pub client_id: ClientId,
/// NOTE(review): presumably set once the application protocol has been
/// chosen for this connection -- confirm against the caller.
pub app_proto_selected: bool,
/// Partially received requests. The `HttpConn` handlers in this file key
/// such maps by stream ID.
pub partial_requests: std::collections::HashMap<u64, PartialRequest>,
/// Partially sent responses. The `HttpConn` handlers in this file key such
/// maps by stream ID.
pub partial_responses: std::collections::HashMap<u64, PartialResponse>,
/// NOTE(review): presumably the maximum size of outgoing datagrams --
/// confirm against the caller.
pub max_datagram_size: usize,
/// NOTE(review): semantics defined by the caller -- confirm there.
pub loss_rate: f64,
/// NOTE(review): semantics defined by the caller -- confirm there.
pub max_send_burst: usize,
}
/// Maps a QUIC connection ID to the [`ClientId`] it is associated with.
pub type ClientIdMap = HashMap<ConnectionId, ClientId>;
/// Maps a [`ClientId`] to the corresponding [`Client`] state.
pub type ClientMap = HashMap<ClientId, Client>;
/// Makes a buffered writer for a resource with a target URL.
///
/// Returns `None` when no `target_path` is given. Otherwise the file is
/// created inside `target_path` and named after the last path segment of
/// `url`. Repeated requests for the same URL are told apart by `cardinal`:
/// any value "N" greater than 1 causes ".N" to be appended to the filename.
///
/// # Panics
///
/// Panics if `url` has no path segments or if the file cannot be created.
fn make_resource_writer(
    url: &url::Url, target_path: &Option<String>, cardinal: u64,
) -> Option<std::io::BufWriter<std::fs::File>> {
    let dir = target_path.as_ref()?;

    let segments: Vec<&str> =
        url.path_segments().map(|segs| segs.collect()).unwrap();
    let file_name = segments.last().unwrap();

    let path = if cardinal > 1 {
        format!("{dir}/{file_name}.{cardinal}")
    } else {
        format!("{dir}/{file_name}")
    };

    let file = std::fs::File::create(&path).unwrap_or_else(|e| {
        panic!(
            "Error creating file for {}, attempted path was {}: {}",
            url, path, e
        )
    });

    Some(std::io::BufWriter::new(file))
}
/// Appends `index` to `path` when `path` is valid UTF-8 and ends with `/`;
/// any other path is returned unchanged.
fn autoindex(path: path::PathBuf, index: &str) -> path::PathBuf {
    let is_directory = path.to_str().map_or(false, |p| p.ends_with('/'));
    if !is_directory {
        return path;
    }

    let mut with_index = path.into_os_string();
    with_index.push(index);
    path::PathBuf::from(with_index)
}
/// Makes a buffered writer for a qlog.
///
/// The file is created in `dir` and named `<role>-<id>.sqlog`.
///
/// # Panics
///
/// Panics if the file cannot be created.
pub fn make_qlog_writer(
    dir: &std::ffi::OsStr, role: &str, id: &str,
) -> std::io::BufWriter<std::fs::File> {
    let path = std::path::Path::new(dir).join(format!("{role}-{id}.sqlog"));

    let file = std::fs::File::create(&path).unwrap_or_else(|e| {
        panic!(
            "Error creating qlog file attempted path was {:?}: {}",
            path, e
        )
    });

    std::io::BufWriter::new(file)
}
fn dump_json(reqs: &[Http3Request], output_sink: &mut dyn FnMut(String)) {
let mut out = String::new();
writeln!(out, "{{").unwrap();
writeln!(out, " \"entries\": [").unwrap();
let mut reqs = reqs.iter().peekable();
while let Some(req) = reqs.next() {
writeln!(out, " {{").unwrap();
writeln!(out, " \"request\":{{").unwrap();
writeln!(out, " \"headers\":[").unwrap();
let mut req_hdrs = req.hdrs.iter().peekable();
while let Some(h) = req_hdrs.next() {
writeln!(out, " {{").unwrap();
writeln!(
out,
" \"name\": \"{}\",",
std::str::from_utf8(h.name()).unwrap()
)
.unwrap();
writeln!(
out,
" \"value\": \"{}\"",
std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
)
.unwrap();
if req_hdrs.peek().is_some() {
writeln!(out, " }},").unwrap();
} else {
writeln!(out, " }}").unwrap();
}
}
writeln!(out, " ]}},").unwrap();
writeln!(out, " \"response\":{{").unwrap();
writeln!(out, " \"headers\":[").unwrap();
let mut response_hdrs = req.response_hdrs.iter().peekable();
while let Some(h) = response_hdrs.next() {
writeln!(out, " {{").unwrap();
writeln!(
out,
" \"name\": \"{}\",",
std::str::from_utf8(h.name()).unwrap()
)
.unwrap();
writeln!(
out,
" \"value\": \"{}\"",
std::str::from_utf8(h.value()).unwrap().replace('"', "\\\"")
)
.unwrap();
if response_hdrs.peek().is_some() {
writeln!(out, " }},").unwrap();
} else {
writeln!(out, " }}").unwrap();
}
}
writeln!(out, " ],").unwrap();
writeln!(out, " \"body\": {:?}", req.response_body).unwrap();
writeln!(out, " }}").unwrap();
if reqs.peek().is_some() {
writeln!(out, "}},").unwrap();
} else {
writeln!(out, "}}").unwrap();
}
}
writeln!(out, "]").unwrap();
writeln!(out, "}}").unwrap();
output_sink(out);
}
/// Converts headers into owned `(name, value)` string pairs.
///
/// Bytes that are not valid UTF-8 are replaced with U+FFFD.
pub fn hdrs_to_strings(hdrs: &[quiche::h3::Header]) -> Vec<(String, String)> {
    let mut pairs = Vec::with_capacity(hdrs.len());

    for hdr in hdrs {
        pairs.push((
            String::from_utf8_lossy(hdr.name()).into_owned(),
            String::from_utf8_lossy(hdr.value()).into_owned(),
        ));
    }

    pairs
}
/// Generate a new pair of Source Connection ID and reset token.
///
/// Both values are filled from `rng`: `quiche::MAX_CONN_ID_LEN` bytes for
/// the connection ID, then 16 bytes interpreted as a big-endian `u128` for
/// the reset token.
///
/// # Panics
///
/// Panics if `rng` fails to produce random bytes.
pub fn generate_cid_and_reset_token<T: SecureRandom>(
    rng: &T,
) -> (quiche::ConnectionId, u128) {
    let mut cid_bytes = [0; quiche::MAX_CONN_ID_LEN];
    rng.fill(&mut cid_bytes).unwrap();

    let mut token_bytes = [0; 16];
    rng.fill(&mut token_bytes).unwrap();

    (cid_bytes.to_vec().into(), u128::from_be_bytes(token_bytes))
}
/// Construct a priority field value from quiche apps custom query string.
pub fn priority_field_value_from_query_string(url: &url::Url) -> Option<String> {
let mut priority = "".to_string();
for param in url.query_pairs() {
if param.0 == "u" {
write!(priority, "{}={},", param.0, param.1).ok();
}
if param.0 == "i" && param.1 == "1" {
priority.push_str("i,");
}
}
if !priority.is_empty() {
// remove trailing comma
priority.pop();
Some(priority)
} else {
None
}
}
/// Construct a Priority from quiche apps custom query string.
///
/// `u=<n>` sets the urgency and `i=1` sets the incremental flag. When only
/// one of the two is present the other defaults to urgency 3 and
/// non-incremental. Returns `None` when the query yields neither.
///
/// A `u` value that does not parse as a `u8` is ignored rather than causing
/// a panic, since the URL is user-supplied input.
pub fn priority_from_query_string(url: &url::Url) -> Option<Priority> {
    let mut urgency = None;
    let mut incremental = None;

    for (key, value) in url.query_pairs() {
        match (&*key, &*value) {
            ("u", raw) => {
                // Keep any previously parsed urgency if this one is invalid.
                if let Ok(parsed) = raw.parse::<u8>() {
                    urgency = Some(parsed);
                }
            },
            ("i", "1") => incremental = Some(true),
            _ => {},
        }
    }

    match (urgency, incremental) {
        (None, None) => None,
        (u, i) => Some(Priority::new(u.unwrap_or(3), i.unwrap_or(false))),
    }
}
/// Application-protocol behaviour layered on top of a QUIC connection.
///
/// The trait covers both roles: `send_requests`, `handle_responses` and
/// `report_incomplete` are the request-sending side, while `handle_requests`
/// and `handle_writable` are the request-serving side.
pub trait HttpConn {
/// Sends requests that have not been sent yet on `conn`.
///
/// When `target_path` is set, the implementations in this file create a
/// file in that directory for each response body.
fn send_requests(
&mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
);
/// Processes response data available on `conn`, using `buf` as the receive
/// buffer. `req_start` is used to log how long the requests took.
fn handle_responses(
&mut self, conn: &mut quiche::Connection, buf: &mut [u8],
req_start: &std::time::Instant,
);
/// Logs an error and returns `true` if not all requests have completed;
/// returns `false` otherwise. `start` is used to log the elapsed time.
fn report_incomplete(&self, start: &std::time::Instant) -> bool;
/// Processes requests received on `conn` and starts sending responses.
///
/// Files are served from under `root`, with `index` appended to paths that
/// end in `/`. Responses that cannot be sent completely are stored in
/// `partial_responses`; `buf` is the receive buffer.
fn handle_requests(
&mut self, conn: &mut quiche::Connection,
partial_requests: &mut HashMap<u64, PartialRequest>,
partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
index: &str, buf: &mut [u8],
) -> quiche::h3::Result<()>;
/// Continues sending the response stored in `partial_responses` for
/// `stream_id`, if there is one.
fn handle_writable(
&mut self, conn: &mut quiche::Connection,
partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
);
}
/// Represents an HTTP/0.9 formatted request.
pub struct Http09Request {
/// URL the request is for.
url: url::Url,
/// 1-based ordinal of this request among repeated requests for `url`.
cardinal: u64,
/// The request line sent on the stream (`GET <path>\r\n`).
request_line: String,
/// Stream the request was sent on; `None` until it has been sent.
stream_id: Option<u64>,
/// Destination file for the response body, if one was created.
response_writer: Option<std::io::BufWriter<std::fs::File>>,
}
/// Represents an HTTP/3 formatted request.
struct Http3Request {
/// URL the request is for.
url: url::Url,
/// 1-based ordinal of this request among repeated requests for `url`.
cardinal: u64,
/// Stream the request was sent on; `None` until it has been sent.
stream_id: Option<u64>,
/// Request headers.
hdrs: Vec<quiche::h3::Header>,
/// Priority to signal with a PRIORITY_UPDATE after sending, if any.
priority: Option<Priority>,
/// Response headers received so far.
response_hdrs: Vec<quiche::h3::Header>,
/// Response body bytes retained for the JSON dump.
response_body: Vec<u8>,
/// Maximum number of response body bytes retained in `response_body`.
response_body_max: usize,
/// Destination file for the response body, if one was created.
response_writer: Option<std::io::BufWriter<std::fs::File>>,
}
/// Result of building an HTTP/3 response.
///
/// `Ok` carries `(response headers, response body, priority field value)`;
/// `Err` carries `(HTTP/3 error code, reason)`.
type Http3ResponseBuilderResult = std::result::Result<
(Vec<quiche::h3::Header>, Vec<u8>, Vec<u8>),
(u64, String),
>;
/// HTTP/0.9 over QUIC implementation of [`HttpConn`].
pub struct Http09Conn {
/// Stream ID to use for the next request; advanced by 4 per request sent.
stream_id: u64,
/// Number of requests sent so far.
reqs_sent: usize,
/// Number of requests whose response has been fully received.
reqs_complete: usize,
/// All requests to be sent.
reqs: Vec<Http09Request>,
/// Receives response data that is not written to a file.
output_sink: Rc<RefCell<dyn FnMut(String)>>,
}
impl Default for Http09Conn {
    /// Creates a connection with no requests, all counters at zero, and
    /// output going to standard output.
    fn default() -> Self {
        Self {
            stream_id: 0,
            reqs_sent: 0,
            reqs_complete: 0,
            reqs: Vec::new(),
            output_sink: Rc::new(RefCell::new(stdout_sink)),
        }
    }
}
impl Http09Conn {
    /// Creates an HTTP/0.9 connection that will issue a `GET` for each URL in
    /// `urls`, repeated `reqs_cardinal` times.
    ///
    /// Response data that is not written to a file is passed to
    /// `output_sink`.
    pub fn with_urls(
        urls: &[url::Url], reqs_cardinal: u64,
        output_sink: Rc<RefCell<dyn FnMut(String)>>,
    ) -> Box<dyn HttpConn> {
        let reqs = urls
            .iter()
            .flat_map(|url| {
                (1..=reqs_cardinal).map(move |cardinal| Http09Request {
                    url: url.clone(),
                    cardinal,
                    request_line: format!("GET {}\r\n", url.path()),
                    stream_id: None,
                    response_writer: None,
                })
            })
            .collect();

        Box::new(Http09Conn {
            stream_id: 0,
            reqs_sent: 0,
            reqs_complete: 0,
            reqs,
            output_sink,
        })
    }
}
impl HttpConn for Http09Conn {
fn send_requests(
&mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
) {
let mut reqs_done = 0;
for req in self.reqs.iter_mut().skip(self.reqs_sent) {
match conn.stream_send(
self.stream_id,
req.request_line.as_bytes(),
true,
) {
Ok(v) => v,
Err(quiche::Error::StreamLimit) => {
debug!("not enough stream credits, retry later...");
break;
},
Err(e) => {
error!("failed to send request {:?}", e);
break;
},
};
debug!("sending HTTP request {:?}", req.request_line);
req.stream_id = Some(self.stream_id);
req.response_writer =
make_resource_writer(&req.url, target_path, req.cardinal);
self.stream_id += 4;
reqs_done += 1;
}
self.reqs_sent += reqs_done;
}
/// Reads all readable streams on `conn` and forwards the received bytes to
/// the matching request's file writer or, if it has none, to the output
/// sink.
///
/// Once every request has received its complete response the connection is
/// closed. `req_start` is only used to log the total elapsed time.
///
/// Data handed to the output sink is decoded lossily: the bytes come from
/// the network and are not guaranteed to be UTF-8, so invalid sequences are
/// replaced with U+FFFD instead of building an invalid `String`.
///
/// # Panics
///
/// Panics if data arrives on a stream that no request was sent on, or if
/// closing the connection fails with an error other than `Done`.
fn handle_responses(
    &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
    req_start: &std::time::Instant,
) {
    // Process all readable streams.
    for s in conn.readable() {
        while let Ok((read, fin)) = conn.stream_recv(s, buf) {
            trace!("received {} bytes", read);

            let stream_buf = &buf[..read];

            trace!(
                "stream {} has {} bytes (fin? {})",
                s,
                stream_buf.len(),
                fin
            );

            let req = self
                .reqs
                .iter_mut()
                .find(|r| r.stream_id == Some(s))
                .unwrap();

            match &mut req.response_writer {
                Some(rw) => {
                    rw.write_all(stream_buf).ok();
                },

                None => {
                    let text = String::from_utf8_lossy(stream_buf).into_owned();
                    self.output_sink.borrow_mut()(text);
                },
            }

            // The server reported that it has no more data to send on
            // a client-initiated bidirectional stream, which means we got
            // the full response. If all responses are received then close
            // the connection.
            if s % 4 == 0 && fin {
                self.reqs_complete += 1;
                let reqs_count = self.reqs.len();

                debug!(
                    "{}/{} responses received",
                    self.reqs_complete, reqs_count
                );

                if self.reqs_complete == reqs_count {
                    info!(
                        "{}/{} response(s) received in {:?}, closing...",
                        self.reqs_complete,
                        reqs_count,
                        req_start.elapsed()
                    );

                    match conn.close(true, 0x00, b"kthxbye") {
                        // Already closed.
                        Ok(_) | Err(quiche::Error::Done) => (),

                        Err(e) => panic!("error closing conn: {:?}", e),
                    }

                    break;
                }
            }
        }
    }
}
/// Returns `true`, after logging an error, when some requests never
/// completed; returns `false` when all of them did.
fn report_incomplete(&self, start: &std::time::Instant) -> bool {
    let total = self.reqs.len();
    let all_done = self.reqs_complete == total;

    if !all_done {
        error!(
            "connection timed out after {:?} and only completed {}/{} requests",
            start.elapsed(),
            self.reqs_complete,
            total
        );
    }

    !all_done
}
/// Reads HTTP/0.9 requests from all readable streams on `conn` and answers
/// each `GET` with the contents of the requested file under `root`.
///
/// Request data that does not yet end with CRLF is stored in
/// `partial_requests` and processing stops until more data arrives. Paths
/// ending in `/` get `index` appended. Files that cannot be read are
/// answered with `Not Found!`. Responses that cannot be written completely
/// are stored in `partial_responses` for `handle_writable` to finish.
///
/// The request comes from the peer, so it is decoded lossily and an empty
/// request target is tolerated; neither can panic the server.
///
/// # Errors
///
/// Returns an error if sending a response fails with anything other than
/// `Done`.
fn handle_requests(
    &mut self, conn: &mut quiche::Connection,
    partial_requests: &mut HashMap<u64, PartialRequest>,
    partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
    index: &str, buf: &mut [u8],
) -> quiche::h3::Result<()> {
    // Process all readable streams.
    for s in conn.readable() {
        while let Ok((read, fin)) = conn.stream_recv(s, buf) {
            trace!("{} received {} bytes", conn.trace_id(), read);

            let stream_buf = &buf[..read];

            trace!(
                "{} stream {} has {} bytes (fin? {})",
                conn.trace_id(),
                s,
                stream_buf.len(),
                fin
            );

            let stream_buf =
                if let Some(partial) = partial_requests.get_mut(&s) {
                    partial.req.extend_from_slice(stream_buf);

                    if !partial.req.ends_with(b"\r\n") {
                        return Ok(());
                    }

                    &partial.req
                } else {
                    if !stream_buf.ends_with(b"\r\n") {
                        let request = PartialRequest {
                            req: stream_buf.to_vec(),
                        };

                        partial_requests.insert(s, request);
                        return Ok(());
                    }

                    stream_buf
                };

            if stream_buf.starts_with(b"GET ") {
                // Strip the leading "GET " and the trailing CRLF. The prefix
                // and suffix cannot overlap, so the range is always valid.
                let target = &stream_buf[4..stream_buf.len() - 2];

                // Only the first line names the resource. The bytes are
                // peer-controlled: decode lossily and accept an empty target.
                let uri = String::from_utf8_lossy(target)
                    .lines()
                    .next()
                    .unwrap_or("")
                    .to_string();
                let uri = path::Path::new(&uri);
                let mut path = path::PathBuf::from(root);

                partial_requests.remove(&s);

                // Keep only normal components so that the request cannot
                // escape `root` via `..` or an absolute path.
                for c in uri.components() {
                    if let path::Component::Normal(v) = c {
                        path.push(v)
                    }
                }

                path = autoindex(path, index);

                info!(
                    "{} got GET request for {:?} on stream {}",
                    conn.trace_id(),
                    path,
                    s
                );

                let body = std::fs::read(path.as_path())
                    .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());

                info!(
                    "{} sending response of size {} on stream {}",
                    conn.trace_id(),
                    body.len(),
                    s
                );

                let written = match conn.stream_send(s, &body, true) {
                    Ok(v) => v,

                    Err(quiche::Error::Done) => 0,

                    Err(e) => {
                        error!(
                            "{} stream send failed {:?}",
                            conn.trace_id(),
                            e
                        );
                        return Err(From::from(e));
                    },
                };

                if written < body.len() {
                    let response = PartialResponse {
                        headers: None,
                        priority: None,
                        body,
                        written,
                    };

                    partial_responses.insert(s, response);
                }
            }
        }
    }

    Ok(())
}
/// Continues sending the partial response stored for `stream_id`, if any.
///
/// The entry is removed from `partial_responses` once the whole body has
/// been written or when sending fails with an error other than `Done`.
fn handle_writable(
    &mut self, conn: &mut quiche::Connection,
    partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
) {
    trace!("{} stream {} is writable", conn.trace_id(), stream_id);

    let resp = match partial_responses.get_mut(&stream_id) {
        Some(pending) => pending,
        None => return,
    };

    let remaining = &resp.body[resp.written..];

    let sent = match conn.stream_send(stream_id, remaining, true) {
        Ok(n) => n,

        // Nothing could be written right now; try again later.
        Err(quiche::Error::Done) => 0,

        Err(e) => {
            partial_responses.remove(&stream_id);

            error!("{} stream send failed {:?}", conn.trace_id(), e);
            return;
        },
    };

    resp.written += sent;

    let finished = resp.written == resp.body.len();
    if finished {
        partial_responses.remove(&stream_id);
    }
}
}
/// Describes the HTTP/3 DATAGRAMs to send on a connection and tracks how
/// many have been sent.
pub struct Http3DgramSender {
/// Total number of DATAGRAMs to send.
dgram_count: u64,
/// Payload of each DATAGRAM.
pub dgram_content: String,
/// Flow ID associated with the DATAGRAMs.
pub flow_id: u64,
/// Number of DATAGRAMs sent so far.
pub dgrams_sent: u64,
}
impl Http3DgramSender {
pub fn new(dgram_count: u64, dgram_content: String, flow_id: u64) -> Self {
Self {
dgram_count,
dgram_content,
flow_id,
dgrams_sent: 0,
}
}
}
/// Builds an HTTP/3 configuration from the optional settings.
///
/// Settings that are `None` are left at the library defaults. The two QPACK
/// settings are always forced to 0 when given.
///
/// # Panics
///
/// Panics if the HTTP/3 configuration cannot be created.
fn make_h3_config(
    max_field_section_size: Option<u64>, qpack_max_table_capacity: Option<u64>,
    qpack_blocked_streams: Option<u64>,
) -> quiche::h3::Config {
    let mut config = quiche::h3::Config::new().unwrap();

    if let Some(size) = max_field_section_size {
        config.set_max_field_section_size(size);
    }

    // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
    if let Some(capacity) = qpack_max_table_capacity.map(|v| v.clamp(0, 0)) {
        config.set_qpack_max_table_capacity(capacity);
    }

    // quiche doesn't support dynamic QPACK, so clamp to 0 for now.
    if let Some(streams) = qpack_blocked_streams.map(|v| v.clamp(0, 0)) {
        config.set_qpack_blocked_streams(streams);
    }

    config
}
/// HTTP/3 implementation of [`HttpConn`].
pub struct Http3Conn {
/// The underlying HTTP/3 connection.
h3_conn: quiche::h3::Connection,
/// Number of requests whose headers have been sent.
reqs_hdrs_sent: usize,
/// Number of requests for which a `Finished` event has been received.
reqs_complete: usize,
/// Largest stream ID on which request headers were received; sent back in
/// GOAWAY.
largest_processed_request: u64,
/// All requests to be sent.
reqs: Vec<Http3Request>,
/// Request body sent with every request, if any.
body: Option<Vec<u8>>,
/// Number of request body bytes sent so far, keyed by stream ID.
sent_body_bytes: HashMap<u64, usize>,
/// Whether to emit a JSON dump of the requests instead of passing response
/// bodies to the output sink.
dump_json: bool,
/// DATAGRAMs to send on this connection, if any.
dgram_sender: Option<Http3DgramSender>,
/// Receives response data or the JSON dump.
output_sink: Rc<RefCell<dyn FnMut(String)>>,
}
impl Http3Conn {
/// Creates the request-sending side of an HTTP/3 connection on top of
/// `conn`.
///
/// One request using `method` is prepared for every URL in `urls`, repeated
/// `reqs_cardinal` times. Each entry of `req_headers` must have the form
/// `name: value` and is added to every request. When `body` is set, a
/// `content-length` header is added as well. With `send_priority_update`, the
/// priority is taken from the URL's query string. `dump_json`, when set,
/// enables the JSON dump and gives the number of response body bytes to
/// retain per request.
///
/// # Panics
///
/// Panics if a URL has no host, if a custom header is malformed, or if the
/// HTTP/3 connection cannot be created.
#[allow(clippy::too_many_arguments)]
pub fn with_urls(
    conn: &mut quiche::Connection, urls: &[url::Url], reqs_cardinal: u64,
    req_headers: &[String], body: &Option<Vec<u8>>, method: &str,
    send_priority_update: bool, max_field_section_size: Option<u64>,
    qpack_max_table_capacity: Option<u64>,
    qpack_blocked_streams: Option<u64>, dump_json: Option<usize>,
    dgram_sender: Option<Http3DgramSender>,
    output_sink: Rc<RefCell<dyn FnMut(String)>>,
) -> Box<dyn HttpConn> {
    let mut reqs = Vec::new();

    for url in urls {
        for cardinal in 1..=reqs_cardinal {
            let host = url.host_str().unwrap();
            let authority = match url.port() {
                Some(port) => format!("{}:{}", host, port),
                None => host.to_string(),
            };

            let request_target = &url[url::Position::BeforePath..];

            let mut hdrs = vec![
                quiche::h3::Header::new(b":method", method.as_bytes()),
                quiche::h3::Header::new(b":scheme", url.scheme().as_bytes()),
                quiche::h3::Header::new(b":authority", authority.as_bytes()),
                quiche::h3::Header::new(b":path", request_target.as_bytes()),
                quiche::h3::Header::new(b"user-agent", b"quiche"),
            ];

            let priority = if send_priority_update {
                priority_from_query_string(url)
            } else {
                None
            };

            // Add custom headers to the request.
            for header in req_headers {
                match header.split_once(": ") {
                    Some((name, value)) => hdrs.push(
                        quiche::h3::Header::new(
                            name.as_bytes(),
                            value.as_bytes(),
                        ),
                    ),

                    None => {
                        panic!("malformed header provided - \"{}\"", header)
                    },
                }
            }

            if let Some(payload) = body {
                hdrs.push(quiche::h3::Header::new(
                    b"content-length",
                    payload.len().to_string().as_bytes(),
                ));
            }

            reqs.push(Http3Request {
                url: url.clone(),
                cardinal,
                hdrs,
                priority,
                response_hdrs: Vec::new(),
                response_body: Vec::new(),
                response_body_max: dump_json.unwrap_or_default(),
                stream_id: None,
                response_writer: None,
            });
        }
    }

    let h3_config = make_h3_config(
        max_field_section_size,
        qpack_max_table_capacity,
        qpack_blocked_streams,
    );

    let h3_conn = quiche::h3::Connection::with_transport(conn, &h3_config)
        .expect("Unable to create HTTP/3 connection, check the server's uni stream limit and window size");

    Box::new(Http3Conn {
        h3_conn,
        reqs_hdrs_sent: 0,
        reqs_complete: 0,
        largest_processed_request: 0,
        reqs,
        body: body.clone(),
        sent_body_bytes: HashMap::new(),
        dump_json: dump_json.is_some(),
        dgram_sender,
        output_sink,
    })
}
/// Creates an HTTP/3 connection on top of `conn` with no requests of its
/// own.
///
/// # Errors
///
/// Returns a descriptive message if the HTTP/3 connection cannot be created.
pub fn with_conn(
    conn: &mut quiche::Connection, max_field_section_size: Option<u64>,
    qpack_max_table_capacity: Option<u64>,
    qpack_blocked_streams: Option<u64>,
    dgram_sender: Option<Http3DgramSender>,
    output_sink: Rc<RefCell<dyn FnMut(String)>>,
) -> std::result::Result<Box<dyn HttpConn>, String> {
    let h3_config = make_h3_config(
        max_field_section_size,
        qpack_max_table_capacity,
        qpack_blocked_streams,
    );

    let h3_conn = match quiche::h3::Connection::with_transport(conn, &h3_config) {
        Ok(created) => created,
        Err(_) => {
            return Err("Unable to create HTTP/3 connection, check the client's uni stream limit and window size".to_string());
        },
    };

    Ok(Box::new(Http3Conn {
        h3_conn,
        reqs_hdrs_sent: 0,
        reqs_complete: 0,
        largest_processed_request: 0,
        reqs: Vec::new(),
        body: None,
        sent_body_bytes: HashMap::new(),
        dump_json: false,
        dgram_sender,
        output_sink,
    }))
}
/// Builds an HTTP/3 response given a request.
///
/// On success returns the response headers, the response body and the
/// priority field value to apply. The priority comes from the request's
/// `priority` header unless the request target's query string carries one.
/// `GET` requests are answered with the file under `root` named by the
/// request path (`index` is appended to paths ending in `/`), or with a 404.
/// `CONNECT` and every other method get a 405, and an unsupported scheme
/// gets a 400.
///
/// # Errors
///
/// Returns `(H3_MESSAGE_ERROR, reason)` when the request is malformed: a
/// duplicated, missing or empty pseudo-header, a `:protocol` header, a
/// header value that is not valid UTF-8, or a request target that does not
/// form a valid URL. All of these are peer-controlled, so they are reported
/// as errors rather than causing a panic.
fn build_h3_response(
    root: &str, index: &str, request: &[quiche::h3::Header],
) -> Http3ResponseBuilderResult {
    // Decodes a header value, reporting invalid UTF-8 as a malformed
    // request.
    fn utf8_value(
        hdr: &quiche::h3::Header,
    ) -> std::result::Result<&str, (u64, String)> {
        std::str::from_utf8(hdr.value()).map_err(|_| {
            (
                H3_MESSAGE_ERROR,
                format!(
                    "{} value is not valid UTF-8",
                    String::from_utf8_lossy(hdr.name())
                ),
            )
        })
    }

    let mut file_path = path::PathBuf::from(root);
    let mut scheme = None;
    let mut authority = None;
    let mut host = None;
    let mut path = None;
    let mut method = None;
    let mut priority = vec![];

    // Parse some of the request headers.
    for hdr in request {
        match hdr.name() {
            b":scheme" => {
                if scheme.is_some() {
                    return Err((
                        H3_MESSAGE_ERROR,
                        ":scheme cannot be duplicated".to_string(),
                    ));
                }

                scheme = Some(utf8_value(hdr)?);
            },

            b":authority" => {
                if authority.is_some() {
                    return Err((
                        H3_MESSAGE_ERROR,
                        ":authority cannot be duplicated".to_string(),
                    ));
                }

                authority = Some(utf8_value(hdr)?);
            },

            b":path" => {
                if path.is_some() {
                    return Err((
                        H3_MESSAGE_ERROR,
                        ":path cannot be duplicated".to_string(),
                    ));
                }

                path = Some(utf8_value(hdr)?);
            },

            b":method" => {
                if method.is_some() {
                    return Err((
                        H3_MESSAGE_ERROR,
                        ":method cannot be duplicated".to_string(),
                    ));
                }

                method = Some(utf8_value(hdr)?);
            },

            b":protocol" => {
                return Err((
                    H3_MESSAGE_ERROR,
                    ":protocol not supported".to_string(),
                ));
            },

            b"priority" => priority = hdr.value().to_vec(),

            b"host" => host = Some(utf8_value(hdr)?),

            _ => (),
        }
    }

    let decided_method = match method {
        Some("") =>
            return Err((
                H3_MESSAGE_ERROR,
                ":method value cannot be empty".to_string(),
            )),

        Some("CONNECT") => {
            // not allowed
            let headers = vec![
                quiche::h3::Header::new(b":status", b"405"),
                quiche::h3::Header::new(b"server", b"quiche"),
            ];

            return Ok((headers, b"".to_vec(), Default::default()));
        },

        Some(method) => method,

        None =>
            return Err((
                H3_MESSAGE_ERROR,
                ":method cannot be missing".to_string(),
            )),
    };

    let decided_scheme = match scheme {
        Some(scheme) => {
            if scheme != "http" && scheme != "https" {
                let headers = vec![
                    quiche::h3::Header::new(b":status", b"400"),
                    quiche::h3::Header::new(b"server", b"quiche"),
                ];

                return Ok((
                    headers,
                    b"Invalid scheme".to_vec(),
                    Default::default(),
                ));
            }

            scheme
        },

        None =>
            return Err((
                H3_MESSAGE_ERROR,
                ":scheme cannot be missing".to_string(),
            )),
    };

    let decided_host = match (authority, host) {
        (None, Some("")) =>
            return Err((
                H3_MESSAGE_ERROR,
                "host value cannot be empty".to_string(),
            )),

        (Some(""), None) =>
            return Err((
                H3_MESSAGE_ERROR,
                ":authority value cannot be empty".to_string(),
            )),

        (Some(""), Some("")) =>
            return Err((
                H3_MESSAGE_ERROR,
                ":authority and host value cannot be empty".to_string(),
            )),

        (None, None) =>
            return Err((
                H3_MESSAGE_ERROR,
                ":authority and host missing".to_string(),
            )),

        // Only host was sent: use it instead of unwrapping a missing
        // :authority.
        (None, Some(host)) => host,

        // Any other combo, prefer :authority
        (Some(authority), _) => authority,
    };

    let decided_path = match path {
        Some("") =>
            return Err((
                H3_MESSAGE_ERROR,
                ":path value cannot be empty".to_string(),
            )),

        None =>
            return Err((
                H3_MESSAGE_ERROR,
                ":path cannot be missing".to_string(),
            )),

        Some(path) => path,
    };

    let url = format!("{decided_scheme}://{decided_host}{decided_path}");
    // The authority and path are peer-controlled and may not form a valid
    // URL.
    let url = url::Url::parse(&url).map_err(|e| {
        (H3_MESSAGE_ERROR, format!("invalid request target: {e}"))
    })?;

    let pathbuf = path::PathBuf::from(url.path());
    let pathbuf = autoindex(pathbuf, index);

    // Priority query string takes precedence over the header.
    // So replace the header with one built here.
    let query_priority = priority_field_value_from_query_string(&url);

    if let Some(p) = query_priority {
        priority = p.as_bytes().to_vec();
    }

    let (status, body) = match decided_method {
        "GET" => {
            // Keep only normal components so that the request cannot escape
            // `root` via `..` or an absolute path.
            for c in pathbuf.components() {
                if let path::Component::Normal(v) = c {
                    file_path.push(v)
                }
            }

            match std::fs::read(file_path.as_path()) {
                Ok(data) => (200, data),

                Err(_) => (404, b"Not Found!".to_vec()),
            }
        },

        _ => (405, Vec::new()),
    };

    let headers = vec![
        quiche::h3::Header::new(b":status", status.to_string().as_bytes()),
        quiche::h3::Header::new(b"server", b"quiche"),
        quiche::h3::Header::new(
            b"content-length",
            body.len().to_string().as_bytes(),
        ),
    ];

    Ok((headers, body, priority))
}
}
impl HttpConn for Http3Conn {
/// Sends pending request headers, then request bodies, then DATAGRAMs.
///
/// Header sending stops at the first error and the remaining requests are
/// retried on the next call. When `target_path` is set, a file for the
/// response body is created as each request is sent. If a body is
/// configured, the bytes not yet sent are attempted on every stream that
/// still has some outstanding.
///
/// DATAGRAMs are sent on the sender's configured `flow_id`, which is also
/// the flow ID reported in the log line.
fn send_requests(
    &mut self, conn: &mut quiche::Connection, target_path: &Option<String>,
) {
    let mut reqs_done = 0;

    // First send headers.
    for req in self.reqs.iter_mut().skip(self.reqs_hdrs_sent) {
        // The stream is finished with the headers when there is no body.
        let s = match self.h3_conn.send_request(
            conn,
            &req.hdrs,
            self.body.is_none(),
        ) {
            Ok(v) => v,

            Err(quiche::h3::Error::TransportError(
                quiche::Error::StreamLimit,
            )) => {
                debug!("not enough stream credits, retry later...");
                break;
            },

            Err(quiche::h3::Error::StreamBlocked) => {
                debug!("stream is blocked, retry later...");
                break;
            },

            Err(e) => {
                error!("failed to send request {:?}", e);
                break;
            },
        };

        debug!("Sent HTTP request {:?}", &req.hdrs);

        if let Some(priority) = &req.priority {
            // If sending the priority fails, don't try again.
            self.h3_conn
                .send_priority_update_for_request(conn, s, priority)
                .ok();
        }

        req.stream_id = Some(s);
        req.response_writer =
            make_resource_writer(&req.url, target_path, req.cardinal);
        self.sent_body_bytes.insert(s, 0);

        reqs_done += 1;
    }

    self.reqs_hdrs_sent += reqs_done;

    // Then send any remaining body.
    if let Some(body) = &self.body {
        for (stream_id, sent_bytes) in self.sent_body_bytes.iter_mut() {
            if *sent_bytes == body.len() {
                continue;
            }

            // Always try to send all remaining bytes, so always set fin to
            // true.
            let sent = match self.h3_conn.send_body(
                conn,
                *stream_id,
                &body[*sent_bytes..],
                true,
            ) {
                Ok(v) => v,

                Err(quiche::h3::Error::Done) => 0,

                Err(e) => {
                    error!("failed to send request body {:?}", e);
                    continue;
                },
            };

            *sent_bytes += sent;
        }
    }

    // And finally any DATAGRAMS.
    if let Some(ds) = self.dgram_sender.as_mut() {
        let mut dgrams_done = 0;

        for _ in ds.dgrams_sent..ds.dgram_count {
            info!(
                "sending HTTP/3 DATAGRAM on flow_id={} with data {:?}",
                ds.flow_id,
                ds.dgram_content.as_bytes()
            );

            if let Err(e) = self.h3_conn.send_dgram(
                conn,
                ds.flow_id,
                ds.dgram_content.as_bytes(),
            ) {
                error!("failed to send dgram {:?}", e);
                break;
            }

            dgrams_done += 1;
        }

        ds.dgrams_sent += dgrams_done;
    }
}
/// Polls the HTTP/3 connection and processes events until none are left.
///
/// Response headers are stored on the matching request. Response data is
/// retained up to the request's body limit and also written to the request's
/// file or, when there is no file and no JSON dump was requested, passed to
/// the output sink. When the last request finishes, the JSON dump is emitted
/// if enabled and the connection is closed. A reset request also closes the
/// connection.
///
/// Data handed to the output sink is decoded lossily: the bytes come from
/// the network and are not guaranteed to be UTF-8, so invalid sequences are
/// replaced with U+FFFD instead of building an invalid `String`.
///
/// # Panics
///
/// Panics if headers or data arrive on a stream that no request was sent on,
/// or if closing the connection fails with an error other than `Done`.
fn handle_responses(
    &mut self, conn: &mut quiche::Connection, buf: &mut [u8],
    req_start: &std::time::Instant,
) {
    loop {
        match self.h3_conn.poll(conn) {
            Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
                debug!(
                    "got response headers {:?} on stream id {}",
                    hdrs_to_strings(&list),
                    stream_id
                );

                let req = self
                    .reqs
                    .iter_mut()
                    .find(|r| r.stream_id == Some(stream_id))
                    .unwrap();

                req.response_hdrs = list;
            },

            Ok((stream_id, quiche::h3::Event::Data)) => {
                while let Ok(read) =
                    self.h3_conn.recv_body(conn, stream_id, buf)
                {
                    debug!(
                        "got {} bytes of response data on stream {}",
                        read, stream_id
                    );

                    let req = self
                        .reqs
                        .iter_mut()
                        .find(|r| r.stream_id == Some(stream_id))
                        .unwrap();

                    // Retain only as much of the body as the limit allows.
                    let len = std::cmp::min(
                        read,
                        req.response_body_max - req.response_body.len(),
                    );
                    req.response_body.extend_from_slice(&buf[..len]);

                    match &mut req.response_writer {
                        Some(rw) => {
                            rw.write_all(&buf[..read]).ok();
                        },

                        None =>
                            if !self.dump_json {
                                let text =
                                    String::from_utf8_lossy(&buf[..read])
                                        .into_owned();
                                self.output_sink.borrow_mut()(text);
                            },
                    }
                }
            },

            Ok((_stream_id, quiche::h3::Event::Finished)) => {
                self.reqs_complete += 1;
                let reqs_count = self.reqs.len();

                debug!(
                    "{}/{} responses received",
                    self.reqs_complete, reqs_count
                );

                if self.reqs_complete == reqs_count {
                    info!(
                        "{}/{} response(s) received in {:?}, closing...",
                        self.reqs_complete,
                        reqs_count,
                        req_start.elapsed()
                    );

                    if self.dump_json {
                        dump_json(
                            &self.reqs,
                            &mut *self.output_sink.borrow_mut(),
                        );
                    }

                    match conn.close(true, 0x100, b"kthxbye") {
                        // Already closed.
                        Ok(_) | Err(quiche::Error::Done) => (),

                        Err(e) => panic!("error closing conn: {:?}", e),
                    }

                    break;
                }
            },

            Ok((_stream_id, quiche::h3::Event::Reset(e))) => {
                error!("request was reset by peer with {}, closing...", e);

                match conn.close(true, 0x100, b"kthxbye") {
                    // Already closed.
                    Ok(_) | Err(quiche::Error::Done) => (),

                    Err(e) => panic!("error closing conn: {:?}", e),
                }

                break;
            },

            Ok((_flow_id, quiche::h3::Event::Datagram)) => {
                while let Ok((len, flow_id, flow_id_len)) =
                    self.h3_conn.recv_dgram(conn, buf)
                {
                    info!(
                        "Received DATAGRAM flow_id={} len={} data={:?}",
                        flow_id,
                        len,
                        buf[flow_id_len..len].to_vec()
                    );
                }
            },

            Ok((
                prioritized_element_id,
                quiche::h3::Event::PriorityUpdate,
            )) => {
                info!(
                    "{} PRIORITY_UPDATE triggered for element ID={}",
                    conn.trace_id(),
                    prioritized_element_id
                );
            },

            Ok((goaway_id, quiche::h3::Event::GoAway)) => {
                info!(
                    "{} got GOAWAY with ID {} ",
                    conn.trace_id(),
                    goaway_id
                );
            },

            Err(quiche::h3::Error::Done) => {
                break;
            },

            Err(e) => {
                error!("HTTP/3 processing failed: {:?}", e);

                break;
            },
        }
    }
}
/// Returns `false` when every request completed.
///
/// Otherwise logs an error, emits the JSON dump if it is enabled, and
/// returns `true`.
fn report_incomplete(&self, start: &std::time::Instant) -> bool {
    let total = self.reqs.len();

    if self.reqs_complete == total {
        return false;
    }

    error!(
        "connection timed out after {:?} and only completed {}/{} requests",
        start.elapsed(),
        self.reqs_complete,
        total
    );

    if self.dump_json {
        dump_json(&self.reqs, &mut *self.output_sink.borrow_mut());
    }

    true
}
/// Polls the HTTP/3 connection, answers every request received, and then
/// sends any pending DATAGRAMs.
///
/// The response is decided from the request headers alone, so the read side
/// of the request stream is shut down as soon as the headers arrive. A
/// malformed request gets its stream's write side shut down with the error
/// code from `build_h3_response`. Responses whose headers or body cannot be
/// sent completely are stored in `partial_responses` for `handle_writable`
/// to finish. A received GOAWAY is answered with a GOAWAY carrying the
/// largest request stream ID seen.
///
/// DATAGRAMs are sent on the sender's configured `flow_id`, which is also
/// the flow ID reported in the log line.
///
/// # Errors
///
/// Returns an error if polling fails with anything other than `Done`, or if
/// sending GOAWAY fails.
///
/// # Panics
///
/// Panics if shutting down a request stream fails.
fn handle_requests(
    &mut self, conn: &mut quiche::Connection,
    _partial_requests: &mut HashMap<u64, PartialRequest>,
    partial_responses: &mut HashMap<u64, PartialResponse>, root: &str,
    index: &str, buf: &mut [u8],
) -> quiche::h3::Result<()> {
    // Process HTTP events.
    loop {
        match self.h3_conn.poll(conn) {
            Ok((stream_id, quiche::h3::Event::Headers { list, .. })) => {
                info!(
                    "{} got request {:?} on stream id {}",
                    conn.trace_id(),
                    hdrs_to_strings(&list),
                    stream_id
                );

                self.largest_processed_request =
                    std::cmp::max(self.largest_processed_request, stream_id);

                // We decide the response based on headers alone, so
                // stop reading the request stream so that any body
                // is ignored and pointless Data events are not
                // generated.
                conn.stream_shutdown(stream_id, quiche::Shutdown::Read, 0)
                    .unwrap();

                let (mut headers, body, mut priority) =
                    match Http3Conn::build_h3_response(root, index, &list) {
                        Ok(v) => v,

                        Err((error_code, _)) => {
                            conn.stream_shutdown(
                                stream_id,
                                quiche::Shutdown::Write,
                                error_code,
                            )
                            .unwrap();
                            continue;
                        },
                    };

                // A PRIORITY_UPDATE received for this stream replaces the
                // priority derived from the request.
                match self.h3_conn.take_last_priority_update(stream_id) {
                    Ok(v) => {
                        priority = v;
                    },

                    Err(quiche::h3::Error::Done) => (),

                    Err(e) => error!(
                        "{} error taking PRIORITY_UPDATE {}",
                        conn.trace_id(),
                        e
                    ),
                }

                if !priority.is_empty() {
                    headers.push(quiche::h3::Header::new(
                        b"priority",
                        priority.as_slice(),
                    ));
                }

                #[cfg(feature = "sfv")]
                let priority =
                    match quiche::h3::Priority::try_from(priority.as_slice())
                    {
                        Ok(v) => v,
                        Err(_) => quiche::h3::Priority::default(),
                    };

                #[cfg(not(feature = "sfv"))]
                let priority = quiche::h3::Priority::default();

                info!(
                    "{} prioritizing response on stream {} as {:?}",
                    conn.trace_id(),
                    stream_id,
                    priority
                );

                match self.h3_conn.send_response_with_priority(
                    conn, stream_id, &headers, &priority, false,
                ) {
                    Ok(v) => v,

                    Err(quiche::h3::Error::StreamBlocked) => {
                        // Keep headers and body so that the whole response
                        // can be sent once the stream becomes writable.
                        let response = PartialResponse {
                            headers: Some(headers),
                            priority: Some(priority),
                            body,
                            written: 0,
                        };

                        partial_responses.insert(stream_id, response);
                        continue;
                    },

                    Err(e) => {
                        error!(
                            "{} stream send failed {:?}",
                            conn.trace_id(),
                            e
                        );

                        break;
                    },
                };

                let written = match self
                    .h3_conn
                    .send_body(conn, stream_id, &body, true)
                {
                    Ok(v) => v,

                    Err(quiche::h3::Error::Done) => 0,

                    Err(e) => {
                        error!(
                            "{} stream send failed {:?}",
                            conn.trace_id(),
                            e
                        );

                        break;
                    },
                };

                if written < body.len() {
                    let response = PartialResponse {
                        headers: None,
                        priority: None,
                        body,
                        written,
                    };

                    partial_responses.insert(stream_id, response);
                }
            },

            Ok((stream_id, quiche::h3::Event::Data)) => {
                info!(
                    "{} got data on stream id {}",
                    conn.trace_id(),
                    stream_id
                );
            },

            Ok((_stream_id, quiche::h3::Event::Finished)) => (),

            Ok((_stream_id, quiche::h3::Event::Reset { .. })) => (),

            Ok((_, quiche::h3::Event::Datagram)) => {
                while let Ok((len, flow_id, flow_id_len)) =
                    self.h3_conn.recv_dgram(conn, buf)
                {
                    info!(
                        "Received DATAGRAM flow_id={} len={} data={:?}",
                        flow_id,
                        len,
                        buf[flow_id_len..len].to_vec()
                    );
                }
            },

            Ok((
                prioritized_element_id,
                quiche::h3::Event::PriorityUpdate,
            )) => {
                info!(
                    "{} PRIORITY_UPDATE triggered for element ID={}",
                    conn.trace_id(),
                    prioritized_element_id
                );
            },

            Ok((goaway_id, quiche::h3::Event::GoAway)) => {
                trace!(
                    "{} got GOAWAY with ID {} ",
                    conn.trace_id(),
                    goaway_id
                );
                self.h3_conn
                    .send_goaway(conn, self.largest_processed_request)?;
            },

            Err(quiche::h3::Error::Done) => {
                break;
            },

            Err(e) => {
                error!("{} HTTP/3 error {:?}", conn.trace_id(), e);

                return Err(e);
            },
        }
    }

    if let Some(ds) = self.dgram_sender.as_mut() {
        let mut dgrams_done = 0;

        for _ in ds.dgrams_sent..ds.dgram_count {
            info!(
                "sending HTTP/3 DATAGRAM on flow_id={} with data {:?}",
                ds.flow_id,
                ds.dgram_content.as_bytes()
            );

            if let Err(e) = self.h3_conn.send_dgram(
                conn,
                ds.flow_id,
                ds.dgram_content.as_bytes(),
            ) {
                error!("failed to send dgram {:?}", e);
                break;
            }

            dgrams_done += 1;
        }

        ds.dgrams_sent += dgrams_done;
    }

    Ok(())
}
fn handle_writable(
&mut self, conn: &mut quiche::Connection,
partial_responses: &mut HashMap<u64, PartialResponse>, stream_id: u64,
) {
debug!("{} stream {} is writable", conn.trace_id(), stream_id);
if !partial_responses.contains_key(&stream_id) {
return;
}
let resp = partial_responses.get_mut(&stream_id).unwrap();
if let (Some(headers), Some(priority)) = (&resp.headers, &resp.priority) {
match self.h3_conn.send_response_with_priority(
conn, stream_id, headers, priority, false,
) {
Ok(_) => (),
Err(quiche::h3::Error::StreamBlocked) => {
return;
},
Err(e) => {
error!("{} stream send failed {:?}", conn.trace_id(), e);
return;
},
}
}
resp.headers = None;
resp.priority = None;
let body = &resp.body[resp.written..];
let written = match self.h3_conn.send_body(conn, stream_id, body, true) {
Ok(v) => v,
Err(quiche::h3::Error::Done) => 0,
Err(e) => {
partial_responses.remove(&stream_id);
error!("{} stream send failed {:?}", conn.trace_id(), e);
return;
},
};
resp.written += written;
if resp.written == resp.body.len() {
partial_responses.remove(&stream_id);
}
}
}