// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/media/audio/audio_core/threading_model.h"

#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
#include <lib/async/cpp/executor.h>
#include <lib/async/cpp/task.h>
#include <lib/async/dispatcher.h>

#include <mutex>
#include <string>
#include <unordered_map>

#include <trace/event.h>

#include "src/lib/syslog/cpp/logger.h"
#include "src/media/audio/audio_core/utils.h"

namespace media::audio {
namespace {

void SetMixDispatcherThreadProfile(async_dispatcher_t* dispatcher) {
  zx::profile profile;
  zx_status_t status = AcquireHighPriorityProfile(&profile);
  if (status != ZX_OK) {
    FX_LOGS(ERROR)
        << "Unable to acquire high priority profile; mix threads will run at normal priority";
    return;
  }
  FX_DCHECK(profile);
  async::PostTask(dispatcher, [profile = std::move(profile)] {
    zx_status_t status = zx::thread::self()->set_profile(profile, 0);
    FX_DCHECK(status == ZX_OK);
  });
}

struct ExecutionDomainHolder {
  ExecutionDomainHolder() : ExecutionDomainHolder(&kAsyncLoopConfigNoAttachToCurrentThread) {}
  ExecutionDomainHolder(const async_loop_config_t* loop_config)
      : loop{loop_config}, executor{loop.dispatcher()} {}
  async::Loop loop;
  async::Executor executor;
  ExecutionDomain domain{loop.dispatcher(), &executor};
};

class ThreadingModelBase : public ThreadingModel {
 public:
  ~ThreadingModelBase() override = default;

  // |ThreadingModel|
  ExecutionDomain& FidlDomain() final { return fidl_domain_.domain; }
  ExecutionDomain& IoDomain() final { return io_domain_.domain; }
  void RunAndJoinAllThreads() override {
    io_domain_.loop.StartThread("io");
    fidl_domain_.loop.Run();
    IoDomain().PostTask([this] { io_domain_.loop.Quit(); });
    io_domain_.loop.JoinThreads();
  }
  void Quit() final {
    FidlDomain().PostTask([this] { fidl_domain_.loop.Quit(); });
  }

 private:
  ExecutionDomainHolder fidl_domain_{&kAsyncLoopConfigAttachToCurrentThread};
  ExecutionDomainHolder io_domain_;
};

class ThreadingModelMixOnFidlThread : public ThreadingModelBase {
 public:
  ~ThreadingModelMixOnFidlThread() override = default;

  // |ThreadingModel|
  OwnedDomainPtr AcquireMixDomain() final {
    return OwnedDomainPtr(&FidlDomain(), [](auto* p) {});
  }
};

class ThreadingModelMixOnSingleThread : public ThreadingModelBase {
 public:
  ~ThreadingModelMixOnSingleThread() override = default;

  // |ThreadingModel|
  OwnedDomainPtr AcquireMixDomain() final {
    return OwnedDomainPtr(&mix_domain_.domain, [](auto* p) {});
  }
  void RunAndJoinAllThreads() final {
    mix_domain_.loop.StartThread("mixer");
    SetMixDispatcherThreadProfile(mix_domain_.loop.dispatcher());
    ThreadingModelBase::RunAndJoinAllThreads();
    mix_domain_.domain.PostTask([this] { mix_domain_.loop.Quit(); });
    mix_domain_.loop.JoinThreads();
  }

 private:
  ExecutionDomainHolder mix_domain_;
};

class ThreadingModelThreadPerMix : public ThreadingModelBase {
 public:
  ~ThreadingModelThreadPerMix() override = default;

  // |ThreadingModel|
  OwnedDomainPtr AcquireMixDomain() final {
    TRACE_DURATION("audio.debug", "ThreadingModelThreadPerMix::AcquireMixDomain");
    std::string thread_name;
    auto holder = std::make_unique<ExecutionDomainHolder>();
    auto dispatcher = holder->loop.dispatcher();
    auto domain = &holder->domain;
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (shut_down_) {
        return nullptr;
      }
      thread_name = "mixer-" + std::to_string(mix_thread_number_++);
      holder->loop.StartThread(thread_name.c_str());
      bool inserted = mix_domains_.insert(std::make_pair(dispatcher, std::move(holder))).second;
      FX_DCHECK(inserted);
    }

    SetMixDispatcherThreadProfile(dispatcher);
    return OwnedDomainPtr(domain, [this](auto* domain) {
      TRACE_DURATION("audio.debug", "ThreadingModelThreadPerMix.delete_domain");
      // We use the IO dispatcher here because the async::Loop dtor will implicitly do a join on
      // the dispatcher threads, so we cannot run that dtor on the mix loop. Since it will block
      // while joining, we choose the io dispatcher which is there to do potentially blocking
      // operations
      auto nonce = TRACE_NONCE();
      TRACE_FLOW_BEGIN("audio.debug", "ThreadingModelThreadPerMix.release", nonce);
      IoDomain().PostTask([this, dispatcher = domain->dispatcher(), nonce] {
        TRACE_DURATION("audio.debug", "ThreadingModelThreadPerMix.release_thunk");
        TRACE_FLOW_END("audio.debug", "ThreadingModelThreadPerMix.release", nonce);
        ReleaseDomainForDispatcher(dispatcher);
      });
    });
  }
  void RunAndJoinAllThreads() final {
    ThreadingModelBase::RunAndJoinAllThreads();
    {
      std::lock_guard<std::mutex> guard(lock_);
      shut_down_ = true;
      // First post a task to each mix loop to quit these loops.
      for (auto it = mix_domains_.begin(); it != mix_domains_.end(); ++it) {
        async::PostTask(it->first, [loop = &it->second->loop] { loop->Quit(); });
      }
      // Now just wait for all in-flight tasks to complete.
      for (auto it = mix_domains_.begin(); it != mix_domains_.end(); ++it) {
        it->second->loop.JoinThreads();
      }
    }
  }

 private:
  void ReleaseDomainForDispatcher(async_dispatcher_t* dispatcher) {
    TRACE_DURATION("audio.debug", "ThreadingModelThreadPerMix::ReleaseDomainForDispatcher");
    std::unique_ptr<ExecutionDomainHolder> domain;
    {
      std::lock_guard<std::mutex> guard(lock_);
      auto it = mix_domains_.find(dispatcher);
      FX_DCHECK(it != mix_domains_.end());
      domain = std::move(it->second);
      mix_domains_.erase(it);
    }

    domain->loop.Shutdown();
  }

  std::mutex lock_;
  bool shut_down_ __TA_GUARDED(lock_) = false;
  std::unordered_map<async_dispatcher_t*, std::unique_ptr<ExecutionDomainHolder>> mix_domains_
      __TA_GUARDED(lock_);
  uint32_t mix_thread_number_ = 0;
};

}  // namespace

std::unique_ptr<ThreadingModel> ThreadingModel::CreateWithMixStrategy(MixStrategy mix_strategy) {
  switch (mix_strategy) {
    case MixStrategy::kMixOnFidlThread:
      return std::make_unique<ThreadingModelMixOnFidlThread>();
    case MixStrategy::kMixOnSingleThread:
      return std::make_unique<ThreadingModelMixOnSingleThread>();
    case MixStrategy::kThreadPerMix:
      return std::make_unique<ThreadingModelThreadPerMix>();
  }
}

}  // namespace media::audio
