| // Copyright 2023 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "async_loop.h" |
| |
| #include "util.h" |
| |
| #ifdef _WIN32 |
| #include "async_loop-win32.h" |
| #else |
| #include "async_loop-posix.h" |
| #endif |
| |
| // static |
| std::string AsyncErrorToString(AsyncError error) { |
| return std::string(strerror(error)); |
| } |
| |
| // Global instance. |
| static std::unique_ptr<AsyncLoop> s_loop; |
| |
| // static |
| AsyncLoop& AsyncLoop::Get() { |
| AsyncLoop* loop = s_loop.get(); |
| if (!loop) { |
| loop = new AsyncLoop(); |
| s_loop.reset(loop); |
| } |
| return *loop; |
| } |
| |
| // static |
| std::unique_ptr<AsyncLoop> AsyncLoop::CreateLocal() { |
| return std::unique_ptr<AsyncLoop>(new AsyncLoop()); |
| } |
| |
| void AsyncLoop::Reset() { |
| impl_.reset(new AsyncLoop::Impl()); |
| if (interrupt_catcher_count_ > 0) |
| impl_->EnableInterruptCatcher(); |
| } |
| |
| // static |
| void AsyncLoop::ResetForTesting() { |
| AsyncLoop* loop = s_loop.get(); |
| if (loop) |
| loop->Reset(); |
| } |
| |
| AsyncLoop::AsyncLoop() : impl_(new AsyncLoop::Impl()) {} |
| |
| AsyncLoop::~AsyncLoop() = default; |
| |
| // static |
| int64_t AsyncLoop::NowMs() { |
| static bool init = false; |
| static int64_t start_ms = 0; |
| int64_t result = GetTimeMillis(); |
| if (!init) { |
| start_ms = result; |
| init = true; |
| } |
| return result - start_ms; |
| } |
| |
| AsyncLoop::ExitStatus AsyncLoop::RunOnce(int64_t timeout_ms) { |
| return impl_->RunOnce(timeout_ms, *this); |
| } |
| |
| void AsyncLoop::ClearInterrupt() { |
| impl_->ClearInterrupt(); |
| } |
| |
| #ifndef _WIN32 |
| int AsyncLoop::GetInterruptSignal() const { |
| return impl_->GetInterruptSignal(); |
| } |
| |
| sigset_t AsyncLoop::GetOldSignalMask() const { |
| return impl_->GetOldSignalMask(); |
| } |
| #endif // !_WIN32 |
| |
| void AsyncLoop::UpdateHandle(AsyncHandle::State* state) { |
| impl_->UpdateHandle(state); |
| } |
| |
| void AsyncLoop::CancelHandle(AsyncHandle::State* state) { |
| impl_->CancelHandle(state); |
| } |
| |
| void AsyncLoop::AttachTimer(AsyncTimer::State* state) { |
| impl_->timers().AttachTimer(state); |
| } |
| |
| void AsyncLoop::DetachTimer(AsyncTimer::State* state) { |
| impl_->timers().DetachTimer(state); |
| } |
| |
| void AsyncLoop::UpdateTimer(AsyncTimer::State* state) { |
| impl_->timers().UpdateTimer(state); |
| } |
| |
| AsyncLoop::RunUntilState::RunUntilState(int64_t timeout_ms) |
| : timeout_ms_(timeout_ms) {} |
| |
| bool AsyncLoop::RunUntilState::LoopAgain(AsyncLoop& async_loop) { |
| // Initialize expiration_ms_ the first time this method is called. |
| int64_t expiration_ms = -1; |
| if (timeout_ms_ >= 0) |
| expiration_ms = async_loop.NowMs() + timeout_ms_; |
| |
| status_ = async_loop.RunOnce(timeout_ms_); |
| if (status_ != ExitSuccess) { |
| // Either an interrupt, timeout or idle exit. |
| // This is the end of the loop. |
| return false; |
| } |
| |
| if (timeout_ms_ < 0) { |
| // Since no timeout was specified, just loop again |
| // after an async event. |
| return true; |
| } |
| |
| // Adjust the timeout for the next invocation. |
| timeout_ms_ = expiration_ms - async_loop.NowMs(); |
| if (timeout_ms_ >= 0) { |
| // There is still time left, so loop again. |
| return true; |
| } |
| |
| // There is no time left, stop the loop reporting |
| // a real timeout. |
| timeout_ms_ = 0; |
| status_ = ExitTimeout; |
| return false; |
| } |
| |
| void AsyncLoop::ChangeInterruptCatcher(bool increment) { |
| if (increment) { |
| if (++interrupt_catcher_count_ == 1) |
| impl_->EnableInterruptCatcher(); |
| } else { |
| if (interrupt_catcher_count_ <= 0) |
| Fatal("Unbalanced ChangeInterruptCatcher() calls"); |
| if (--interrupt_catcher_count_ == 0) |
| impl_->DisableInterruptCatcher(); |
| } |
| } |
| |
| void AsyncLoop::AttachHandle(AsyncHandle::State* state) { |
| impl_->AttachHandle(state); |
| } |
| |
| void AsyncLoop::DetachHandle(AsyncHandle::State* state) { |
| impl_->DetachHandle(state); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////// |
| /// |
| /// AsyncHandle |
| /// |
| |
| AsyncHandle::AsyncHandle() = default; |
| AsyncHandle::AsyncHandle(std::unique_ptr<AsyncHandle::State> state) |
| : state_(std::move(state)) {} |
| |
| // static |
| AsyncHandle AsyncHandle::Create(IpcHandle handle, AsyncLoop& async_loop, |
| AsyncHandle::Callback&& callback) { |
| auto state = std::unique_ptr<AsyncHandle::State>(new AsyncHandle::State( |
| std::move(handle), async_loop, std::move(callback))); |
| return AsyncHandle(std::move(state)); |
| } |
| |
| // static |
| AsyncHandle AsyncHandle::Create(IpcHandle::HandleType native_handle, |
| AsyncLoop& async_loop, |
| AsyncHandle::Callback&& callback) { |
| return Create(IpcHandle(native_handle), async_loop, std::move(callback)); |
| } |
| |
| // static |
| AsyncHandle AsyncHandle::CreateClone(IpcHandle::HandleType native_handle, |
| AsyncLoop& async_loop, |
| AsyncHandle::Callback&& callback) { |
| return Create(IpcHandle::CloneNativeHandle(native_handle), async_loop, |
| std::move(callback)); |
| } |
| |
| AsyncHandle::~AsyncHandle() = default; |
| |
| AsyncHandle::AsyncHandle(AsyncHandle&&) noexcept = default; |
| AsyncHandle& AsyncHandle::operator=(AsyncHandle&&) noexcept = default; |
| |
| void AsyncHandle::Close() { |
| if (is_valid()) { |
| state_->Cancel(); |
| state_.reset(); |
| } |
| } |
| |
| void AsyncHandle::ResetHandle(IpcHandle handle) { |
| assert(state_ && "Reset() on invalid AsyncHandle value"); |
| #ifndef _WIN32 |
| handle.SetNonBlocking(true); |
| #endif |
| state_->ResetHandle(std::move(handle)); |
| } |
| |
| IpcHandle::HandleType AsyncHandle::Release() { |
| return state_->ReleaseHandle(); |
| } |
| |
| bool AsyncHandle::is_valid() const { |
| return state_ && state_->is_valid(); |
| } |
| |
| IpcHandle::HandleType AsyncHandle::native_handle() const { |
| return state_ ? state_->native_handle() : IpcHandle::kInvalid; |
| } |
| |
| bool AsyncHandle::IsRunning() const { |
| return state_ && state_->IsRunning(); |
| } |
| |
| void AsyncHandle::Cancel() { |
| if (state_) |
| state_->Cancel(); |
| } |
| |
| AsyncHandle& AsyncHandle::ResetCallback(AsyncHandle::Callback&& callback) { |
| assert(state_ && "ResetCallback() on invalid AsyncHandle value"); |
| state_->ResetCallback(std::move(callback)); |
| return *this; |
| } |
| |
| void AsyncHandle::StartRead(void* buffer, size_t size) { |
| assert(state_ && "StartRead() on invalid AsyncHandle value"); |
| state_->StartRead(buffer, size); |
| } |
| |
| void AsyncHandle::StartWrite(const void* buffer, size_t size) { |
| assert(state_ && "StartWrite() on invalid AsyncHandle value"); |
| state_->StartWrite(buffer, size); |
| } |
| |
| void AsyncHandle::StartConnect() { |
| assert(state_ && "StartConnect() on invalid AsyncHandle value"); |
| state_->StartConnect(); |
| } |
| |
| void AsyncHandle::StartAccept() { |
| assert(state_ && "StartAccept() on invalid AsyncHandle value"); |
| state_->StartAccept(); |
| } |
| |
| IpcHandle AsyncHandle::TakeAcceptedHandle() { |
| assert(state_ && "TakeAcceptedHandle() on invalid AsyncHandle value"); |
| return state_->TakeAcceptedHandle(); |
| } |
| |
| AsyncLoop& AsyncHandle::async_loop() const { |
| assert(state_ && "async_loop() on invalid AsyncHandle value"); |
| return state_->async_loop(); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////// |
| /// |
| /// AsyncTimer |
| /// |
| |
| AsyncTimer::AsyncTimer() = default; |
| |
| AsyncTimer::AsyncTimer(AsyncLoop& async_loop, AsyncTimer::Callback&& callback) |
| : state_(new State(async_loop, std::move(callback))) {} |
| |
| AsyncTimer::~AsyncTimer() = default; |
| |
| AsyncTimer::AsyncTimer(AsyncTimer&&) noexcept = default; |
| AsyncTimer& AsyncTimer::operator=(AsyncTimer&&) noexcept = default; |
| |
| void AsyncTimer::ResetCallback(AsyncTimer::Callback&& callback) { |
| assert(state_ && "Calling AsyncTimer::ResetCallback() on invalid instance!"); |
| state_->ResetCallback(std::move(callback)); |
| } |
| void AsyncTimer::SetExpirationMs(int64_t expiration_ms) { |
| assert(state_ && |
| "Calling AsyncTimer::SetExpirationMs() on invalid instance!"); |
| state_->SetExpirationMs(expiration_ms); |
| } |
| |
| void AsyncTimer::SetDurationMs(int64_t duration_ms) { |
| assert(state_ && "Calling AsyncTimer::SetDurationMs() on invalid instance!"); |
| state_->SetDurationMs(duration_ms); |
| } |
| |
| void AsyncTimer::Cancel() { |
| assert(state_ && "Calling AsyncTimer::Cancel() on invalid instance!"); |
| state_->Cancel(); |
| } |
| |
| void AsyncTimer::Close() { |
| state_.reset(); |
| } |
| |
| AsyncLoop& AsyncTimer::async_loop() const { |
| assert(state_ && "Calling AsyncTimer::async_loop() on invalid instance!"); |
| return state_->async_loop(); |
| } |
| |
| // static |
| AsyncTimer AsyncTimer::CreateWithExpiration(int64_t expiration_ms, |
| AsyncLoop& async_loop, |
| AsyncTimer::Callback&& callback) { |
| AsyncTimer timer(async_loop, std::move(callback)); |
| timer.SetExpirationMs(expiration_ms); |
| return timer; |
| } |
| |
| // static |
| AsyncTimer AsyncTimer::CreateWithDuration(int64_t duration_ms, |
| AsyncLoop& async_loop, |
| AsyncTimer::Callback&& callback) { |
| AsyncTimer timer(async_loop, std::move(callback)); |
| timer.SetDurationMs(duration_ms); |
| return timer; |
| } |