// blob: 4ecfc5a352e1d05f5c9721d05837dcb30fca652c [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/debug/shared/message_loop.h"
#include <lib/syslog/cpp/macros.h>
#include <algorithm>
namespace debug_ipc {
namespace {
// The MessageLoop registered on the calling thread: set by MessageLoop::Init(), cleared by
// MessageLoop::Cleanup(), and returned by MessageLoop::Current(). Being thread_local, every
// thread has its own independent slot.
thread_local MessageLoop* current_message_loop = nullptr;
} // namespace
// Returns the message loop associated with this context, viewed as a fit::executor.
fit::executor* MessageLoopContext::executor() const {
  return message_loop_;
}
// Suspends the task currently running on the associated message loop and returns the handle that
// the loop hands back for it.
fit::suspended_task MessageLoopContext::suspend_task() {
  fit::suspended_task suspended = message_loop_->SuspendCurrentTask();
  return suspended;
}
// The context handed to promises keeps a back pointer to this loop.
MessageLoop::MessageLoop()
    : context_(this) {
}
MessageLoop::~MessageLoop() {
  // Cleanup() must have been called already, so this loop can no longer be the one registered as
  // current on this thread.
  FX_DCHECK(Current() != this);
}
// Registers this loop as the current one for the calling thread.
//
// This implementation always succeeds: it returns true and never writes to |error_message|.
bool MessageLoop::Init(std::string* error_message) {
  // Only one loop may be registered per thread at a time.
  FX_DCHECK(!current_message_loop);

  current_message_loop = this;
  return true;
}
// Unregisters this loop from the calling thread. Must be called on the thread where Init() was
// called, since the registration lives in a thread-local variable.
void MessageLoop::Cleanup() {
  FX_DCHECK(current_message_loop == this);

  current_message_loop = nullptr;
}
// static
//
// Returns the loop registered on the calling thread, or null when there is none.
MessageLoop* MessageLoop::Current() {
  return current_message_loop;
}
// Clears any previous quit request and then hands control to RunImpl().
void MessageLoop::Run() {
  should_quit_ = false;

  RunImpl();
}
// Runs the loop until ProcessPendingTask() finds nothing left to run. Returns immediately,
// without running the loop, when the task queue is already empty.
void MessageLoop::RunUntilNoTasks() {
  // Sample the queue state under the lock, but act on it only after the lock is released.
  bool queue_is_empty;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_is_empty = task_queue_.empty();
  }
  if (queue_is_empty)
    return;

  should_quit_on_no_more_tasks_ = true;
  Run();
}
// Closure overload: appends |fn| to the task queue, tagged with |file_line|.
void MessageLoop::PostTask(FileLineFunction file_line, fit::function<void()> fn) {
  PostTaskInternal(std::move(file_line), std::move(fn));
}
// fit::pending_task overload: appends |task| to the task queue, tagged with |file_line|.
void MessageLoop::PostTask(FileLineFunction file_line, fit::pending_task task) {
  PostTaskInternal(std::move(file_line), std::move(task));
}
// Runs |pending_task| synchronously on the calling stack instead of going through the task queue.
// |pending_task| must be non-empty.
void MessageLoop::RunTask(FileLineFunction file_line, fit::pending_task pending_task) {
  FX_DCHECK(pending_task);

  Task wrapped(std::move(file_line), std::move(pending_task));
  RunOneTask(wrapped);
}
// Schedules |fn| to run once |delta_ms| milliseconds have elapsed on the monotonic clock.
void MessageLoop::PostTimer(FileLineFunction file_line, uint64_t delta_ms,
                            fit::function<void()> fn) {
  constexpr uint64_t kNsPerMs = 1000000;
  const uint64_t expiry = GetMonotonicNowNS() + delta_ms * kNsPerMs;

  bool wake_loop = false;
  {
    std::lock_guard<std::mutex> guard(mutex_);

    // The loop only needs waking when it has no queued tasks and this timer becomes the earliest
    // deadline. This must be evaluated before the new timer is added to the heap.
    wake_loop = task_queue_.empty() && NextExpiryNS() > expiry;

    timers_.push_back({{std::move(file_line), std::move(fn)}, expiry});
    std::push_heap(timers_.begin(), timers_.end(), &CompareTimers);
  }

  if (wake_loop)
    SetHasTasks();
}
// Queues |task| through the regular PostTask() path.
void MessageLoop::schedule_task(fit::pending_task task) {
  PostTask(FROM_HERE, std::move(task));
}
// Adds one reference to the record for |ticket| and returns the same ticket value. The ticket must
// already exist with a nonzero reference count. Acquires mutex_.
fit::suspended_task::ticket MessageLoop::duplicate_ticket(fit::suspended_task::ticket ticket) {
  std::lock_guard<std::mutex> guard(mutex_);

  auto found = tickets_.find(ticket);
  FX_DCHECK(found != tickets_.end());
  FX_DCHECK(found->second.ref_count > 0);

  auto& record = found->second;
  ++record.ref_count;
  return ticket;
}
// Releases one reference on |ticket|. When |resume_task| is set and the ticket has not been
// resumed before, the task stored in the ticket record is run synchronously on the calling stack
// once mutex_ has been released. The ticket record is erased when its reference count reaches 0.
void MessageLoop::resolve_ticket(fit::suspended_task::ticket ticket, bool resume_task) {
// Implementation note: The fit single_thread_executor has the behavior that resolving the ticket
// moves the promise to the run queue, and then it's executed in order from there.
//
// However, this has the side effect of reordering the promise execution with respect to
// non-promise-related tasks that are also executing on the message loop.
//
// As an example, consider attaching to a process which involves resolving a promise in the attach
// reply message handler. There are non-promise-related messages in the message loop such as push
// notifications about thread events from the remote debug agent. Requiring the resolution of the
// promise be pushed to the back of the message queue will make it run after the processing of the
// new thread messages and the replies would be executed in an order that doesn't make any sense.
//
// As a result, we run resolved promises synchronously when they're resolved.
//
// This has the disadvantage of potentially generating very deep stacks and one can construct
// reentrant situations where this behavior might be surprising. But given the amount of
// non-promise-related tasks our message loop currently runs and how most promises are only
// resolved in response to IPC messages, the alternative is more surprising. If everything was a
// promise, we could post it to the back of the task_queue_ with no problem (other than a slight
// performance penalty by going through the loop again).
Task task; // The task (to run or delete outside of the lock).
bool should_run = false; // Whether to run the above task (otherwise just delete it).
{
std::lock_guard<std::mutex> guard(mutex_);
FX_DCHECK(ticket != current_task_ticket_) << "Trying to resolve a task from within itself.";
auto found = tickets_.find(ticket);
FX_DCHECK(found != tickets_.end()) << "Bad ticket.";
found->second.ref_count--;
if (resume_task && !found->second.was_resumed) {
// Task should be run (if was_resumed was already set, the task was already handed off to be run
// so we don't have to do it again).
//
// NOTE(review): if the record's task has not been stored yet (SaveTaskToTicket() has not run
// for this ticket), the Task built below is empty and RunOneTask() would reach FX_NOTREACHED.
// Presumably the DCHECK above is meant to rule that out -- confirm for nested RunTask() calls.
should_run = true;
// Mark as run. If the refcount isn't 0 yet this struct will still be around and we don't want
// to run it again.
found->second.was_resumed = true;
task = Task(found->second.file_line, std::move(found->second.task));
}
if (found->second.ref_count == 0) {
// Tickets are all closed. If it was resumed, the task has already been handed off to be run,
// and if it wasn't, the task will be dropped with this operation.
// Task could have already been moved out above.
if (found->second.task) {
// Free task outside lock, keep should_run false to avoid running.
task = Task(FileLineFunction(), std::move(found->second.task));
}
tickets_.erase(found);
}
}
// The lock is released at this point, so the task may safely post to or re-enter the loop.
if (should_run)
RunOneTask(task);
}
uint64_t MessageLoop::DelayNS() const {
// NextExpiry will return kMaxDelay if there are no timers queued.
uint64_t expiry = NextExpiryNS();
if (expiry == kMaxDelay) {
return kMaxDelay;
}
// We check how much more time we need to wait.
uint64_t now = GetMonotonicNowNS();
if (expiry > now) {
return expiry - now;
}
return 0;
}
// Returns the expiry stored at the top of the timer heap, or kMaxDelay when no timers are queued.
uint64_t MessageLoop::NextExpiryNS() const {
  if (!timers_.empty())
    return timers_.front().expiry;
  return kMaxDelay;
}
// Suspends the promise task that is currently executing and returns a handle for it.
//
// The first suspension during a run allocates a new ticket and ticket record. Any further
// suspension during the same run adds a reference to the existing ticket. Must only be called
// while a promise task is running. Acquires mutex_.
fit::suspended_task MessageLoop::SuspendCurrentTask() {
  std::lock_guard<std::mutex> guard(mutex_);
  FX_DCHECK(current_task_is_promise_) << "Can only suspend when running a promise.";

  if (!current_task_ticket_) {
    // The current task has no ticket, make a new one.
    current_task_ticket_ = next_ticket_;
    next_ticket_++;
    tickets_.emplace(current_task_ticket_, TicketRecord());
  } else {
    // Add a reference to the existing ticket. This deliberately does not call duplicate_ticket():
    // that function acquires mutex_, which is already held here, and re-locking a std::mutex from
    // the owning thread is undefined behavior (in practice a self-deadlock).
    auto found = tickets_.find(current_task_ticket_);
    FX_DCHECK(found != tickets_.end());
    FX_DCHECK(found->second.ref_count > 0);
    found->second.ref_count++;
  }
  return fit::suspended_task(this, current_task_ticket_);
}
// Shared implementation of the PostTask() overloads: appends a task built from |file_line| and
// |task| to the queue, and signals the loop when the queue was empty beforehand.
template <typename TaskType>
void MessageLoop::PostTaskInternal(FileLineFunction file_line, TaskType task) {
  bool queue_was_empty = false;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    queue_was_empty = task_queue_.empty();
    task_queue_.emplace_back(std::move(file_line), std::move(task));
  }

  // Signal outside the lock.
  if (queue_was_empty)
    SetHasTasks();
}
// Executes |task|, which holds either a plain closure (task_fn) or a fit::pending_task (pending).
// A pending task that suspends itself during the run is stored under the ticket it obtained.
void MessageLoop::RunOneTask(Task& task) {
  // Plain closure: just invoke it.
  if (task.task_fn) {
    task.task_fn();
    return;
  }

  // A task must carry one of the two callables.
  if (!task.pending) {
    FX_NOTREACHED();
    return;
  }

  // Run the fit::pending_task (generated by promises).
  //
  // This code may be run nested via RunTask(), so remember the current-task state of the outer
  // task and put it back once this one is done.
  const bool outer_is_promise = current_task_is_promise_;
  const fit::suspended_task::ticket outer_ticket = current_task_ticket_;

  current_task_is_promise_ = true;
  current_task_ticket_ = 0;

  bool finished = task.pending(context_);
  FX_DCHECK(!task.pending == finished) << "Finished state should be consistent.";
  (void)finished;

  if (current_task_ticket_) {
    // Task was suspended and a ticket was generated.
    //
    // SaveTaskToTicket() takes the mutex again, which is unfortunate. Deferring this work until
    // the caller next holds the mutex would avoid the extra lock, but it would complicate the
    // execution flow.
    SaveTaskToTicket(current_task_ticket_, std::move(task.file_line), std::move(task.pending));
  }

  current_task_ticket_ = outer_ticket;
  current_task_is_promise_ = outer_is_promise;
}
// Stores a promise task that suspended itself during its run under |ticket|.
//
// Depending on the state of the ticket record, the task is:
//  - re-queued immediately, when the ticket was already resumed during the run;
//  - parked in the record, when suspend handles are still outstanding;
//  - dropped, when no references remain and it was never resumed.
// The record is erased once its reference count is 0. |task| must be non-empty.
void MessageLoop::SaveTaskToTicket(fit::suspended_task::ticket ticket, FileLineFunction file_line,
                                   fit::pending_task task) {
  FX_DCHECK(task) << "The task should not be finished if we're saving it.";

  bool needs_awaken = false;
  {
    // The guard lives inside this scope so that SetHasTasks() below runs without mutex_ held,
    // matching PostTaskInternal() and PostTimer().
    std::lock_guard<std::mutex> guard(mutex_);

    auto found = tickets_.find(ticket);
    FX_DCHECK(found != tickets_.end()) << "Ticket was invalid.";

    if (found->second.was_resumed) {
      // The ticket was suspended and then resumed from within the same run of the promise. It is
      // moved immediately to the runnable queue.
      needs_awaken = task_queue_.empty();
      task_queue_.emplace_back(std::move(file_line), std::move(task));
    } else if (found->second.ref_count != 0) {
      // Suspend tickets still out, keep suspended until marked resumed.
      found->second.task = std::move(task);
    }

    if (found->second.ref_count == 0) {
      // No refcount, can drop the ticket. The task could either be marked runnable and currently
      // scheduled to be run, or it could be dropped.
      tickets_.erase(found);
    }
  }

  if (needs_awaken)
    SetHasTasks();
}
// Sets the quit flag. Run() clears this flag again before it starts the loop.
void MessageLoop::QuitNow() {
  should_quit_ = true;
}
bool MessageLoop::ProcessPendingTask() {
// This function will be called with the mutex held.
if (task_queue_.empty() && DelayNS() > 0) {
if (should_quit_on_no_more_tasks_) {
should_quit_on_no_more_tasks_ = false;
QuitNow();
}
return false;
}
Task task;
if (!task_queue_.empty()) {
task = std::move(task_queue_.front());
task_queue_.pop_front();
} else {
std::pop_heap(timers_.begin(), timers_.end(), &CompareTimers);
task = std::move(timers_.back().task);
timers_.pop_back();
}
mutex_.unlock();
RunOneTask(task);
mutex_.lock();
return true;
}
// Default-constructed handle. Members take the defaults from the class definition -- presumably a
// null loop and zero id, i.e. not watching anything (confirm in the header).
MessageLoop::WatchHandle::WatchHandle() = default;
MessageLoop::WatchHandle::WatchHandle(MessageLoop* msg_loop, int id)
: msg_loop_(msg_loop), id_(id) {}
MessageLoop::WatchHandle::WatchHandle(WatchHandle&& other)
: msg_loop_(other.msg_loop_), id_(other.id_) {
other.msg_loop_ = nullptr;
other.id_ = 0;
}
// Stops the watch, if any, when the handle goes away.
MessageLoop::WatchHandle::~WatchHandle() {
  StopWatching();
}
// Asks the message loop to stop the watch if this handle is currently watching, then resets the
// handle to a null loop and zero id. Safe to call repeatedly.
void MessageLoop::WatchHandle::StopWatching() {
  if (watching()) {
    msg_loop_->StopWatching(id_);
  }

  id_ = 0;
  msg_loop_ = nullptr;
}
// Move assignment: stops whatever this handle is currently watching, takes over |other|'s watch
// and leaves |other| with a null loop and zero id.
MessageLoop::WatchHandle& MessageLoop::WatchHandle::operator=(WatchHandle&& other) {
  // Self-assignment of a live handle should be impossible: handles are not copyable and every ID
  // should be unique. Self-assignment of null handles is tolerated.
  FX_DCHECK(!watching() || (msg_loop_ != other.msg_loop_ || id_ != other.id_));

  // Release the watch currently held, if any.
  if (watching()) {
    msg_loop_->StopWatching(id_);
  }

  // Take over the other handle's watch...
  msg_loop_ = other.msg_loop_;
  id_ = other.id_;

  // ...and leave the source empty.
  other.msg_loop_ = nullptr;
  other.id_ = 0;

  return *this;
}
} // namespace debug_ipc