blob: 318c094142fc9424372cb7ef3f7d1465b631a106 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "receive_mode.h"
namespace overnet {
namespace receive_mode {
static const uint64_t kMaxSeq = ~uint64_t(0);
///////////////////////////////////////////////////////////////////////////////
// ReliableOrdered
void ReliableOrdered::Begin(uint64_t seq, BeginCallback ready) {
if (closed_.has_value()) {
ready(closed_->OrCancelled());
return;
}
if (seq < cur_) return;
if (cur_ == seq) {
if (!cur_in_progress_) {
cur_in_progress_ = true;
ready(Status::Ok());
}
} else {
later_[seq] = std::move(ready);
}
}
void ReliableOrdered::Completed(uint64_t seq, const Status& status) {
assert(seq == cur_ && cur_in_progress_);
if (status.is_ok()) {
if (cur_ != kMaxSeq) {
cur_++;
cur_in_progress_ = false;
auto it = later_.find(cur_);
if (it != later_.end()) {
cur_in_progress_ = true;
auto cb = std::move(it->second);
later_.erase(it);
cb(Status::Ok());
}
}
} else {
cur_in_progress_ = false;
}
}
void ReliableOrdered::Close(const Status& status) {
if (closed_.has_value()) return;
closed_ = status;
std::unordered_map<uint64_t, BeginCallback> later;
later_.swap(later);
for (auto& p : later) {
p.second(closed_->OrCancelled());
}
}
///////////////////////////////////////////////////////////////////////////////
// ReliableUnordered
void ReliableUnordered::Begin(uint64_t seq, BeginCallback ready) {
if (closed_.has_value()) {
ready(closed_->OrCancelled());
return;
}
if (seq < tip_) return;
if (seq >= tip_ + kLookaheadWindow) return;
auto idx = seq - tip_;
if (in_progress_.test(idx)) return;
in_progress_.set(idx);
ready(Status::Ok());
for (uint64_t i = 0; i < idx; i++) {
if (!in_progress_.test(i)) return;
}
}
void ReliableUnordered::Completed(uint64_t seq, const Status& status) {
auto idx = seq - tip_;
if (status.is_ok()) {
done_.set(idx);
if (idx == 0) {
// TODO(ctiller): count how far to shift, and then do this, as the
// shifts could be expensive.
while (tip_ != kMaxSeq && done_.test(0)) {
tip_++;
in_progress_ >>= 1;
done_ >>= 1;
}
}
} else {
in_progress_.reset(idx);
}
}
void ReliableUnordered::Close(const Status& status) {
if (closed_.has_value()) return;
closed_ = status;
}
///////////////////////////////////////////////////////////////////////////////
// UnreliableOrdered
void UnreliableOrdered::Begin(uint64_t seq, BeginCallback ready) {
if (closed_.has_value()) {
ready(closed_->OrCancelled());
return;
}
if (seq < cur_) return;
if (seq > cur_ && cur_in_progress_) {
later_[seq] = std::move(ready);
return;
}
assert(seq >= cur_);
if (cur_in_progress_) return;
cur_in_progress_ = true;
cur_ = seq;
ready(Status::Ok());
}
void UnreliableOrdered::Completed(uint64_t seq, const Status& status) {
assert(seq == cur_);
assert(cur_in_progress_);
cur_in_progress_ = false;
if (!later_.empty()) {
auto it = later_.begin();
uint64_t later_seq = it->first;
BeginCallback later_cb = std::move(it->second);
later_.erase(it);
assert(later_seq > cur_);
cur_ = later_seq;
cur_in_progress_ = true;
later_cb(Status::Ok());
} else {
if (status.is_ok() && cur_ != kMaxSeq) cur_++;
}
}
void UnreliableOrdered::Close(const Status& status) {
if (closed_.has_value()) return;
closed_ = status;
std::map<uint64_t, BeginCallback> later;
later_.swap(later);
for (auto& p : later) {
p.second(closed_->OrCancelled());
}
}
///////////////////////////////////////////////////////////////////////////////
// UnreliableUnordered
void UnreliableUnordered::Begin(uint64_t seq, BeginCallback ready) {
if (closed_.has_value()) {
ready(closed_->OrCancelled());
return;
}
if (seq < tip_) return;
if (seq >= kLookaheadWindow && seq - kLookaheadWindow >= tip_) {
uint64_t new_tip = seq - kLookaheadWindow + 1;
assert(tip_ < new_tip);
uint64_t move = new_tip - tip_;
if (move > kLookaheadWindow) {
in_progress_.reset();
} else {
in_progress_ >>= move;
}
tip_ = new_tip;
}
if (!in_progress_.test(seq - tip_)) {
in_progress_.set(seq - tip_);
ready(Status::Ok());
}
}
void UnreliableUnordered::Completed(uint64_t seq, const Status& status) {
if (seq < tip_) return;
if (status.is_ok()) {
if (seq == tip_) {
if (tip_ != kMaxSeq) {
tip_++;
in_progress_ >>= 1;
}
}
} else {
in_progress_.reset(seq - tip_);
}
}
void UnreliableUnordered::Close(const Status& status) {
if (closed_.has_value()) return;
closed_ = status;
}
///////////////////////////////////////////////////////////////////////////////
// TailReliable
void TailReliable::Begin(uint64_t seq, BeginCallback ready) {
if (closed_.has_value()) {
ready(closed_->OrCancelled());
return;
}
if (seq < cur_) return;
if (seq > cur_ && cur_in_progress_) {
later_[seq] = std::move(ready);
return;
}
assert(seq >= cur_);
if (cur_in_progress_) return;
cur_in_progress_ = true;
cur_ = seq;
ready(Status::Ok());
}
void TailReliable::Completed(uint64_t seq, const Status& status) {
assert(seq == cur_);
assert(cur_in_progress_);
cur_in_progress_ = false;
if (!later_.empty()) {
auto it = later_.begin();
uint64_t later_seq = it->first;
BeginCallback later_cb = std::move(it->second);
later_.erase(it);
assert(later_seq > cur_);
cur_ = later_seq;
cur_in_progress_ = true;
later_cb(Status::Ok());
} else {
if (status.is_ok() && cur_ != kMaxSeq) cur_++;
}
}
void TailReliable::Close(const Status& status) {
if (closed_.has_value()) return;
closed_ = status;
std::map<uint64_t, BeginCallback> later;
later_.swap(later);
for (auto& p : later) {
p.second(closed_->OrCancelled());
}
}
///////////////////////////////////////////////////////////////////////////////
// Error
void Error::Begin(uint64_t seq, BeginCallback ready) {
ready(Status::Cancelled());
}
void Error::Completed(uint64_t seq, const Status& status) { abort(); }
void Error::Close(const Status& status) {}
///////////////////////////////////////////////////////////////////////////////
// ParameterizedReceiveMode
ReceiveMode* ParameterizedReceiveMode::Storage::Init(
ReliabilityAndOrdering reliability_and_ordering) {
switch (reliability_and_ordering) {
case ReliabilityAndOrdering::ReliableOrdered:
return new (&reliable_ordered) ReliableOrdered();
case ReliabilityAndOrdering::ReliableUnordered:
return new (&reliable_unordered) ReliableUnordered();
case ReliabilityAndOrdering::UnreliableOrdered:
return new (&unreliable_ordered) UnreliableOrdered();
case ReliabilityAndOrdering::UnreliableUnordered:
return new (&unreliable_unordered) UnreliableUnordered();
case ReliabilityAndOrdering::TailReliable:
return new (&tail_reliable) TailReliable();
default:
return new (&error) Error();
}
}
} // namespace receive_mode
} // namespace overnet