blob: 78bca7edabeab189de8132d15b96fc7ba56ef0f1 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#pragma once
#include <vector>
#include "callback.h"
#include "optional.h"
#include "status.h"
namespace overnet {
// A typed data sink for some kind of object
template <class T>
class Sink {
public:
virtual ~Sink() {}
virtual void Close(const Status& status, Callback<void> quiesced) = 0;
virtual void Push(T item) = 0;
void PushMany(T* items, size_t count);
};
template <class T>
class Source {
public:
virtual ~Source() {}
virtual void Close(const Status& status) = 0;
virtual void Pull(StatusOrCallback<Optional<T>> ready) = 0;
// Default implementation of a draining-pull.
// Derived types are encouraged but not required to replace this.
// Implies a Close() call.
virtual void PullAll(StatusOrCallback<std::vector<T>> ready);
};
template <class T>
void Sink<T>::PushMany(T* items, size_t count) {
for (size_t i = 0; i < count; i++) {
Push(items[i]);
}
}
template <class T>
void Source<T>::PullAll(StatusOrCallback<std::vector<T>> ready) {
class Puller {
public:
Puller(Source* source, StatusOrCallback<std::vector<T>> ready)
: source_(source), ready_(std::move(ready)) {
Next();
}
private:
void Next() {
source_->Pull(
StatusOrCallback<Optional<T>>([this](StatusOr<Optional<T>>&& status) {
if (status.is_error() || !status->has_value()) {
source_->Close(status.AsStatus());
ready_(std::move(so_far_));
delete this;
return;
}
so_far_.emplace_back(std::move(**status.get()));
Next();
}));
}
Source* const source_;
std::vector<T> so_far_;
StatusOrCallback<std::vector<T>> ready_;
};
new Puller(this, std::move(ready));
}
} // namespace overnet