// Copyright 2022 The Fuchsia Authors.All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/developer/forensics/feedback/annotations/annotation_manager.h"

#include <lib/async/cpp/task.h>
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/promise.h>
#include <lib/syslog/cpp/macros.h>

#include <map>
#include <memory>
#include <vector>

namespace forensics::feedback {
namespace {

void InsertUnique(const Annotations& annotations, const std::set<std::string>& allowlist,
                  Annotations* out) {
  for (const auto& [k, v] : annotations) {
    if (allowlist.count(k) != 0) {
      FX_CHECK(out->count(k) == 0) << "Attempting to re-insert " << k;
      out->insert({k, v});
    }
  }
}

void InsertUnique(const Annotations& annotations, Annotations* out) {
  for (const auto& [k, v] : annotations) {
    FX_CHECK(out->count(k) == 0) << "Attempting to re-insert " << k;
    out->insert({k, v});
  }
}

// Inserts all keys in |keys| with a value of |error| into |out|, if they don't already have a
// value.
void InsertMissing(const std::set<std::string>& keys, const Error error,
                   const std::set<std::string>& allowlist, Annotations* out) {
  for (const auto& key : keys) {
    if (allowlist.count(key) == 0 || out->count(key) != 0) {
      continue;
    }

    out->insert({key, error});
  }
}

template <typename Container, typename T>
void Remove(Container& c, T v) {
  c.erase(std::remove(c.begin(), c.end(), v), c.end());
}

// Creates a callable object that can be used to complete an asynchronous flow and an object to
// consume its results.
auto CompleteAndConsume() {
  ::fpromise::bridge<> bridge;

  auto completer = std::make_shared<::fpromise::completer<>>(std::move(bridge.completer));
  auto complete = [completer] {
    if ((*completer)) {
      completer->complete_ok();
    }
  };

  return std::make_pair(std::move(complete), std::move(bridge.consumer));
}

}  // namespace

AnnotationManager::AnnotationManager(
    async_dispatcher_t* dispatcher, std::set<std::string> allowlist,
    const Annotations static_annotations, NonPlatformAnnotationProvider* non_platform_provider,
    std::vector<DynamicSyncAnnotationProvider*> dynamic_sync_providers,
    std::vector<StaticAsyncAnnotationProvider*> static_async_providers,
    std::vector<CachedAsyncAnnotationProvider*> cached_async_providers,
    std::vector<DynamicAsyncAnnotationProvider*> dynamic_async_providers)
    : dispatcher_(dispatcher),
      allowlist_(std::move(allowlist)),
      static_annotations_(),
      non_platform_provider_(non_platform_provider),
      dynamic_sync_providers_(std::move(dynamic_sync_providers)),
      static_async_providers_(std::move(static_async_providers)),
      dynamic_async_providers_(std::move(dynamic_async_providers)),
      cached_async_providers_(std::move(cached_async_providers)) {
  for (const auto& k : allowlist_) {
    // Count the number of providers in |providers| that collect |k|.
    auto Count = [&k](const auto& providers) {
      size_t count{0u};
      for (auto* p : providers) {
        count += p->GetKeys().count(k);
      }

      return count;
    };

    const auto num_providers = static_annotations.count(k) + Count(dynamic_sync_providers_) +
                               Count(static_async_providers_) + Count(dynamic_async_providers_) +
                               Count(cached_async_providers_);

    FX_CHECK(num_providers == 1) << "Annotation \"" << k << "\" collected by " << num_providers
                                 << " providers";
  }

  InsertUnique(static_annotations, allowlist_, &static_annotations_);

  // Create a weak pointer because |this| isn't guaranteed to outlive providers.
  auto self = ptr_factory_.GetWeakPtr();
  for (auto* provider : static_async_providers_) {
    provider->GetOnce([self, provider](const Annotations annotations) {
      if (!self) {
        return;
      }

      InsertUnique(annotations, self->allowlist_, &(self->static_annotations_));

      // Remove the reference to |provider| once it has returned its annotations.
      Remove(self->static_async_providers_, provider);
      if (!self->static_async_providers_.empty()) {
        return;
      }

      // No static async providers remain so complete all calls to WaitForStaticAsync.
      for (auto& waiting : self->waiting_for_static_) {
        waiting();
        waiting = nullptr;
      }

      Remove(self->waiting_for_static_, nullptr);
    });
  }

  for (auto* provider : cached_async_providers_) {
    provider->GetOnUpdate(
        [self, provider, keys = provider->GetKeys()](const Annotations annotations) {
          if (!self) {
            return;
          }

          // Clear the last collected annotations.
          for (const auto& key : keys) {
            self->cached_annotations_.erase(key);
          }

          InsertUnique(annotations, self->allowlist_, &(self->cached_annotations_));

          // Remove the reference to |provider| once it has returned its annotations. This is safe
          // because the original provider still exists outside the AnnotationManager.
          Remove(self->cached_async_providers_, provider);
          if (!self->cached_async_providers_.empty()) {
            return;
          }

          // No cached async providers remain so complete all calls to WaitForCachedAsync.
          for (auto& waiting : self->waiting_for_cached_) {
            waiting();
            waiting = nullptr;
          }

          Remove(self->waiting_for_cached_, nullptr);
        });
  }
}

::fpromise::promise<Annotations> AnnotationManager::GetAll(const zx::duration timeout) {
  // Create a weak pointer because |this| isn't guaranteed to outlive providers.
  auto self = ptr_factory_.GetWeakPtr();

  return ::fpromise::join_promises(WaitForStaticAsync(timeout), WaitForCachedAsync(timeout),
                                   WaitForDynamicAsync(timeout))
      .and_then([self](std::tuple<::fpromise::result<>, ::fpromise::result<>,
                                  ::fpromise::result<Annotations>>& results) {
        Annotations annotations = self->ImmediatelyAvailable();

        // Add the dynamic async annotations.
        InsertUnique(std::get<2>(results).value(), &annotations);

        // Any async annotations not collected timed out.
        for (const auto& p : self->static_async_providers_) {
          InsertMissing(p->GetKeys(), Error::kTimeout, self->allowlist_, &annotations);
        }

        for (const auto& p : self->cached_async_providers_) {
          InsertMissing(p->GetKeys(), Error::kTimeout, self->allowlist_, &annotations);
        }

        for (const auto& p : self->dynamic_async_providers_) {
          InsertMissing(p->GetKeys(), Error::kTimeout, self->allowlist_, &annotations);
        }

        return ::fpromise::ok(std::move(annotations));
      });
}

Annotations AnnotationManager::ImmediatelyAvailable() const {
  Annotations annotations(static_annotations_);

  InsertUnique(cached_annotations_, allowlist_, &annotations);

  for (auto* provider : dynamic_sync_providers_) {
    InsertUnique(provider->Get(), allowlist_, &annotations);
  }

  if (non_platform_provider_ != nullptr) {
    InsertUnique(non_platform_provider_->Get(), &annotations);
  }

  return annotations;
}

bool AnnotationManager::IsMissingNonPlatformAnnotations() const {
  return (non_platform_provider_ == nullptr) ? false
                                             : non_platform_provider_->IsMissingAnnotations();
}

::fpromise::promise<> AnnotationManager::WaitForStaticAsync(const zx::duration timeout) {
  // All static async annotations have been collected.
  if (static_async_providers_.empty()) {
    return ::fpromise::make_ok_promise();
  }

  auto [complete, consume] = CompleteAndConsume();

  async::PostDelayedTask(dispatcher_, complete, timeout);
  waiting_for_static_.push_back(complete);

  return consume.promise_or(::fpromise::error()).or_else([] {
    FX_LOGS(FATAL) << "Promise for waiting on static annotations was incorrectly dropped";
    return ::fpromise::error();
  });
}

::fpromise::promise<> AnnotationManager::WaitForCachedAsync(const zx::duration timeout) {
  // All cached async annotations have been collected.
  if (cached_async_providers_.empty()) {
    return ::fpromise::make_ok_promise();
  }

  auto [complete, consume] = CompleteAndConsume();

  async::PostDelayedTask(dispatcher_, complete, timeout);
  waiting_for_cached_.push_back(complete);

  return consume.promise_or(::fpromise::error()).or_else([] {
    FX_LOGS(FATAL) << "Promise for waiting on cached annotations was incorrectly dropped";
    return ::fpromise::error();
  });
}

namespace {

// Stores state needed to join the result of dynamic async annotation flows.
struct AsyncAnnotations {
  Annotations annotations;
  std::vector<DynamicAsyncAnnotationProvider*> providers;
  ::std::function<void()> complete;
};

}  // namespace

::fpromise::promise<Annotations> AnnotationManager::WaitForDynamicAsync(
    const zx::duration timeout) {
  // No need to collect dynamic async annotations.
  if (dynamic_async_providers_.empty()) {
    return ::fpromise::make_ok_promise(Annotations{});
  }

  auto [complete, consume] = CompleteAndConsume();

  auto async_state = std::make_shared<AsyncAnnotations>(AsyncAnnotations{
      .annotations = {},
      .providers = dynamic_async_providers_,
      .complete = complete,
  });

  // Create a weak pointer because |this| isn't guaranteed to outlive providers.
  auto self = ptr_factory_.GetWeakPtr();
  for (auto* provider : async_state->providers) {
    provider->Get([self, provider, async_state](const Annotations annotations) {
      if (!self) {
        return;
      }

      InsertUnique(annotations, self->allowlist_, &(async_state->annotations));

      // Remove the reference to |provider| once it has returned its annotations.
      Remove(async_state->providers, provider);
      if (!async_state->providers.empty()) {
        return;
      }

      // No dynamic async providers remain so complete the call to WaitForDynamicAsync.
      async_state->complete();
    });
  }

  async::PostDelayedTask(dispatcher_, async_state->complete, timeout);

  return consume.promise_or(::fpromise::error())
      .and_then([async_state] { return ::fpromise::ok(async_state->annotations); })
      .or_else([] {
        FX_LOGS(FATAL) << "Promise for waiting on dynamic annotations was incorrectly dropped";
        return ::fpromise::error();
      });
}

}  // namespace forensics::feedback
