blob: 39c827a2ba7922018975c70fcbd81d08d8717a18 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/lib/firebase/event_stream.h"
#include <utility>
#include <lib/fit/function.h>
#include <src/lib/fxl/logging.h>
#include <src/lib/fxl/strings/trim.h>
namespace firebase {
// Constructs an idle event stream; nothing is read until Start() is called.
EventStream::EventStream() = default;
// Members release their own resources; no extra teardown is performed here.
EventStream::~EventStream() = default;
// Begins reading the event stream from |source|.
//
// |event_callback| is invoked for every event dispatched by ProcessLine(), and
// |completion_callback| is invoked from OnDataComplete(). Both callbacks are
// stored before the drainer is started.
// NOTE(review): presumably this ordering matters because the drainer may
// deliver data as soon as it is started - confirm against fsl::SocketDrainer.
void EventStream::Start(zx::socket source,
                        fit::function<EventCallback> event_callback,
                        fit::function<CompletionCallback> completion_callback) {
  event_callback_ = std::move(event_callback);
  completion_callback_ = std::move(completion_callback);

  // Build the drainer with this object as its client, then hand it the socket.
  auto new_drainer = std::make_unique<fsl::SocketDrainer>(this);
  drainer_ = std::move(new_drainer);
  drainer_->Start(std::move(source));
}
// Consumes a chunk of |num_bytes| bytes starting at |data|.
//
// The chunk is split on '\n'. Bytes are accumulated in |pending_line_| so that
// a line spanning several chunks is only handed to ProcessLine() once its
// terminating '\n' has arrived. If ProcessLine() returns false, this method
// returns immediately without touching any further member state.
void EventStream::OnDataAvailable(const void* data, size_t num_bytes) {
  const char* const chunk_begin = static_cast<const char*>(data);
  const char* const chunk_end = chunk_begin + num_bytes;

  for (const char* cursor = chunk_begin; cursor < chunk_end;) {
    const char* const line_end = std::find(cursor, chunk_end, '\n');
    pending_line_.append(cursor, static_cast<size_t>(line_end - cursor));

    if (line_end == chunk_end) {
      // No terminator in the rest of this chunk: keep the partial line around
      // for the next call.
      break;
    }

    if (!ProcessLine(pending_line_)) {
      return;
    }
    pending_line_.clear();

    // Continue right after the '\n' that ended the line.
    cursor = line_end + 1;
  }
}
// Called when the data source is exhausted; notifies the client through the
// completion callback passed to Start().
void EventStream::OnDataComplete() {
  completion_callback_();
}
// Interprets a single line of the event stream (without its terminating
// '\n'). See https://www.w3.org/TR/eventsource/#event-stream-interpretation.
//
//  - An empty line dispatches the accumulated event through |event_callback_|,
//    or just resets the event type if no data was accumulated.
//  - A line starting with ':' is a comment and is ignored.
//  - "field:value" is forwarded to ProcessField(); a single leading space of
//    the value, if present, is dropped.
//  - A line without ':' is forwarded to ProcessField() as a field name with an
//    empty value.
//
// Returns false if |destruction_sentinel_| reports that this object was
// destructed while the event callback was running, in which case the caller
// must not touch the object anymore. Returns true otherwise.
bool EventStream::ProcessLine(fxl::StringView line) {
  // If the line is empty, dispatch the event.
  if (line.empty()) {
    // If data is empty, clear event type and abort.
    if (data_.empty()) {
      event_type_.clear();
      return true;
    }

    // Each "data" field appends a line break after its value; remove the
    // trailing one before handing the payload to the client.
    if (data_.back() == '\n') {
      data_.pop_back();
    }

    // The callback may delete this object; do not touch members if it did.
    if (destruction_sentinel_.DestructedWhile(
            [this] { event_callback_(Status::OK, event_type_, data_); })) {
      return false;
    }

    event_type_.clear();
    data_.clear();
    return true;
  }

  // If the line starts with a colon, ignore the line.
  if (line[0] == ':') {
    return true;
  }

  const size_t colon_pos = line.find(':');

  // If the line does not contain a colon, process the field using the whole
  // line as the field name and empty string as field value.
  if (colon_pos == std::string::npos) {
    ProcessField(line, "");
    return true;
  }

  // The line contains a colon: everything before it is the field name,
  // everything after it is the value.
  fxl::StringView field(line.substr(0, colon_pos));
  fxl::StringView value = line.substr(colon_pos + 1);

  // Per the spec, only one leading U+0020 SPACE is removed from the value.
  // Any further leading spaces and all trailing spaces are part of the value
  // and must be preserved.
  if (!value.empty() && value[0] == ' ') {
    value = value.substr(1);
  }

  ProcessField(field, value);
  return true;
}
// Applies one parsed "field: value" pair to the event being accumulated.
//
//  - "data":  appends |value| followed by a line break to |data_|.
//  - "event": replaces |event_type_| with |value|.
//  - "id" / "retry": recognized but not implemented; only logged.
//  - anything else: logged and otherwise ignored.
void EventStream::ProcessField(fxl::StringView field, fxl::StringView value) {
  if (field == "event") {
    event_type_ = value.ToString();
    return;
  }

  if (field == "data") {
    data_.append(value.data(), value.size());
    data_.push_back('\n');
    return;
  }

  const bool known_but_unsupported = (field == "id") || (field == "retry");
  if (known_but_unsupported) {
    // Not implemented.
    FXL_LOG(WARNING) << "Event stream - field type not implemented: " << field;
    return;
  }

  // The spec says to ignore unknown field names.
  FXL_LOG(WARNING) << "Event stream - unknown field name: " << field;
}
} // namespace firebase