// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/developer/debug/shared/message_loop.h"

#include <lib/syslog/cpp/macros.h>

#include <algorithm>

namespace debug_ipc {

namespace {

thread_local MessageLoop* current_message_loop = nullptr;

}  // namespace

fit::executor* MessageLoopContext::executor() const { return message_loop_; }

fit::suspended_task MessageLoopContext::suspend_task() {
  return message_loop_->SuspendCurrentTask();
}

MessageLoop::MessageLoop() : context_(this) {}

MessageLoop::~MessageLoop() {
  FX_DCHECK(Current() != this);  // Cleanup() should have been called.
}

bool MessageLoop::Init(std::string* error_message) {
  FX_DCHECK(!current_message_loop);
  current_message_loop = this;
  return true;
}

void MessageLoop::Cleanup() {
  FX_DCHECK(current_message_loop == this);
  current_message_loop = nullptr;
}

// static
MessageLoop* MessageLoop::Current() { return current_message_loop; }

void MessageLoop::Run() {
  should_quit_ = false;
  RunImpl();
}

void MessageLoop::RunUntilNoTasks() {
  // Check if there are no tasks right now. If so, we exit immediately.
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (task_queue_.empty())
      return;
  }

  should_quit_on_no_more_tasks_ = true;
  Run();
}

void MessageLoop::PostTask(FileLineFunction file_line, fit::function<void()> fn) {
  PostTaskInternal(std::move(file_line), std::move(fn));
}

void MessageLoop::PostTask(FileLineFunction file_line, fit::pending_task task) {
  PostTaskInternal(std::move(file_line), std::move(task));
}

void MessageLoop::RunTask(FileLineFunction file_line, fit::pending_task pending_task) {
  FX_DCHECK(pending_task);

  Task task(std::move(file_line), std::move(pending_task));
  RunOneTask(task);
}

void MessageLoop::PostTimer(FileLineFunction file_line, uint64_t delta_ms,
                            fit::function<void()> fn) {
  constexpr uint64_t kMsToNs = 1000000;

  bool needs_awaken;
  uint64_t expiry = delta_ms * kMsToNs + GetMonotonicNowNS();

  {
    std::lock_guard<std::mutex> guard(mutex_);
    needs_awaken = task_queue_.empty() && NextExpiryNS() > expiry;
    timers_.push_back({{std::move(file_line), std::move(fn)}, expiry});
    std::push_heap(timers_.begin(), timers_.end(), &CompareTimers);
  }
  if (needs_awaken)
    SetHasTasks();
}

void MessageLoop::schedule_task(fit::pending_task task) { PostTask(FROM_HERE, std::move(task)); }

fit::suspended_task::ticket MessageLoop::duplicate_ticket(fit::suspended_task::ticket ticket) {
  std::lock_guard<std::mutex> guard(mutex_);

  auto found = tickets_.find(ticket);
  FX_DCHECK(found != tickets_.end());

  FX_DCHECK(found->second.ref_count > 0);
  found->second.ref_count++;
  return ticket;
}

void MessageLoop::resolve_ticket(fit::suspended_task::ticket ticket, bool resume_task) {
  // Implementation note: The fit single_thread_executor has the behavior that resolving the ticket
  // moves the promise to the run queue, and then it's executed in order from there.
  //
  // However, this has the side effect of reordering the promise execution with respect to
  // non-promise-related tasks that are also executing on the message loop.
  //
  // As an example, consider attaching to a process which involves resolving a promise in the attach
  // reply message handler. There are non-promise-related messages in the message loop such as push
  // notifications about thread events from the remote debug agent. Requiring the resolution of the
  // promise be pushed to the back of the message queue will make it run after the processing of the
  // new thread messages and the replies would be executed in an order that doesn't make any sense.
  //
  // As a result, we run resolved promises synchronously when they're resolved.
  //
  // This has the disadvantage of potentially generating very deep stacks and one can construct
  // reentrant situations where this behavior might be surprising. But given the amount of
  // non-promise-related tasks our message loop currently runs and how most promises are only
  // resolved in response to IPC messages, the alternative is more surprising. If everything was a
  // promise, we could post it to the back of the task_queue_ with no problem (other than a slight
  // performance penalty by going through the loop again).

  Task task;                // The task (to run or delete outside of the lock).
  bool should_run = false;  // Whether to run the above task (otherwise just delete it).

  {
    std::lock_guard<std::mutex> guard(mutex_);

    FX_DCHECK(ticket != current_task_ticket_) << "Trying to resolve a task from within itself.";

    auto found = tickets_.find(ticket);
    FX_DCHECK(found != tickets_.end()) << "Bad ticket.";

    found->second.ref_count--;

    if (resume_task && !found->second.was_resumed) {
      // Task should be run (if was_resumed was already set, it was already moved to the run queue
      // so we don't have to do it again).
      should_run = true;

      // Mark as run. If the refcount isn't 0 yet this struct will still be around and we don't want
      // to run it again.
      found->second.was_resumed = true;
      task = Task(found->second.file_line, std::move(found->second.task));
    }

    if (found->second.ref_count == 0) {
      // Tickets are all closed. It it was resumed, the task will now be run queue, and if it wasn't
      // the task will be dropped with this operation.

      // Task could have already been moved out above.
      if (found->second.task) {
        // Free task outside lock, keep should_run false to avoid running.
        task = Task(FileLineFunction(), std::move(found->second.task));
      }
      tickets_.erase(found);
    }
  }
  if (should_run)
    RunOneTask(task);
}

uint64_t MessageLoop::DelayNS() const {
  // NextExpiry will return kMaxDelay if there are no timers queued.
  uint64_t expiry = NextExpiryNS();
  if (expiry == kMaxDelay) {
    return kMaxDelay;
  }

  // We check how much more time we need to wait.
  uint64_t now = GetMonotonicNowNS();
  if (expiry > now) {
    return expiry - now;
  }

  return 0;
}

uint64_t MessageLoop::NextExpiryNS() const {
  if (timers_.empty()) {
    return kMaxDelay;
  }

  return timers_[0].expiry;
}

fit::suspended_task MessageLoop::SuspendCurrentTask() {
  std::lock_guard<std::mutex> guard(mutex_);

  FX_DCHECK(current_task_is_promise_) << "Can only suspend when running a promise.";
  if (!current_task_ticket_) {
    // The current task has no ticket, make a new one.
    current_task_ticket_ = next_ticket_;
    next_ticket_++;

    tickets_.emplace(current_task_ticket_, TicketRecord());
  } else {
    duplicate_ticket(current_task_ticket_);
  }

  return fit::suspended_task(this, current_task_ticket_);
}

template <typename TaskType>
void MessageLoop::PostTaskInternal(FileLineFunction file_line, TaskType task) {
  bool needs_awaken;
  {
    std::lock_guard<std::mutex> guard(mutex_);
    needs_awaken = task_queue_.empty();
    task_queue_.emplace_back(std::move(file_line), std::move(task));
  }
  if (needs_awaken)
    SetHasTasks();
}

void MessageLoop::RunOneTask(Task& task) {
  if (task.task_fn) {
    task.task_fn();
  } else if (task.pending) {
    // Run the fit::pending_task (generated by promises).

    // This code may be run nested via RunTask() so keep the old current task state so we can
    // restore it.
    bool old_task_is_promise = current_task_is_promise_;
    fit::suspended_task::ticket old_current_ticket = current_task_ticket_;

    current_task_is_promise_ = true;
    current_task_ticket_ = 0;

    bool finished = task.pending(context_);
    FX_DCHECK(!task.pending == finished) << "Finished state should be consistent.";
    (void)finished;

    if (current_task_ticket_) {
      // Task was suspended and a ticket was generated.
      //
      // This function locks again which is unfortunate. We could save this state and execute
      // this work after the mutex is locked again at the bottom of this loop, but that
      // complicates the execution flow.
      SaveTaskToTicket(current_task_ticket_, std::move(task.file_line), std::move(task.pending));
    }

    current_task_ticket_ = old_current_ticket;
    current_task_is_promise_ = old_task_is_promise;
  } else {
    FX_NOTREACHED();
  }
}

void MessageLoop::SaveTaskToTicket(fit::suspended_task::ticket ticket, FileLineFunction file_line,
                                   fit::pending_task task) {
  FX_DCHECK(task) << "The task should not be finished if we're saving it.";

  bool needs_awaken = false;

  std::lock_guard<std::mutex> guard(mutex_);

  {
    auto found = tickets_.find(ticket);
    FX_DCHECK(found != tickets_.end()) << "Ticket was invalid.";

    if (found->second.was_resumed) {
      // The ticket was suspended and then resumed from within the same run of the promise. It is
      // moved immediately to the runnable queue.
      needs_awaken = task_queue_.empty();
      task_queue_.emplace_back(std::move(file_line), std::move(task));
    } else if (found->second.ref_count != 0) {
      // Suspend tickets still out, keep suspended until marked resumed.
      found->second.task = std::move(task);
    }

    if (found->second.ref_count == 0) {
      // No refcount, can drop the ticket. The task could either be marked runnable and currently
      // scheduled to be run, or it could be dropped.
      tickets_.erase(found);
    }
  }

  if (needs_awaken)
    SetHasTasks();
}

void MessageLoop::QuitNow() { should_quit_ = true; }

bool MessageLoop::ProcessPendingTask() {
  // This function will be called with the mutex held.
  if (task_queue_.empty() && DelayNS() > 0) {
    if (should_quit_on_no_more_tasks_) {
      should_quit_on_no_more_tasks_ = false;
      QuitNow();
    }
    return false;
  }

  Task task;
  if (!task_queue_.empty()) {
    task = std::move(task_queue_.front());
    task_queue_.pop_front();
  } else {
    std::pop_heap(timers_.begin(), timers_.end(), &CompareTimers);
    task = std::move(timers_.back().task);
    timers_.pop_back();
  }

  mutex_.unlock();
  RunOneTask(task);
  mutex_.lock();

  return true;
}

MessageLoop::WatchHandle::WatchHandle() = default;
MessageLoop::WatchHandle::WatchHandle(MessageLoop* msg_loop, int id)
    : msg_loop_(msg_loop), id_(id) {}

MessageLoop::WatchHandle::WatchHandle(WatchHandle&& other)
    : msg_loop_(other.msg_loop_), id_(other.id_) {
  other.msg_loop_ = nullptr;
  other.id_ = 0;
}

MessageLoop::WatchHandle::~WatchHandle() { StopWatching(); }

void MessageLoop::WatchHandle::StopWatching() {
  if (watching())
    msg_loop_->StopWatching(id_);
  msg_loop_ = nullptr;
  id_ = 0;
}

MessageLoop::WatchHandle& MessageLoop::WatchHandle::operator=(WatchHandle&& other) {
  // Should never get into a self-assignment situation since this is not
  // copyable and every ID should be unique. Do allow self-assignment of
  // null ones though.
  FX_DCHECK(!watching() || (msg_loop_ != other.msg_loop_ || id_ != other.id_));
  if (watching())
    msg_loop_->StopWatching(id_);
  msg_loop_ = other.msg_loop_;
  id_ = other.id_;

  other.msg_loop_ = nullptr;
  other.id_ = 0;
  return *this;
}

}  // namespace debug_ipc
