// blob: 569d390025cf9d6b0e60ce4e2f455f6e61767038
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{borrow::Cow, mem::replace};
use crate::{linealyzer::Linealyzer, Event};
/// `EventSource` `parse`s byte slices from an http sse connection into sse `Events`.
/// It maintains state across `parse` calls, so that an `Event` whose bytes
/// are split across two calls to `parse` will be returned by the second call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventSource {
linealyzer: Linealyzer,
processed_first_line: bool,
event_type: String,
data: String,
// http sse spec says to append a newline to the data buffer on each receipt
// of a data field, and then to remove the last newline from the data buffer
// before reconstituting the event. Appending the newline forces an extra
// allocation/str copy on the simple path of events with single-line data,
// so we keep track of how many newlines to append instead.
data_trailing_newlines: usize,
impl EventSource {
pub fn new() -> Self {
Self {
linealyzer: Linealyzer::new(),
processed_first_line: false,
event_type: String::new(),
data: String::new(),
data_trailing_newlines: 0,
/// Ingest more bytes from an http sse stream and return all completed `Event`s.
pub fn parse(&mut self, bytes: &[u8]) -> Vec<Event> {
let mut ret = vec![];
for mut line in self.linealyzer.feed(bytes) {
// http sse stream is allowed to begin with an optional utf8-encoded
// byte order mark (bom), that should be ignored.
if !self.processed_first_line {
self.processed_first_line = true;
if line.starts_with(b"\xef\xbb\xbf") {
line = match line {
Cow::Borrowed(b) => Cow::Borrowed(&b[3..]),
Cow::Owned(mut b) => {
if line.is_empty() {
if && self.data_trailing_newlines == 0 {
} else {
let n = self.data_trailing_newlines - 1;;
for _ in 0..n {'\n')
self.data_trailing_newlines = 0;
// self.event_type cannot have carriage returns or newlines
// cannot be empty or have carriage returns in it
// so event creation should never fail
replace(&mut self.event_type, String::new()),
replace(&mut, String::new()),
} else if line[0] == b':' { // ignore these lines
} else {
let (name, value) = match line.iter().position(|b| *b == b':') {
Some(p) => {
let (name, mut value) = line.split_at(p);
value = &value[1..];
if !value.is_empty() && value[0] == b' ' {
value = &value[1..];
None => (String::from_utf8_lossy(&line).into_owned(), String::new()),
if name == "event" {
self.event_type = value;
} else if name == "data" {
if !value.is_empty() {
if self.data_trailing_newlines > 0 { + value.len());
for _ in 0..self.data_trailing_newlines {'\n');
self.data_trailing_newlines = 0;
if { = value;
} else {;
self.data_trailing_newlines += 1;
} // ignoring unrecognized field names, including "id" and "retry"
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    // An `event` field followed by a `data` field and a blank line yields one event.
    #[test]
    fn event_and_data() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // Field order within an event does not matter.
    #[test]
    fn data_before_event() {
        let bs = b"data: data\n\
                   event: type\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // A utf8 byte order mark at the very start of the stream is skipped.
    #[test]
    fn bom_stripped() {
        let bs = b"\xef\xbb\xbfevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // An incomplete bom stays part of the field name, so the field is not recognized.
    #[test]
    fn partial_bom_not_stripped() {
        let bs = b"\xef\xbbevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    // A bom on a later line is part of the field name, so no data is recorded.
    #[test]
    fn bom_stripped_only_on_first_line() {
        let bs = b"\xef\xbb\xbfevent: type\n\
                   \xef\xbb\xbfdata: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![]);
    }

    // Invalid utf8 in field values is replaced with U+FFFD.
    #[test]
    fn invalid_utf8_replaced() {
        let bs = b"event: type\xff\n\
                   data: data\xff\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type�", "data�").unwrap()]);
    }

    // Only the first colon separates name from value.
    #[test]
    fn colon_allowed_in_field_value() {
        let bs = b"event: type\n\
                   data: :da:ta\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", ":da:ta").unwrap()]);
    }

    // An event without data is not dispatched and its type does not leak
    // into the next event.
    #[test]
    fn event_with_no_data_is_dropped() {
        let bs = b"event: type\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![]);

        let events = event_source.parse(b"data: data\n\n");
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    // A later `event` field overrides an earlier one.
    #[test]
    fn consecutive_event_replaces() {
        let bs = b"event: type\n\
                   data: data\n\
                   event: replaced_type\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("replaced_type", "data").unwrap()]);
    }

    // Multiple `data` fields are joined with a newline.
    #[test]
    fn consecutive_data_concatenates_with_newline() {
        let bs = b"data: data1\n\
                   data: data2\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data1\ndata2").unwrap()]);
    }

    // Extra blank lines after a dispatched event produce no further events.
    #[test]
    fn consecutive_empty_lines_does_nothing() {
        let bs = b"data: data\n\n\n\n\n\n\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    // A field line without a colon has an empty value: the bare `data` line
    // adds an empty data line and the bare `event` line resets the type.
    #[test]
    fn missing_field_value_is_empty_string() {
        let bs = b"event: type\n\
                   data: data\n\
                   data\n\
                   event\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data\n").unwrap()]);
    }

    // A value that directly follows the colon is taken verbatim.
    #[test]
    fn field_value_without_leading_space_not_stripped() {
        let bs = b"event: type\n\
                   data:data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // Only a single space after the colon is removed.
    #[test]
    fn only_first_field_value_space_stripped() {
        let bs = b"event: type\n\
                   data:  data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", " data").unwrap()]);
    }

    // Lines starting with a colon are comments.
    #[test]
    fn comment_lines_ignored() {
        let bs = b"event: type\n\
                   :event: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // Fields other than `event` and `data` are skipped.
    #[test]
    fn unknown_field_names_ignored() {
        let bs = b"event: type\n\
                   event2: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    // Pending event state survives between `parse` calls.
    #[test]
    fn event_and_data_persisted_across_parse() {
        let mut event_source = EventSource::new();
        assert_eq!(event_source.parse(b"event: type\n"), vec![]);
        assert_eq!(event_source.parse(b"data: data\n"), vec![]);
        assert_eq!(
            event_source.parse(b"\n"),
            vec![Event::from_type_and_data("type", "data").unwrap()]
        );
    }

    // Dispatching an event clears the pending type and data.
    #[test]
    fn event_and_data_not_persisted_across_parse_after_dispatch() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();
        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);

        assert_eq!(
            event_source.parse(b"data: data2\n\n"),
            vec![Event::from_type_and_data("", "data2").unwrap()]
        );
    }

    // Feeds `bytes` to a fresh `EventSource` as three consecutive chunks, for
    // every pair of split points, and checks the combined output is `events`.
    fn assert_all_3_byte_partitionings(bytes: &[u8], events: Vec<Event>) {
        for i in 0..bytes.len() {
            for j in i..bytes.len() {
                let mut event_source = EventSource::new();
                let mut parsed_events = vec![];
                parsed_events.append(&mut event_source.parse(&bytes[..i]));
                parsed_events.append(&mut event_source.parse(&bytes[i..j]));
                parsed_events.append(&mut event_source.parse(&bytes[j..]));
                assert_eq!(parsed_events, events, "i: {}, j: {}, bytes: {:?}", i, j, bytes);
            }
        }
    }

    #[test]
    fn parse_event_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![Event::from_type_and_data("type", "data").unwrap()],
        );
    }

    #[test]
    fn parse_two_events_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\
                   \n\
                   data: data2\n\
                   event: type2\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![
                Event::from_type_and_data("type", "data").unwrap(),
                Event::from_type_and_data("type2", "data2").unwrap(),
            ],
        );
    }

    prop_compose! {
        // Arbitrary event with a type free of line breaks and non-empty data
        // free of carriage returns.
        fn random_event()
            (event_type in "[^\r\n]{0,20}",
             data in "[^\r]{1,20}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    prop_compose! {
        // Event drawn from a tiny alphabet (`a`, `:` and newline) to exercise
        // the field-separator and newline bookkeeping.
        fn random_adversarial_event()
            (event_type in "[a:]{0,3}",
             data in "[a\n]{1,5}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig{
            // Disable persistence to avoid the warning for not running in the
            // source code directory (since we're running on a Fuchsia target)
            failure_persistence: None,
            .. ProptestConfig::default()
        })]

        #[test]
        fn random_event_serialize_deserialize(
            event in random_event())
        {
            let mut bytes = vec![];
            event.to_writer(&mut bytes).unwrap();
            assert_eq!(EventSource::new().parse(&bytes), vec![event.clone()]);
        }

        #[test]
        fn random_events_serialize_deserialize(
            events in prop::collection::vec(random_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize(
            events in prop::collection::vec(random_adversarial_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_event(), 0..4))
        {
            assert_all_3_line_partitionings(events)
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_adversarial_event(), 0..4))
        {
            assert_all_3_line_partitionings(events)
        }
    }

    // Serializes `events`, splits the bytes at newlines, and replays them to a
    // fresh `EventSource` in three groups of whole lines (each newline is sent
    // as its own `parse` call), for every pair of split points.
    fn assert_all_3_line_partitionings(events: Vec<Event>) {
        let mut bytes = vec![];
        for event in events.iter() {
            event.to_writer(&mut bytes).unwrap();
        }
        let splits: Vec<&[u8]> = bytes.split(|b| *b == b'\n').collect();
        for i in 0..=splits.len() {
            for j in i..=splits.len() {
                let mut event_source = EventSource::new();
                let mut parsed_events = vec![];
                parsed_events.append(&mut event_source.parse(&splits[0..i].join(&b'\n')));
                if i > 0 {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                parsed_events.append(&mut event_source.parse(&splits[i..j].join(&b'\n')));
                if j > i {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                parsed_events.append(&mut event_source.parse(&splits[j..].join(&b'\n')));
                if j < splits.len() {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                assert_eq!(parsed_events, events, "i: {}, j: {}", i, j);
            }
        }
    }
}