blob: 457999c789a3a893093855f2c7271f71dc9844e0 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_
#define SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_
#include <lib/fpromise/promise.h>
#include <deque>
#include <functional>
#include <limits>
#include <map>
#include <mutex>
#include <string>
#include <vector>
#include <fbl/unique_fd.h>
#include "src/developer/debug/shared/logging/file_line_function.h"
#include "src/lib/fxl/macros.h"
#if defined(__Fuchsia__)
#include <zircon/compiler.h>
#else
// The macros for thread annotations aren't set up for non-Fuchsia builds.
#undef __TA_REQUIRES
#define __TA_REQUIRES(arg)
#endif
namespace debug {
class MessageLoop;
// Context implementation for fpromise::promise integration.
class MessageLoopContext : public fpromise::context {
public:
// Pointer must outlive this class.
explicit MessageLoopContext(MessageLoop* loop) : message_loop_(loop) {}
// fpromise::context implementation.
fpromise::executor* executor() const override;
fpromise::suspended_task suspend_task() override;
private:
MessageLoop* message_loop_;
};
// Message loop implementation.
//
// Unlike the one in FXL, this will run on the host in addition to a Zircon target. On Zircon it
// is backed by the async-loop library, on the host (Linux & Mac) it is backed by a poll()
// implementation (see message_loop_poll.h).
//
// This message loop supports several types of tasks:
// - Bare lambdas.
// - Delayed lambdas (timers).
// - fpromise::pending_task objects (normally generated by fpromise::promise).
// - Async I/O events on file handles.
//
// The Fuchsia-specific subclass can also watch for Zircon async events (see message_loop_fuchsia.h
// for more).
class MessageLoop : public fpromise::executor, public fpromise::suspended_task::resolver {
public:
enum class WatchMode {
kRead,
kWrite,
kReadWrite,
};
class WatchHandle;
// There can be only one active MessageLoop in scope per thread at a time.
//
// A message loop is active between Init() and Cleanup(). During this period, Current() will
// return the message loop.
//
// Init() / Cleanup() is a separate phase so a message loop can be created and managed on one
// thread and sent to another thread to actually run (to help with cross-thread task posting).
MessageLoop();
~MessageLoop() override;
// Init() and Cleanup() must be called on the same thread as Run().
//
// Init() returns true on success. On false the error message will be put into the output param.
virtual bool Init(std::string* error_message);
virtual void Cleanup();
// Exits the message loop immediately, not running pending functions. This must be called only on
// the MessageLoop thread.
virtual void QuitNow();
// Returns the current message loop or null if there isn't one.
static MessageLoop* Current();
// Runs the message loop.
void Run();
// Run until no more tasks are posted. This is not really meant for normal functioning of the
// debugger. Rather this is geared towards test environments that control what gets inserted into
// the message loop. This Useful for tests in which tasks post additional tasks.
//
// NOTE: OS events (file handles, sockets, signals) are not considered as non-idle tasks.
// Basically they're ignored when checking for "idleness".
void RunUntilNoTasks();
// Posts the given work to the message loop. It will be added to the end of the work queue.
void PostTask(FileLineFunction file_line, fit::function<void()> fn);
void PostTask(FileLineFunction file_line, fpromise::pending_task task);
// Runs the given task immediately. If it reports a pending completion it will complete
// asynchronously, otherwise it will complete synchronously. This can be used to start executing
// a promise without putting it at the back of the message loop.
//
// If the task complete asynchronously, it will be added to the queue when it signals a pending
// completion.
void RunTask(FileLineFunction file_line, fpromise::pending_task task);
// Set a task to run after a certain number of milliseconds have elapsed. Granularity is hard to
// guarantee but the timer shouldn't fire earlier than expected.
void PostTimer(FileLineFunction file_line, uint64_t delta_ms, fit::function<void()> fn);
// Starts watching the given file descriptor in the given mode. Returns a WatchHandle that scopes
// the watch operation (when the handle is destroyed the watcher is unregistered).
//
// This function must only be called on the message loop thread.
//
// The watcher object will be deleted upon the destruction of the returned WatchHandle. If this
// happens in the watcher function, care needs to be taken to avoid using any captured variables
// after they are deleted.
//
// You can only watch a handle once. Note that stdin/stdout/stderr can be the same underlying OS
// handle, so the caller can only watch one of them.
using FDWatcher = fit::function<void(int fd, bool read, bool write, bool err)>;
virtual WatchHandle WatchFD(WatchMode mode, int fd, FDWatcher watcher) = 0;
// fpromise::executor implementation.
void schedule_task(fpromise::pending_task task) override;
// fpromise::resolver implementation.
fpromise::suspended_task::ticket duplicate_ticket(
fpromise::suspended_task::ticket ticket) override;
void resolve_ticket(fpromise::suspended_task::ticket ticket, bool resume_task) override;
protected:
static constexpr uint64_t kMaxDelay = std::numeric_limits<uint64_t>::max();
virtual void RunImpl() = 0;
// Get the value of a monotonic clock in nanoseconds.
virtual uint64_t GetMonotonicNowNS() const = 0;
// Used by WatchHandle to unregister a watch. Can be called from any thread without the lock held.
virtual void StopWatching(int id) = 0;
// Indicates there are tasks to process. Can be called from any thread and will be called without
// the lock held.
virtual void SetHasTasks() = 0;
// Processes one pending task, returning true if there was work to do, or false if there was
// nothing. The mutex_ must be held during the call. It will be unlocked during task processing,
// so the platform implementation that calls it must not assume state did not change across the
// call.
bool ProcessPendingTask() __TA_REQUIRES(mutex_);
// The platform implementation should check should_quit() after every task execution and exit if
// true.
bool should_quit() const { return should_quit_; }
// How much time we should wait before waking up again to process timers.
uint64_t DelayNS() const;
// Style guide says this should be private and we should have a protected getter, but that makes
// the thread annotations much more complicated.
std::mutex mutex_;
private:
friend MessageLoopContext;
friend WatchHandle;
// A task is either a bare function or a fpromise::pending function. This is one entry in the
// task_queue_ of pending runnable tasks.
struct Task {
Task() = default;
Task(FileLineFunction fl, fit::function<void()> fn)
: file_line(std::move(fl)), task_fn(std::move(fn)) {}
Task(FileLineFunction fl, fpromise::pending_task pend)
: file_line(std::move(fl)), pending(std::move(pend)) {}
FileLineFunction file_line;
// Only one of these two members will be non-null.
fit::function<void()> task_fn;
fpromise::pending_task pending;
};
// The data associated with a "ticket". A ticket is the handle behind a fpromise::suspended_task
// which is used to track fpromise::pending_task objects that have completed asynchronously and to
// signal that they should be run again.
struct TicketRecord {
// A ticket is reference counted, with the references being managed by the
// fpromise::suspended_task objects. When this reference count gets to 0, the ticket is deleted.
uint32_t ref_count = 1;
// Set when the task is resumed. This means it will be moved to the task_queue_ and the task
// object will be null on this struct. The ticket can exist in this state if there are other
// fpromise::suspended_task objects that hold a ticket for it, but calling resume() from those
// will be a no-op.
bool was_resumed = false;
// Source of the original post to the message loop.
FileLineFunction file_line;
// The actual task. This will be null if the task currently lives on the pending task_queue_.
// See was_resumed above.
fpromise::pending_task task;
};
using TicketMap = std::map<fpromise::suspended_task::ticket, TicketRecord>;
// Currently runnable tasks.
std::deque<Task> task_queue_;
struct Timer {
Task task;
// Expiration time in nanoseconds. The time is absolute and compares to GetMonotonicNowNS.
uint64_t expiry;
};
std::vector<Timer> timers_;
static bool CompareTimers(const Timer& a, const Timer& b) { return a.expiry >= b.expiry; }
// Expiration time of the timer which will expire soonest. Returns an upper bound if there are no
// timers set.
uint64_t NextExpiryNS() const;
// Backend for the public PostTask variants above that can handle any task type.
template <typename TaskType>
void PostTaskInternal(FileLineFunction file_line, TaskType task);
// Runs the given task, executing either the lambda or the fpromise::pending_task.
// The lock must not be held.
void RunOneTask(Task& task);
// Backing implementation for the context which gets a suspended_task ticket for the current
// task.
fpromise::suspended_task SuspendCurrentTask();
// Called when a task has reported an async completion. This will save it back to the ticket if
// one was provided, or it will be deleted if nobody to save it back to the ticket. The lock
// should not be held.
void SaveTaskToTicket(fpromise::suspended_task::ticket ticket, FileLineFunction file_line,
fpromise::pending_task task);
bool should_quit_ = false;
bool should_quit_on_no_more_tasks_ = false;
MessageLoopContext context_;
// Tracking information for suspended task tickets. These are handles that are used to suspend or
// resume tasks.
TicketMap tickets_;
fpromise::suspended_task::ticket next_ticket_ = 1;
// These are only accessed on the thread running this loop since they refer to the "current" task.
// They do not need locking.
//
// The current_task_ticket_ is lazily filled when the current task is suspended. 0 means there is
// no current task or the current task hasn't been suspended.
bool current_task_is_promise_ = false; // For assertions to check proper usage.
fpromise::suspended_task::ticket current_task_ticket_ = 0;
FXL_DISALLOW_COPY_AND_ASSIGN(MessageLoop);
};
// Scopes watching a file handle. When the WatchHandle is destroyed, the MessageLoop will stop
// watching the handle. Must only be destroyed on the thread where the MessageLoop is.
//
// Invalid watch handles will have watching() return false.
class MessageLoop::WatchHandle {
public:
// Constructs a WatchHandle not watching anything.
WatchHandle();
// Constructor used by MessageLoop to make one that watches something.
WatchHandle(MessageLoop* msg_loop, int id);
WatchHandle(WatchHandle&&);
WatchHandle(const WatchHandle&) = delete;
// Stops watching.
~WatchHandle();
WatchHandle& operator=(WatchHandle&& other);
WatchHandle& operator=(const WatchHandle& other) = delete;
// Stops watching from the message loop.
// If the handle is not watching, this doesn't do anything.
void StopWatching();
bool watching() const { return id_ > 0; }
private:
friend MessageLoop;
MessageLoop* msg_loop_ = nullptr;
int id_ = 0;
};
// Creates a nonblocking temporary pipe pipe and assigns the two ends of it to the two out
// parameters. Returns true on success.
bool CreateLocalNonBlockingPipe(fbl::unique_fd* out_end, fbl::unique_fd* in_end);
#if !defined(__Fuchsia__)
#undef __TA_REQUIRES
#endif
} // namespace debug
#endif // SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_