blob: f928b7edc36adb43770cde1a9e54f23bf70d5e24 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/lib/overnet/endpoint/message_builder.h"
#include "garnet/lib/overnet/protocol/fidl.h"
namespace overnet {
namespace {
static const uint8_t kFirstRequiredParse = 1;
static const uint8_t kFirstSkippableParse = 65;
static const uint8_t kFirstHandle = 128;
enum class MessageFragmentType : uint8_t {
kTxId = 1,
kOrdinal = 2,
kBody = 127,
kChannel = 128,
};
} // namespace
StatusOr<RouterEndpoint::NewStream> MessageWireEncoder::AppendChannelHandle(
fuchsia::overnet::protocol::Introduction introduction) {
auto fork_status = stream_->Fork(
fuchsia::overnet::protocol::ReliabilityAndOrdering::ReliableOrdered,
std::move(introduction));
if (fork_status.is_error()) {
return fork_status.AsStatus();
}
auto fork_frame = std::move(*Encode(&fork_status->fork_frame));
auto fork_frame_len = fork_frame.length();
auto fork_frame_len_len = varint::WireSizeFor(fork_frame_len);
tail_.emplace_back(Slice::WithInitializer(
1 + fork_frame_len_len + fork_frame_len, [&](uint8_t* p) {
*p++ = static_cast<uint8_t>(MessageFragmentType::kChannel);
p = varint::Write(fork_frame_len, fork_frame_len_len, p);
memcpy(p, fork_frame.begin(), fork_frame_len);
}));
return std::move(fork_status->new_stream);
}
Slice MessageWireEncoder::Write(Border desired_border) const {
// Wire format:
// (1-byte fragment id, fragment length varint, fragment data)*
// fragment id's must be in ordinal order (except for handles, which must be
// last and be placed in the order which they should be appended)
const auto txid_len = txid_ ? varint::WireSizeFor(txid_) : 0;
const auto txid_len_len = txid_ ? varint::WireSizeFor(txid_len) : 0;
assert(ordinal_ != 0);
const auto ordinal_len = varint::WireSizeFor(ordinal_);
const auto ordinal_len_len = varint::WireSizeFor(ordinal_len);
const auto body_len = body_.length();
const auto body_len_len = varint::WireSizeFor(body_len);
const auto message_length_without_tail =
// space for txid
(txid_ ? 1 + txid_len_len + txid_len : 0) +
// space for ordinal
1 + ordinal_len_len + ordinal_len +
// space for body
1 + body_len_len + body_len;
return Slice::Join(
tail_.begin(), tail_.end(),
desired_border.WithAddedPrefix(message_length_without_tail))
.WithPrefix(message_length_without_tail, [&](uint8_t* const data) {
uint8_t* p = data;
if (txid_) {
*p++ = static_cast<uint8_t>(MessageFragmentType::kTxId);
p = varint::Write(txid_len, txid_len_len, p);
p = varint::Write(txid_, txid_len, p);
}
*p++ = static_cast<uint8_t>(MessageFragmentType::kOrdinal);
p = varint::Write(ordinal_len_len, ordinal_len, p);
p = varint::Write(ordinal_, ordinal_len, p);
*p++ = static_cast<uint8_t>(MessageFragmentType::kBody);
p = varint::Write(body_len, body_len_len, p);
memcpy(p, body_.begin(), body_len);
p += body_len;
assert(p == data + message_length_without_tail);
});
}
Status ParseMessageInto(Slice slice, NodeId peer,
RouterEndpoint* router_endpoint,
MessageReceiver* builder) {
const uint8_t* p = slice.begin();
const uint8_t* end = slice.end();
MessageFragmentType largest_fragment_id_seen =
static_cast<MessageFragmentType>(0);
while (p != end) {
const MessageFragmentType fragment_type =
static_cast<MessageFragmentType>(*p++);
if (fragment_type >= static_cast<MessageFragmentType>(kFirstHandle)) {
largest_fragment_id_seen = static_cast<MessageFragmentType>(kFirstHandle);
} else {
if (fragment_type <= largest_fragment_id_seen) {
return Status(StatusCode::FAILED_PRECONDITION,
"Message fragments must be written in ascending order of "
"fragment ordinal");
}
largest_fragment_id_seen = fragment_type;
}
uint64_t fragment_length;
if (!varint::Read(&p, end, &fragment_length)) {
return Status(StatusCode::FAILED_PRECONDITION,
"Failed to read message fragment length");
}
if (fragment_length > uint64_t(p - end)) {
return Status(StatusCode::FAILED_PRECONDITION,
"Fragment length is longer than total message");
}
const uint8_t* next_fragment = p + fragment_length;
switch (fragment_type) {
case MessageFragmentType::kTxId: {
uint64_t txid;
if (!varint::Read(&p, end, &txid)) {
return Status(StatusCode::FAILED_PRECONDITION,
"Failed to parse txid");
}
if (txid >= 0x80000000u) {
return Status(StatusCode::FAILED_PRECONDITION, "Txid out of range");
}
auto set_status = builder->SetTransactionId(txid);
if (set_status.is_error()) {
return set_status;
}
} break;
case MessageFragmentType::kOrdinal: {
uint64_t ordinal;
if (!varint::Read(&p, end, &ordinal)) {
return Status(StatusCode::FAILED_PRECONDITION,
"Failed to parse ordinal");
}
if (ordinal > std::numeric_limits<uint32_t>::max() || ordinal == 0) {
return Status(StatusCode::FAILED_PRECONDITION,
"Ordinal out of range");
}
auto set_status = builder->SetOrdinal(ordinal);
if (set_status.is_error()) {
return set_status;
}
} break;
case MessageFragmentType::kBody: {
auto set_status =
builder->SetBody(slice.FromPointer(p).ToOffset(fragment_length));
if (set_status.is_error()) {
return set_status;
}
p = next_fragment;
} break;
case MessageFragmentType::kChannel: {
auto fork_frame = Decode<fuchsia::overnet::protocol::ForkFrame>(
slice.FromPointer(p).ToOffset(fragment_length));
if (fork_frame.is_error()) {
return fork_frame.AsStatus();
}
auto intro =
router_endpoint->UnwrapForkFrame(peer, std::move(*fork_frame));
if (intro.is_error()) {
return intro.AsStatus();
}
auto append_status = builder->AppendChannelHandle(std::move(*intro));
if (append_status.is_error()) {
return append_status;
}
p = next_fragment;
} break;
default: {
const uint8_t type_byte = static_cast<uint8_t>(fragment_type);
if (type_byte >= kFirstRequiredParse &&
type_byte < kFirstSkippableParse) {
return Status(StatusCode::FAILED_PRECONDITION,
"Failed to parse a fragment that is required: "
"version mismatch?");
} else {
if (type_byte >= kFirstHandle) {
auto set_status = builder->AppendUnknownHandle();
if (set_status.is_error()) {
return set_status;
}
}
p = next_fragment;
}
} break;
}
assert(p == next_fragment);
p = next_fragment;
}
return Status::Ok();
}
} // namespace overnet