blob: a7594fb26a2a3df5589323919d120eab4b57fc8f [file]
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
#define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <array>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
namespace grpc_core {
template <typename T, uint8_t kQueueSize>
class InterActivityPipe {
private:
class Center : public RefCounted<Center, NonPolymorphicRefCount> {
public:
Poll<bool> Push(T& value) {
ReleasableMutexLock lock(&mu_);
if (closed_) return false;
if (count_ == kQueueSize) {
on_available_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
queue_[(first_ + count_) % kQueueSize] = std::move(value);
++count_;
if (count_ == 1) {
auto on_occupied = std::move(on_occupied_);
lock.Release();
on_occupied.Wakeup();
}
return true;
}
Poll<absl::optional<T>> Next() {
ReleasableMutexLock lock(&mu_);
if (count_ == 0) {
if (closed_) return absl::nullopt;
on_occupied_ = Activity::current()->MakeNonOwningWaker();
return Pending{};
}
auto value = std::move(queue_[first_]);
first_ = (first_ + 1) % kQueueSize;
--count_;
if (count_ == kQueueSize - 1) {
auto on_available = std::move(on_available_);
lock.Release();
on_available.Wakeup();
}
return std::move(value);
}
void MarkClosed() {
ReleasableMutexLock lock(&mu_);
if (std::exchange(closed_, true)) return;
auto on_occupied = std::move(on_occupied_);
auto on_available = std::move(on_available_);
lock.Release();
on_occupied.Wakeup();
on_available.Wakeup();
}
bool IsClosed() {
MutexLock lock(&mu_);
return closed_;
}
private:
Mutex mu_;
std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
bool closed_ ABSL_GUARDED_BY(mu_) = false;
uint8_t first_ ABSL_GUARDED_BY(mu_) = 0;
uint8_t count_ ABSL_GUARDED_BY(mu_) = 0;
Waker on_occupied_ ABSL_GUARDED_BY(mu_);
Waker on_available_ ABSL_GUARDED_BY(mu_);
};
RefCountedPtr<Center> center_{MakeRefCounted<Center>()};
public:
class Sender {
public:
explicit Sender(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Sender(const Sender&) = delete;
Sender& operator=(const Sender&) = delete;
Sender(Sender&&) noexcept = default;
Sender& operator=(Sender&&) noexcept = default;
~Sender() {
if (center_ != nullptr) center_->MarkClosed();
}
bool IsClose() { return center_->IsClosed(); }
void MarkClose() {
if (center_ != nullptr) center_->MarkClosed();
}
auto Push(T value) {
return [center = center_, value = std::move(value)]() mutable {
return center->Push(value);
};
}
private:
RefCountedPtr<Center> center_;
};
class Receiver {
public:
explicit Receiver(RefCountedPtr<Center> center)
: center_(std::move(center)) {}
Receiver(const Receiver&) = delete;
Receiver& operator=(const Receiver&) = delete;
Receiver(Receiver&&) noexcept = default;
Receiver& operator=(Receiver&&) noexcept = default;
~Receiver() {
if (center_ != nullptr) center_->MarkClosed();
}
auto Next() {
return [center = center_]() { return center->Next(); };
}
private:
RefCountedPtr<Center> center_;
};
Sender sender{center_};
Receiver receiver{center_};
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H