blob: 978a972507a3ee203c9ba4a7acefc5993931182e [file] [log] [blame]
// Copyright 2023 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "async_loop.h"
#include "util.h"
#ifdef _WIN32
#include "async_loop-win32.h"
#else
#include "async_loop-posix.h"
#endif
// static
std::string AsyncErrorToString(AsyncError error) {
return std::string(strerror(error));
}
// Global instance.
static std::unique_ptr<AsyncLoop> s_loop;
// static
AsyncLoop& AsyncLoop::Get() {
AsyncLoop* loop = s_loop.get();
if (!loop) {
loop = new AsyncLoop();
s_loop.reset(loop);
}
return *loop;
}
// static
std::unique_ptr<AsyncLoop> AsyncLoop::CreateLocal() {
return std::unique_ptr<AsyncLoop>(new AsyncLoop());
}
void AsyncLoop::Reset() {
impl_.reset(new AsyncLoop::Impl());
if (interrupt_catcher_count_ > 0)
impl_->EnableInterruptCatcher();
}
// static
void AsyncLoop::ResetForTesting() {
AsyncLoop* loop = s_loop.get();
if (loop)
loop->Reset();
}
AsyncLoop::AsyncLoop() : impl_(new AsyncLoop::Impl()) {}
AsyncLoop::~AsyncLoop() = default;
// static
int64_t AsyncLoop::NowMs() {
static bool init = false;
static int64_t start_ms = 0;
int64_t result = GetTimeMillis();
if (!init) {
start_ms = result;
init = true;
}
return result - start_ms;
}
AsyncLoop::ExitStatus AsyncLoop::RunOnce(int64_t timeout_ms) {
return impl_->RunOnce(timeout_ms, *this);
}
void AsyncLoop::ClearInterrupt() {
impl_->ClearInterrupt();
}
#ifndef _WIN32
int AsyncLoop::GetInterruptSignal() const {
return impl_->GetInterruptSignal();
}
sigset_t AsyncLoop::GetOldSignalMask() const {
return impl_->GetOldSignalMask();
}
#endif // !_WIN32
void AsyncLoop::UpdateHandle(AsyncHandle::State* state) {
impl_->UpdateHandle(state);
}
void AsyncLoop::CancelHandle(AsyncHandle::State* state) {
impl_->CancelHandle(state);
}
void AsyncLoop::AttachTimer(AsyncTimer::State* state) {
impl_->timers().AttachTimer(state);
}
void AsyncLoop::DetachTimer(AsyncTimer::State* state) {
impl_->timers().DetachTimer(state);
}
void AsyncLoop::UpdateTimer(AsyncTimer::State* state) {
impl_->timers().UpdateTimer(state);
}
AsyncLoop::RunUntilState::RunUntilState(int64_t timeout_ms)
: timeout_ms_(timeout_ms) {}
bool AsyncLoop::RunUntilState::LoopAgain(AsyncLoop& async_loop) {
// Initialize expiration_ms_ the first time this method is called.
int64_t expiration_ms = -1;
if (timeout_ms_ >= 0)
expiration_ms = async_loop.NowMs() + timeout_ms_;
status_ = async_loop.RunOnce(timeout_ms_);
if (status_ != ExitSuccess) {
// Either an interrupt, timeout or idle exit.
// This is the end of the loop.
return false;
}
if (timeout_ms_ < 0) {
// Since no timeout was specified, just loop again
// after an async event.
return true;
}
// Adjust the timeout for the next invocation.
timeout_ms_ = expiration_ms - async_loop.NowMs();
if (timeout_ms_ >= 0) {
// There is still time left, so loop again.
return true;
}
// There is no time left, stop the loop reporting
// a real timeout.
timeout_ms_ = 0;
status_ = ExitTimeout;
return false;
}
void AsyncLoop::ChangeInterruptCatcher(bool increment) {
if (increment) {
if (++interrupt_catcher_count_ == 1)
impl_->EnableInterruptCatcher();
} else {
if (interrupt_catcher_count_ <= 0)
Fatal("Unbalanced ChangeInterruptCatcher() calls");
if (--interrupt_catcher_count_ == 0)
impl_->DisableInterruptCatcher();
}
}
void AsyncLoop::AttachHandle(AsyncHandle::State* state) {
impl_->AttachHandle(state);
}
void AsyncLoop::DetachHandle(AsyncHandle::State* state) {
impl_->DetachHandle(state);
}
///////////////////////////////////////////////////////////////////////////
///
/// AsyncHandle
///
AsyncHandle::AsyncHandle() = default;
AsyncHandle::AsyncHandle(std::unique_ptr<AsyncHandle::State> state)
: state_(std::move(state)) {}
// static
AsyncHandle AsyncHandle::Create(IpcHandle handle, AsyncLoop& async_loop,
AsyncHandle::Callback&& callback) {
auto state = std::unique_ptr<AsyncHandle::State>(new AsyncHandle::State(
std::move(handle), async_loop, std::move(callback)));
return AsyncHandle(std::move(state));
}
// static
AsyncHandle AsyncHandle::Create(IpcHandle::HandleType native_handle,
AsyncLoop& async_loop,
AsyncHandle::Callback&& callback) {
return Create(IpcHandle(native_handle), async_loop, std::move(callback));
}
// static
AsyncHandle AsyncHandle::CreateClone(IpcHandle::HandleType native_handle,
AsyncLoop& async_loop,
AsyncHandle::Callback&& callback) {
return Create(IpcHandle::CloneNativeHandle(native_handle), async_loop,
std::move(callback));
}
AsyncHandle::~AsyncHandle() = default;
AsyncHandle::AsyncHandle(AsyncHandle&&) noexcept = default;
AsyncHandle& AsyncHandle::operator=(AsyncHandle&&) noexcept = default;
void AsyncHandle::Close() {
if (is_valid()) {
state_->Cancel();
state_.reset();
}
}
void AsyncHandle::ResetHandle(IpcHandle handle) {
assert(state_ && "Reset() on invalid AsyncHandle value");
#ifndef _WIN32
handle.SetNonBlocking(true);
#endif
state_->ResetHandle(std::move(handle));
}
IpcHandle::HandleType AsyncHandle::Release() {
return state_->ReleaseHandle();
}
bool AsyncHandle::is_valid() const {
return state_ && state_->is_valid();
}
IpcHandle::HandleType AsyncHandle::native_handle() const {
return state_ ? state_->native_handle() : IpcHandle::kInvalid;
}
bool AsyncHandle::IsRunning() const {
return state_ && state_->IsRunning();
}
void AsyncHandle::Cancel() {
if (state_)
state_->Cancel();
}
AsyncHandle& AsyncHandle::ResetCallback(AsyncHandle::Callback&& callback) {
assert(state_ && "ResetCallback() on invalid AsyncHandle value");
state_->ResetCallback(std::move(callback));
return *this;
}
void AsyncHandle::StartRead(void* buffer, size_t size) {
assert(state_ && "StartRead() on invalid AsyncHandle value");
state_->StartRead(buffer, size);
}
void AsyncHandle::StartWrite(const void* buffer, size_t size) {
assert(state_ && "StartWrite() on invalid AsyncHandle value");
state_->StartWrite(buffer, size);
}
void AsyncHandle::StartConnect() {
assert(state_ && "StartConnect() on invalid AsyncHandle value");
state_->StartConnect();
}
void AsyncHandle::StartAccept() {
assert(state_ && "StartAccept() on invalid AsyncHandle value");
state_->StartAccept();
}
IpcHandle AsyncHandle::TakeAcceptedHandle() {
assert(state_ && "TakeAcceptedHandle() on invalid AsyncHandle value");
return state_->TakeAcceptedHandle();
}
AsyncLoop& AsyncHandle::async_loop() const {
assert(state_ && "async_loop() on invalid AsyncHandle value");
return state_->async_loop();
}
///////////////////////////////////////////////////////////////////////////
///
/// AsyncTimer
///
AsyncTimer::AsyncTimer() = default;
AsyncTimer::AsyncTimer(AsyncLoop& async_loop, AsyncTimer::Callback&& callback)
: state_(new State(async_loop, std::move(callback))) {}
AsyncTimer::~AsyncTimer() = default;
AsyncTimer::AsyncTimer(AsyncTimer&&) noexcept = default;
AsyncTimer& AsyncTimer::operator=(AsyncTimer&&) noexcept = default;
void AsyncTimer::ResetCallback(AsyncTimer::Callback&& callback) {
assert(state_ && "Calling AsyncTimer::ResetCallback() on invalid instance!");
state_->ResetCallback(std::move(callback));
}
void AsyncTimer::SetExpirationMs(int64_t expiration_ms) {
assert(state_ &&
"Calling AsyncTimer::SetExpirationMs() on invalid instance!");
state_->SetExpirationMs(expiration_ms);
}
void AsyncTimer::SetDurationMs(int64_t duration_ms) {
assert(state_ && "Calling AsyncTimer::SetDurationMs() on invalid instance!");
state_->SetDurationMs(duration_ms);
}
void AsyncTimer::Cancel() {
assert(state_ && "Calling AsyncTimer::Cancel() on invalid instance!");
state_->Cancel();
}
void AsyncTimer::Close() {
state_.reset();
}
AsyncLoop& AsyncTimer::async_loop() const {
assert(state_ && "Calling AsyncTimer::async_loop() on invalid instance!");
return state_->async_loop();
}
// static
AsyncTimer AsyncTimer::CreateWithExpiration(int64_t expiration_ms,
AsyncLoop& async_loop,
AsyncTimer::Callback&& callback) {
AsyncTimer timer(async_loop, std::move(callback));
timer.SetExpirationMs(expiration_ms);
return timer;
}
// static
AsyncTimer AsyncTimer::CreateWithDuration(int64_t duration_ms,
AsyncLoop& async_loop,
AsyncTimer::Callback&& callback) {
AsyncTimer timer(async_loop, std::move(callback));
timer.SetDurationMs(duration_ms);
return timer;
}