// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <lib/async/cpp/operation.h>

#include <utility>

#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include <lib/fit/bridge.h>
#include <lib/fit/defer.h>
#include <lib/fxl/logging.h>
#include <trace/event.h>

namespace modular {

// Trace category passed to the TRACE_ASYNC_BEGIN / TRACE_ASYNC_END macros for
// every operation.
constexpr char kModularTraceCategory[] = "modular";
// Key of the trace argument that carries the operation's trace id.
constexpr char kTraceIdKey[] = "id";
// Key of the trace argument that carries the operation's trace info string.
constexpr char kTraceInfoKey[] = "info";

// Out-of-line defaulted constructor.
OperationContainer::OperationContainer() = default;

// Out-of-line defaulted destructor.
OperationContainer::~OperationContainer() = default;

// Adds |o| to this container: records this container as the owner of |o| and
// then passes |o| to Hold(), which takes ownership of the pointer.
// |o| must not be null (checked in debug builds only).
void OperationContainer::Add(OperationBase* const o) {
  FXL_DCHECK(o != nullptr);
  // The owner is set before Hold(), because Hold() may schedule |o| right away.
  o->SetOwner(this);
  Hold(o);  // Takes ownership.
}

// Forwards to OperationBase::Schedule() on |o|.
// NOTE(review): presumably exists so that derived containers can reach
// OperationBase::Schedule() through this base class -- confirm in the header.
void OperationContainer::Schedule(OperationBase* const o) { o->Schedule(); }

// Forwards to OperationBase::InvalidateWeakPtrs() on |o|, which invalidates
// all weak pointers previously handed out for |o|.
void OperationContainer::InvalidateWeakPtrs(OperationBase* const o) {
  o->InvalidateWeakPtrs();
}

OperationCollection::OperationCollection()
    : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}

OperationCollection::~OperationCollection() {
  // Weak pointers to every held Operation<> are invalidated here, before any
  // of the operations is destroyed. That way an outstanding FlowToken<> that
  // gets destroyed while the operations are torn down finds its weak pointer
  // dead and does not erroneously call Operation<>::Done.
  for (const std::unique_ptr<OperationBase>& operation : operations_) {
    FXL_DCHECK(operation.get() != nullptr);
    InvalidateWeakPtrs(operation.get());
  }
}

// Returns a weak pointer to this collection, typed as its OperationContainer
// base. OperationBase::SetOwner() stores such a pointer as the operation's
// owner.
fxl::WeakPtr<OperationContainer> OperationCollection::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}

// Takes ownership of |o| by appending it to |operations_| and schedules it
// immediately; a collection does not wait for earlier operations.
void OperationCollection::Hold(OperationBase* const o) {
  operations_.emplace_back(o);
  Schedule(o);
}

// Removes |o| from the collection and destroys it. |o| must currently be held
// by this collection (checked in debug builds only).
void OperationCollection::Drop(OperationBase* const o) {
  auto it = std::find_if(
      operations_.begin(), operations_.end(),
      [o](const std::unique_ptr<OperationBase>& p) { return p.get() == o; });
  FXL_DCHECK(it != operations_.end());

  // Move the operation out of its slot before erasing the slot. The local
  // |operation| then keeps it alive until the end of this method, so its
  // destructor runs only after the slot has been erased from the container.
  // If we erased the slot directly, the operation's destructor would run
  // while the container was still in the middle of removing the entry. That
  // breaks when the destructor re-enters this container, which can happen when
  // the operation has a member that (transitively) owns the container holding
  // the operation.
  //
  // Example:
  // operations.erase(operation) calls operation's destructor ~KillModuleCall()
  //   ~KillModuleCall() [has member variable FlowToken of OnModuleUpdatedCall]
  //    ~OnModuleUpdatedCall() [holding container containing KillModuleCall]
  //      ~OperationQueue() [triggered by ~OnModuleUpdatedCall()]
  //      [ERROR KillModuleCall in operation_ is nullptr at this point]
  // operations_.erase() actually erases the operation from the container
  //
  // See operation_unittest.cc for testcase. TestCollectionNotNullPtr
  std::unique_ptr<OperationBase> operation = std::move(*it);
  FXL_DCHECK(it->get() == nullptr);
  FXL_DCHECK(operation.get() == o);
  // Invalidate weak pointers to the operation before it is destroyed at the
  // end of this scope.
  InvalidateWeakPtrs(operation.get());
  operations_.erase(it);
}

// Nothing to continue with: Hold() already schedules every operation as soon
// as it is added.
void OperationCollection::Cont() {
  // no-op for operation collection.
}

// Hands |task| straight to this collection's executor. Unlike
// OperationQueue::ScheduleTask(), the task is not ordered against the
// operations held in |operations_|.
void OperationCollection::ScheduleTask(fit::pending_task task) {
  executor_.schedule_task(std::move(task));
}

OperationQueue::OperationQueue()
    : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}

OperationQueue::~OperationQueue() {
  // Drain the queue front to back. Each Operation<> has its weak pointers
  // invalidated before it is popped (and thereby destroyed), so that an
  // outstanding FlowToken<> that gets destroyed in the process doesn't
  // erroneously call Operation<>::Done.
  for (; !operations_.empty(); operations_.pop()) {
    FXL_DCHECK(operations_.front().get() != nullptr);
    InvalidateWeakPtrs(operations_.front().get());
  }
}

// Returns a weak pointer to this queue, typed as its OperationContainer base.
// OperationBase::SetOwner() stores such a pointer as the operation's owner.
fxl::WeakPtr<OperationContainer> OperationQueue::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}

// Takes ownership of |o| by appending it to the queue. If the queue is idle,
// |o| is scheduled right away and the queue stops being idle; otherwise |o|
// waits until Cont() reaches it.
void OperationQueue::Hold(OperationBase* const o) {
  operations_.emplace(o);
  if (!idle_) {
    // Something else is in flight; Cont() will get to |o| in turn.
    return;
  }

  // An idle queue holds nothing else, so |o| is its only entry.
  FXL_DCHECK(operations_.size() == 1);
  idle_ = false;
  Schedule(o);
}

// Removes |o| from the queue and destroys it. |o| must be the operation at the
// front of the queue (checked in debug builds only).
void OperationQueue::Drop(OperationBase* const o) {
  FXL_DCHECK(!operations_.empty());
  FXL_DCHECK(operations_.front().get() == o);

  // See comment in OperationCollection::Drop() for why this move is important.
  // In short: the local |operation| keeps the operation alive until the end of
  // this method, so its destructor runs only after pop() has removed the entry.
  std::unique_ptr<OperationBase> operation = std::move(operations_.front());
  FXL_DCHECK(operations_.front().get() == nullptr);
  FXL_DCHECK(operation.get() == o);
  // Invalidate weak pointers to the operation before it is destroyed at the
  // end of this scope.
  InvalidateWeakPtrs(operation.get());
  operations_.pop();
}

void OperationQueue::Cont() {
  if (!operations_.empty()) {
    auto o = operations_.front().get();
    Schedule(o);
  } else {
    idle_ = true;
  }
}

namespace {
// Operation that stands in for a promise-based task inside an OperationQueue
// (see OperationQueue::ScheduleTask()).
//
// When the queue runs this operation, Run() completes |completer_|, which
// signals whoever holds the matching consumer that it is this operation's
// turn. The operation then stays "running" until SayDone() is called.
//
// NOTE(review): the trace name "PromiseDoneCall" differs from the class name;
// it is left unchanged so that emitted trace events stay the same.
class PromiseWrapperCall : public Operation<> {
 public:
  // |completer| is completed (with ok) once this operation is run.
  explicit PromiseWrapperCall(fit::completer<> completer)
      : Operation("PromiseDoneCall", [] {}), completer_(std::move(completer)) {}

  // Invoked via OperationBase::Schedule(). Marks the operation as running and
  // completes |completer_|.
  void Run() override {
    running_ = true;
    completer_.complete_ok();
  }

  // Finishes the operation. Must only be called after Run(); this is enforced
  // with FXL_CHECK in all build types.
  void SayDone() {
    FXL_CHECK(running_);
    Done();
  }

 private:
  fit::completer<> completer_;
  // Set by Run(); guards against SayDone() being called too early.
  bool running_{false};
};
}  // namespace

// Schedules |task| on this queue's executor so that it is serialized with the
// operations in |operations_|: the task starts only once everything queued
// before it has finished, and nothing queued after it starts until the task
// has finished or been abandoned.
void OperationQueue::ScheduleTask(fit::pending_task task) {
  // We need to block the execution of this task on tasks in |operations_|, and
  // then also block further execution of |operations_| on this task finishing.
  // We do this by "wrapping" the promise in a PromiseWrapperCall.
  //
  // |start| will be completed when |wrapper->Run()| is called, and |wrapper|
  // will be finished as a result of the promise finishing: we call
  // |wrapper->SayDone()| when |task| is destroyed. It is destroyed when either
  // a) it is abandoned or b) it is completed successfully or with an error. In
  // either case we want to unblock the next operation in the queue.
  fit::bridge start;
  auto wrapper = new PromiseWrapperCall(std::move(start.completer));
  executor_.schedule_task(start.consumer.promise().then(
      [p = task.take_promise(), wrapper,
       weak_wrapper = wrapper->GetWeakPtr()](fit::result<>&) mutable {
        // We use a fit::defer on the capture list of .then() so that if |p| is
        // abandoned, we still unblock the queue.
        //
        // |wrapper| is owned by the queue, not by this promise, so it may
        // already be gone when the defer fires: ~OperationQueue() destroys
        // every held operation in its body, while tasks still pending on
        // |executor_| (a member) are only destroyed afterwards. The queue
        // invalidates an operation's weak pointers before destroying it, so
        // |weak_wrapper| tells us whether |wrapper| may still be touched. This
        // is the same guard OperationBase::Schedule() uses for Run().
        return p.then([defer = fit::defer([wrapper, weak_wrapper] {
                         if (weak_wrapper) {
                           wrapper->SayDone();
                         }
                       })](fit::result<>&) {});
      }));

  // Takes ownership of |wrapper| and queues it behind the pending operations.
  Add(wrapper);
}

// Constructs an operation that is not yet owned by any container; the owner
// is set later through SetOwner(), which OperationContainer::Add() calls.
//
// |trace_name| and |trace_info| are stored and used for the async trace events
// emitted by TraceAsyncBegin() / TraceAsyncEnd().
OperationBase::OperationBase(const char* const trace_name,
                             std::string trace_info)
    // Each operation gets its own trace id from TRACE_NONCE(), which ties its
    // begin and end trace events together.
    : weak_ptr_factory_(this),
      trace_name_(trace_name),
      trace_id_(TRACE_NONCE()),
      trace_info_(std::move(trace_info)) {}

// Out-of-line defaulted destructor.
OperationBase::~OperationBase() = default;

// Returns a weak pointer to this operation. It becomes invalid once
// InvalidateWeakPtrs() is called, which the owning container does before it
// destroys the operation.
fxl::WeakPtr<OperationBase> OperationBase::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}

// Records |c| as the container that owns this operation, held as a weak
// pointer. The owner may only be set once (checked in debug builds only).
// |c| is dereferenced unconditionally and so must not be null.
void OperationBase::SetOwner(OperationContainer* c) {
  FXL_DCHECK(!container_);
  container_ = c->GetWeakPtr();
}

// Emits the async trace begin event and posts a task on the default
// dispatcher that calls Run(). Run() is skipped if this operation's weak
// pointers have been invalidated by the time the task executes.
void OperationBase::Schedule() {
  TraceAsyncBegin();

  // |this| is only dereferenced after |weak| has been found to still be valid.
  auto run_if_alive = [this, weak = weak_ptr_factory_.GetWeakPtr()] {
    if (!weak) {
      return;
    }
    Run();
  };
  async::PostTask(async_get_default_dispatcher(), std::move(run_if_alive));
}

// Invalidates every weak pointer handed out by |weak_ptr_factory_|. This
// covers those returned by GetWeakPtr(), the one captured by the task posted
// in Schedule(), and those held by FlowTokenBase instances.
void OperationBase::InvalidateWeakPtrs() {
  weak_ptr_factory_.InvalidateWeakPtrs();
}

// Emits the async "begin" trace event for this operation in the modular
// category. The event is named |trace_name_| and identified by |trace_id_|,
// with the id and |trace_info_| attached as arguments.
void OperationBase::TraceAsyncBegin() {
  TRACE_ASYNC_BEGIN(kModularTraceCategory, trace_name_, trace_id_, kTraceIdKey,
                    trace_id_, kTraceInfoKey, trace_info_);
}

// Emits the async "end" trace event matching TraceAsyncBegin(): same category,
// name, id and arguments.
void OperationBase::TraceAsyncEnd() {
  TRACE_ASYNC_END(kModularTraceCategory, trace_name_, trace_id_, kTraceIdKey,
                  trace_id_, kTraceInfoKey, trace_info_);
}

// Creates the first token for |op|: allocates the shared reference count,
// starting at 1, and takes a weak pointer to |op|.
OperationBase::FlowTokenBase::FlowTokenBase(OperationBase* const op)
    : refcount_(new int(1)), weak_op_(op->weak_ptr_factory_.GetWeakPtr()) {}

// Copies share the reference count and the weak operation pointer of |other|;
// each copy adds one reference.
OperationBase::FlowTokenBase::FlowTokenBase(const FlowTokenBase& other)
    : refcount_(other.refcount_), weak_op_(other.weak_op_) {
  *refcount_ += 1;
}

// Releases this token's reference; the token that releases the last reference
// also frees the shared counter.
OperationBase::FlowTokenBase::~FlowTokenBase() {
  if (--*refcount_ == 0) {
    delete refcount_;
  }
}

}  // namespace modular
