// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_
#define SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_

#include <lib/fpromise/promise.h>

#include <deque>
#include <functional>
#include <limits>
#include <map>
#include <mutex>
#include <string>
#include <vector>

#include <fbl/unique_fd.h>

#include "src/developer/debug/shared/logging/file_line_function.h"
#include "src/lib/fxl/macros.h"

#if defined(__Fuchsia__)
#include <zircon/compiler.h>
#else
// The macros for thread annotations aren't set up for non-Fuchsia builds.
#undef __TA_REQUIRES
#define __TA_REQUIRES(arg)
#endif

namespace debug {

class MessageLoop;

// Context implementation for fpromise::promise integration.
class MessageLoopContext : public fpromise::context {
 public:
  // Pointer must outlive this class.
  explicit MessageLoopContext(MessageLoop* loop) : message_loop_(loop) {}

  // fpromise::context implementation.
  fpromise::executor* executor() const override;
  fpromise::suspended_task suspend_task() override;

 private:
  MessageLoop* message_loop_;
};

// Message loop implementation.
//
// Unlike the one in FXL, this will run on the host in addition to a Zircon target. On Zircon it
// is backed by the async-loop library, on the host (Linux & Mac) it is backed by a poll()
// implementation (see message_loop_poll.h).
//
// This message loop supports several types of tasks:
//  - Bare lambdas.
//  - Delayed lambdas (timers).
//  - fpromise::pending_task objects (normally generated by fpromise::promise).
//  - Async I/O events on file handles.
//
// The Fuchsia-specific subclass can also watch for Zircon async events (see message_loop_fuchsia.h
// for more).
class MessageLoop : public fpromise::executor, public fpromise::suspended_task::resolver {
 public:
  // Which I/O conditions on a file descriptor WatchFD() should watch for.
  enum class WatchMode {
    kRead,
    kWrite,
    kReadWrite,
  };

  class WatchHandle;

  // There can be only one active MessageLoop in scope per thread at a time.
  //
  // A message loop is active between Init() and Cleanup(). During this period, Current() will
  // return the message loop.
  //
  // Init() / Cleanup() is a separate phase so a message loop can be created and managed on one
  // thread and sent to another thread to actually run (to help with cross-thread task posting).
  MessageLoop();
  ~MessageLoop() override;

  // Init() and Cleanup() must be called on the same thread as Run().
  //
  // Init() returns true on success. On false the error message will be put into the output param.
  virtual bool Init(std::string* error_message);
  virtual void Cleanup();

  // Exits the message loop immediately, not running pending functions. This must be called only on
  // the MessageLoop thread.
  virtual void QuitNow();

  // Returns the current message loop or null if there isn't one.
  static MessageLoop* Current();

  // Runs the message loop.
  void Run();

  // Run until no more tasks are posted. This is not really meant for normal functioning of the
  // debugger. Rather this is geared towards test environments that control what gets inserted into
  // the message loop. This is useful for tests in which tasks post additional tasks.
  //
  // NOTE: OS events (file handles, sockets, signals) are not considered as non-idle tasks.
  //       Basically they're ignored when checking for "idleness".
  void RunUntilNoTasks();

  // Posts the given work to the message loop. It will be added to the end of the work queue.
  void PostTask(FileLineFunction file_line, fit::function<void()> fn);
  void PostTask(FileLineFunction file_line, fpromise::pending_task task);

  // Runs the given task immediately. If it reports a pending completion it will complete
  // asynchronously, otherwise it will complete synchronously. This can be used to start executing
  // a promise without putting it at the back of the message loop.
  //
  // If the task completes asynchronously, it will be added to the queue when it signals a pending
  // completion.
  void RunTask(FileLineFunction file_line, fpromise::pending_task task);

  // Set a task to run after a certain number of milliseconds have elapsed. Granularity is hard to
  // guarantee but the timer shouldn't fire earlier than expected.
  void PostTimer(FileLineFunction file_line, uint64_t delta_ms, fit::function<void()> fn);

  // Starts watching the given file descriptor in the given mode. Returns a WatchHandle that scopes
  // the watch operation (when the handle is destroyed the watcher is unregistered).
  //
  // This function must only be called on the message loop thread.
  //
  // The watcher object will be deleted upon the destruction of the returned WatchHandle. If this
  // happens in the watcher function, care needs to be taken to avoid using any captured variables
  // after they are deleted.
  //
  // You can only watch a handle once. Note that stdin/stdout/stderr can be the same underlying OS
  // handle, so the caller can only watch one of them.
  using FDWatcher = fit::function<void(int fd, bool read, bool write, bool err)>;
  virtual WatchHandle WatchFD(WatchMode mode, int fd, FDWatcher watcher) = 0;

  // fpromise::executor implementation.
  void schedule_task(fpromise::pending_task task) override;

  // fpromise::resolver implementation.
  fpromise::suspended_task::ticket duplicate_ticket(
      fpromise::suspended_task::ticket ticket) override;
  void resolve_ticket(fpromise::suspended_task::ticket ticket, bool resume_task) override;

 protected:
  // The largest value representable by the uint64_t used for delays and expiry times.
  static constexpr uint64_t kMaxDelay = std::numeric_limits<uint64_t>::max();

  // Platform-specific body of the loop, to be provided by the derived class.
  virtual void RunImpl() = 0;

  // Get the value of a monotonic clock in nanoseconds.
  virtual uint64_t GetMonotonicNowNS() const = 0;

  // Used by WatchHandle to unregister a watch. Can be called from any thread without the lock held.
  virtual void StopWatching(int id) = 0;

  // Indicates there are tasks to process. Can be called from any thread and will be called without
  // the lock held.
  virtual void SetHasTasks() = 0;

  // Processes one pending task, returning true if there was work to do, or false if there was
  // nothing. The mutex_ must be held during the call. It will be unlocked during task processing,
  // so the platform implementation that calls it must not assume state did not change across the
  // call.
  bool ProcessPendingTask() __TA_REQUIRES(mutex_);

  // The platform implementation should check should_quit() after every task execution and exit if
  // true.
  bool should_quit() const { return should_quit_; }

  // How much time we should wait before waking up again to process timers.
  uint64_t DelayNS() const;

  // Style guide says this should be private and we should have a protected getter, but that makes
  // the thread annotations much more complicated.
  std::mutex mutex_;

 private:
  friend MessageLoopContext;
  friend WatchHandle;

  // A task is either a bare function or a fpromise::pending function. This is one entry in the
  // task_queue_ of pending runnable tasks.
  struct Task {
    Task() = default;
    Task(FileLineFunction fl, fit::function<void()> fn)
        : file_line(std::move(fl)), task_fn(std::move(fn)) {}
    Task(FileLineFunction fl, fpromise::pending_task pend)
        : file_line(std::move(fl)), pending(std::move(pend)) {}

    // Source location associated with the task.
    FileLineFunction file_line;

    // Only one of these two members will be non-null.
    fit::function<void()> task_fn;
    fpromise::pending_task pending;
  };

  // The data associated with a "ticket". A ticket is the handle behind a fpromise::suspended_task
  // which is used to track fpromise::pending_task objects that have completed asynchronously and to
  // signal that they should be run again.
  struct TicketRecord {
    // A ticket is reference counted, with the references being managed by the
    // fpromise::suspended_task objects. When this reference count gets to 0, the ticket is deleted.
    uint32_t ref_count = 1;

    // Set when the task is resumed. This means it will be moved to the task_queue_ and the task
    // object will be null on this struct. The ticket can exist in this state if there are other
    // fpromise::suspended_task objects that hold a ticket for it, but calling resume() from those
    // will be a no-op.
    bool was_resumed = false;

    // Source of the original post to the message loop.
    FileLineFunction file_line;

    // The actual task. This will be null if the task currently lives on the pending task_queue_.
    // See was_resumed above.
    fpromise::pending_task task;
  };
  using TicketMap = std::map<fpromise::suspended_task::ticket, TicketRecord>;

  // Currently runnable tasks.
  std::deque<Task> task_queue_;

  // A task paired with the time at which it becomes runnable.
  struct Timer {
    Task task;

    // Expiration time in nanoseconds. The time is absolute and compares to GetMonotonicNowNS.
    uint64_t expiry;
  };
  std::vector<Timer> timers_;

  // Ordering predicate for Timer objects: returns true when |a| expires strictly later than |b|.
  //
  // The comparison must be strict (CompareTimers(x, x) is false). The standard library's heap and
  // sorting algorithms require their comparator to be a strict weak ordering; a ">=" comparison
  // does not satisfy that requirement.
  //
  // NOTE(review): presumably timers_ is kept as a heap with the soonest expiry first using this
  // predicate -- confirm against the implementation file.
  static bool CompareTimers(const Timer& a, const Timer& b) { return a.expiry > b.expiry; }

  // Expiration time of the timer which will expire soonest. Returns an upper bound if there are no
  // timers set.
  uint64_t NextExpiryNS() const;

  // Backend for the public PostTask variants above that can handle any task type.
  template <typename TaskType>
  void PostTaskInternal(FileLineFunction file_line, TaskType task);

  // Runs the given task, executing either the lambda or the fpromise::pending_task.
  // The lock must not be held.
  void RunOneTask(Task& task);

  // Backing implementation for the context which gets a suspended_task ticket for the current
  // task.
  fpromise::suspended_task SuspendCurrentTask();

  // Called when a task has reported an async completion. This will save it back to the ticket if
  // one was provided, or the task will be deleted if there is no ticket to save it back to. The
  // lock should not be held.
  void SaveTaskToTicket(fpromise::suspended_task::ticket ticket, FileLineFunction file_line,
                        fpromise::pending_task task);

  // Exposed to the platform implementation through should_quit().
  bool should_quit_ = false;
  bool should_quit_on_no_more_tasks_ = false;

  MessageLoopContext context_;

  // Tracking information for suspended task tickets. These are handles that are used to suspend or
  // resume tasks.
  TicketMap tickets_;
  fpromise::suspended_task::ticket next_ticket_ = 1;

  // These are only accessed on the thread running this loop since they refer to the "current" task.
  // They do not need locking.
  //
  // The current_task_ticket_ is lazily filled when the current task is suspended. 0 means there is
  // no current task or the current task hasn't been suspended.
  bool current_task_is_promise_ = false;  // For assertions to check proper usage.
  fpromise::suspended_task::ticket current_task_ticket_ = 0;

  FXL_DISALLOW_COPY_AND_ASSIGN(MessageLoop);
};

// Represents an active watch of a file handle on a MessageLoop. Destroying the WatchHandle makes
// the MessageLoop stop watching the handle, so destruction must only happen on the thread where
// the MessageLoop is.
//
// A handle that is not valid reports false from watching().
class MessageLoop::WatchHandle {
 public:
  // Makes a WatchHandle that isn't watching anything.
  WatchHandle();

  // Makes a WatchHandle for something being watched. This is the constructor MessageLoop uses.
  WatchHandle(MessageLoop* msg_loop, int id);

  // Movable but not copyable.
  WatchHandle(const WatchHandle&) = delete;
  WatchHandle& operator=(const WatchHandle& other) = delete;
  WatchHandle(WatchHandle&&);
  WatchHandle& operator=(WatchHandle&& other);

  // Stops watching.
  ~WatchHandle();

  // Stops watching from the message loop. This is a no-op when the handle is not watching.
  void StopWatching();

  // True when this handle holds a positive watch ID.
  bool watching() const { return 0 < id_; }

 private:
  friend MessageLoop;

  MessageLoop* msg_loop_{nullptr};
  int id_{0};
};

// Creates a nonblocking temporary pipe and assigns the two ends of it to the two out
// parameters. Returns true on success.
bool CreateLocalNonBlockingPipe(fbl::unique_fd* out_end, fbl::unique_fd* in_end);

#if !defined(__Fuchsia__)
#undef __TA_REQUIRES
#endif

}  // namespace debug

#endif  // SRC_DEVELOPER_DEBUG_SHARED_MESSAGE_LOOP_H_
