// Copyright 2018 The Fuchsia Authors. All rights reserved.  Use of this source
// code is governed by a BSD-style license that can be found in the LICENSE
// file.

#include "src/observation_store/file_observation_store.h"

#include <ctime>
#include <iomanip>
#include <regex>
#include <utility>

#include "src/logger/internal_metrics.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/tracing.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"

namespace cobalt::observation_store {

using lib::statusor::StatusOr;
using util::FileSystem;
using util::Status;
using util::StatusCode;

// Name of the file that observations are appended to until it is finalized.
constexpr char kActiveFileName[] = "in_progress.data";
// The first part of the filename is 13 digits representing the milliseconds
// since the unix epoch. The millisecond timestamp became 13 digits in
// September 2001, and won't be 14 digits until 2286.
//
// The second part of the filename is 10 digits, which is a random number in the
// range 1000000000-9999999999.
//
// The '.' before the extension is escaped so that it matches only a literal
// dot. Unescaped, it would match any character (e.g. "...0xdata").
const std::regex kFinalizedFileRegex(R"(\d{13}-\d{10}\.data)");
// Number of characters used for the timestamp prefix of a finalized file name.
constexpr uint32_t kTimestampWidth = 13;
// Inclusive bounds of the random suffix of a finalized file name. Both bounds
// have 10 digits.
constexpr uint64_t kMinRandomNumber = 1000000000;
constexpr uint64_t kMaxRandomNumber = 9999999999;

// Constructs a store whose files live under |root_directory|.
//
// If listing |root_directory| fails, the directory is created (the process
// CHECK-fails if that does not succeed). The sizes of the finalized files
// already present are summed into finalized_bytes, and an attempt is made to
// finalize a leftover active file so that it can be taken from the store.
FileObservationStore::FileObservationStore(size_t max_bytes_per_observation,
                                           size_t max_bytes_per_envelope, size_t max_bytes_total,
                                           FileSystem *fs, std::string root_directory,
                                           std::string name,
                                           logger::InternalMetrics *internal_metrics)
    : ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total),
      fs_(fs),
      root_directory_(std::move(root_directory)),
      active_file_name_(FullPath(kActiveFileName)),
      name_(std::move(name)) {
  ResetInternalMetrics(internal_metrics);
  CHECK(fs_);

  // A failed listing is treated as "root_directory_ does not exist yet".
  const bool root_directory_listable = fs_->ListFiles(root_directory_).ok();
  if (!root_directory_listable) {
    // TODO(zmbush): If MakeDirectory doesn't work, we should fail over to
    // MemoryObservationStore.
    CHECK(fs_->MakeDirectory(root_directory_));
  }

  auto locked = protected_fields_.lock();

  // Account for the finalized files that are already in the directory.
  locked->finalized_bytes = 0;
  for (const auto &finalized_file : ListFinalizedFiles()) {
    locked->finalized_bytes += fs_->FileSize(FullPath(finalized_file)).ConsumeValueOr(0);
  }

  // An existing active file most likely means the process terminated
  // unexpectedly last time. Finalizing it makes it available to be Taken from
  // the store.
  //
  // The result is deliberately ignored: success means the active file was
  // rescued, failure most likely means there was no active file to begin with.
  FinalizeActiveFile(&locked);
}

// Appends |observation| to the active file. A record holding |metadata| is
// written first whenever no metadata has been written to the active file yet,
// or when |metadata| differs from the metadata that was written last.
//
// Returns:
//   kOk                 if the store is disabled (nothing is stored) or the
//                       observation was written.
//   kObservationTooBig  if the observation exceeds max_bytes_per_observation_.
//   kStoreFull          if the write would exceed max_bytes_total_.
//   kWriteFailed        if the active file cannot be opened, written or
//                       finalized, or the observation holds neither an
//                       encrypted nor an unencrypted payload.
ObservationStore::StoreStatus FileObservationStore::StoreObservation(
    std::unique_ptr<StoredObservation> observation, std::unique_ptr<ObservationMetadata> metadata) {
  if (IsDisabled()) {
    return kOk;
  }

  TRACE_DURATION("cobalt_core", "FileObservationStore::StoreObservation");
  auto fields = protected_fields_.lock();

  size_t obs_size = observation->ByteSizeLong();
  internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Attempted,
                                 obs_size, metadata->customer_id(), metadata->project_id());

  // Reject oversized observations before doing any file-system work. This way
  // a rejected observation never causes the active file to be opened, and the
  // caller gets kObservationTooBig even when the file could not be opened.
  if (obs_size > max_bytes_per_observation_) {
    LOG(WARNING) << "An observation that was too big was passed in to "
                    "FileObservationStore::StoreObservation(): "
                 << obs_size;
    return kObservationTooBig;
  }

  auto active_file = GetActiveFile(&fields);
  if (active_file == nullptr) {
    return kWriteFailed;
  }

  auto metadata_str = metadata->SerializeAsString();
  // Captured now because |metadata| is temporarily swapped into the stored
  // record below.
  auto report_id = metadata->report_id();

  VLOG(6) << name_ << ": StoreObservation() metric=(" << metadata->customer_id() << ","
          << metadata->project_id() << "," << metadata->metric_id() << "), size=" << obs_size
          << ".";

  // The estimate counts the finalized files, the bytes already in the active
  // file and the observation itself.
  size_t estimated_new_byte_count = fields->finalized_bytes + active_file->ByteCount() + obs_size;
  if (estimated_new_byte_count > max_bytes_total_) {
    VLOG(4) << name_ << ": The observation store is full. estimated_new_byte_count="
            << estimated_new_byte_count << " > " << max_bytes_total_ << ".";
    return kStoreFull;
  }

  if (!fields->metadata_written || metadata_str != fields->last_written_metadata) {
    VLOG(5) << name_ << ": Writing observation metadata.";
    FileObservationStoreRecord stored_metadata;
    stored_metadata.mutable_meta_data()->Swap(metadata.get());
    if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(stored_metadata, active_file)) {
      LOG(WARNING) << name_ << ": Unable to write metadata to `" << active_file_name_ << "`";
      return kWriteFailed;
    }
    // Swap needed to report the customer id and project id to the internal metrics.
    metadata->Swap(stored_metadata.mutable_meta_data());
    fields->metadata_written = true;
    fields->last_written_metadata = metadata_str;
  }

  FileObservationStoreRecord stored_message;
  stored_message.set_contribution_id(observation->contribution_id());
  if (observation->has_encrypted()) {
    stored_message.mutable_encrypted_observation()->Swap(observation->mutable_encrypted());
  } else if (observation->has_unencrypted()) {
    stored_message.mutable_unencrypted_observation()->Swap(observation->mutable_unencrypted());
  } else {
    LOG(ERROR) << "Recieved StoredObservation of unexpected type.";
    return kWriteFailed;
  }
  if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(stored_message, active_file)) {
    LOG(WARNING) << "Unable to write encrypted_observation to `" << active_file_name_ << "`";
    return kWriteFailed;
  }

  // Once the active file has reached the envelope size limit, finalize it so
  // that the next observation starts a new active file. |active_file| must not
  // be used after this point: FinalizeActiveFile() resets the stream it points
  // to.
  if (active_file->ByteCount() >= static_cast<int64_t>(max_bytes_per_envelope_)) {
    VLOG(4) << name_ << ": In-progress file contains " << active_file->ByteCount()
            << " bytes (>= " << max_bytes_per_envelope_ << "). Finalizing it.";

    if (!FinalizeActiveFile(&fields)) {
      LOG(WARNING) << "Unable to finalize `" << active_file_name_;
      return kWriteFailed;
    }
  }

  num_obs_per_report_[{.customer_id = metadata->customer_id(),
                       .project_id = metadata->project_id(),
                       .metric_id = metadata->metric_id(),
                       .report_id = report_id}]++;
  internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Succeeded,
                                 obs_size, metadata->customer_id(), metadata->project_id());
  // Cannot cause a loop, since TrackDiskUsage is handled asynchronously.
  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
                                    static_cast<int64_t>(estimated_new_byte_count),
                                    static_cast<int64_t>(max_bytes_total_));

  return kOk;
}

// Closes the active file and renames it to a freshly generated finalized file
// name, adding the size of the renamed file to finalized_bytes.
//
// Returns true only if the rename succeeded. Returns false if the size of the
// active file cannot be determined, if the file is empty (in which case it is
// deleted), or if the rename fails.
bool FileObservationStore::FinalizeActiveFile(
    util::ProtectedFields<Fields>::LockedFieldsPtr *fields) {
  VLOG(6) << name_ << ": FinalizeActiveFile()";
  auto &locked = *fields;

  // Drop the current stream (if there is one) and make sure the next active
  // file starts with a metadata record.
  locked->active_file = nullptr;
  locked->metadata_written = false;

  auto active_size_or = fs_->FileSize(active_file_name_);
  if (!active_size_or.ok()) {
    // The file likely doesn't even exist.
    return false;
  }
  if (active_size_or.ConsumeValueOrDie() == 0) {
    // The file exists, but is empty. Delete it instead of renaming it.
    fs_->Delete(active_file_name_);
    return false;
  }

  auto finalized_path = FullPath(filename_generator_.GenerateFilename());
  if (!fs_->Rename(active_file_name_, finalized_path)) {
    return false;
  }

  locked->finalized_bytes += fs_->FileSize(finalized_path).ConsumeValueOr(0);
  return true;
}

// Default generator: delegates to the clock-taking constructor with a clock
// that reports the milliseconds elapsed since the system_clock epoch.
FileObservationStore::FilenameGenerator::FilenameGenerator()
    : FilenameGenerator([]() {
        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
        return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
      }) {}

// Builds a generator that takes the current time from |now| and draws the
// random part of each file name from [kMinRandomNumber, kMaxRandomNumber].
FileObservationStore::FilenameGenerator::FilenameGenerator(std::function<int64_t()> now)
    : now_(std::move(now)),
      random_int_(kMinRandomNumber, kMaxRandomNumber) {}

// Produces a file name of the form "<timestamp>-<random>.data".
//
// <timestamp> is the value returned by now_(), zero-padded on the left to
// kTimestampWidth characters and cut off after kTimestampWidth characters.
// <random> is a number drawn from random_int_.
std::string FileObservationStore::FilenameGenerator::GenerateFilename() const {
  std::stringstream timestamp_stream;
  timestamp_stream << std::setfill('0') << std::setw(kTimestampWidth) << now_();
  const std::string timestamp = timestamp_stream.str().substr(0, kTimestampWidth);

  std::stringstream filename_stream;
  filename_stream << timestamp << "-" << random_int_(random_dev_) << ".data";
  return filename_stream.str();
}

// Returns |filename| prefixed with the store's root directory and a "/".
std::string FileObservationStore::FullPath(const std::string &filename) const {
  std::string path = root_directory_;
  path.append("/");
  path.append(filename);
  return path;
}

// Returns |filename| prefixed with the holder's root directory and a "/".
std::string FileObservationStore::FileEnvelopeHolder::FullPath(const std::string &filename) const {
  std::string path = root_directory_;
  path.append("/");
  path.append(filename);
  return path;
}

// Returns the output stream for the active file, opening it first if no stream
// is currently held in |fields|. Returns nullptr if the stream cannot be
// opened.
//
// The returned pointer is owned by |fields|.
google::protobuf::io::ZeroCopyOutputStream *FileObservationStore::GetActiveFile(
    util::ProtectedFields<Fields>::LockedFieldsPtr *fields) {
  auto &locked = *fields;

  // The stream is already open: hand it out as is.
  if (locked->active_file != nullptr) {
    return locked->active_file.get();
  }

  auto new_stream_or = fs_->NewProtoOutputStream(active_file_name_);
  if (!new_stream_or.ok()) {
    LOG_FIRST_N(ERROR, 10) << "Failed to open file. (Perhaps the disk is full): "
                           << new_stream_or.status().error_message();
    return nullptr;
  }

  locked->active_file = new_stream_or.ConsumeValueOrDie();
  return locked->active_file.get();
}

// Returns the names of the files in root_directory_ whose name matches
// kFinalizedFileRegex. If the directory cannot be listed, the result is empty.
std::vector<std::string> FileObservationStore::ListFinalizedFiles() const {
  const auto directory_entries = fs_->ListFiles(root_directory_).ConsumeValueOr({});

  std::vector<std::string> finalized;
  for (const auto &entry : directory_entries) {
    if (!std::regex_match(entry, kFinalizedFileRegex)) {
      continue;
    }
    finalized.push_back(entry);
  }
  return finalized;
}

// Returns the name of the oldest finalized file that is not listed in
// files_taken, or a NOT_FOUND status if there is no such file.
StatusOr<std::string> FileObservationStore::GetOldestFinalizedFile(
    util::ProtectedFields<Fields>::LockedFieldsPtr *fields) {
  auto &f = *fields;

  std::string found_file_name;
  for (const auto &file : ListFinalizedFiles()) {
    if (f->files_taken.find(file) != f->files_taken.end()) {
      // This file has already been handed out.
      continue;
    }
    // We compare the file names to find the oldest one. This works because
    // file names are prefixed with the timestamp when they were finalized and
    // that timestamp is always 13 digits long.
    //
    // A lexicographic order of fixed length number strings is identical to
    // ordering their numerical values. std::string's operator< performs that
    // lexicographic comparison without going through the C string API.
    if (found_file_name.empty() || file < found_file_name) {
      found_file_name = file;
    }
  }
  if (found_file_name.empty()) {
    return Status(StatusCode::NOT_FOUND, "No finalized file");
  }
  return found_file_name;
}

// Hands out an EnvelopeHolder for the oldest finalized file that has not been
// taken yet and records that file in files_taken.
//
// If there is no such file, the active file is finalized (provided it is open
// and non-empty) and the lookup is repeated. Returns nullptr when no file can
// be handed out.
std::unique_ptr<ObservationStore::EnvelopeHolder> FileObservationStore::TakeNextEnvelopeHolder() {
  auto locked = protected_fields_.lock();

  auto candidate_or = GetOldestFinalizedFile(&locked);
  if (!candidate_or.ok()) {
    // No finalized file is available. Try to turn the active file into one.
    const bool active_file_has_data =
        locked->active_file != nullptr && locked->active_file->ByteCount() != 0;
    if (!active_file_has_data) {
      // Active file isn't open or is empty. Return nullptr.
      return nullptr;
    }
    if (!FinalizeActiveFile(&locked)) {
      // Finalizing the active file failed, no envelope to return.
      return nullptr;
    }
    candidate_or = GetOldestFinalizedFile(&locked);
    if (!candidate_or.ok()) {
      return nullptr;
    }
  }

  auto taken_file_name = candidate_or.ConsumeValueOrDie();
  locked->files_taken.insert(taken_file_name);
  return std::make_unique<FileEnvelopeHolder>(fs_, this, root_directory_, taken_file_name);
}

// Gives the files held by |envelope| back to the store: each of its file names
// is removed from files_taken and the holder is cleared.
void FileObservationStore::ReturnEnvelopeHolder(
    std::unique_ptr<ObservationStore::EnvelopeHolder> envelope) {
  // Take ownership of the holder as its concrete type.
  //
  // |holder| must be declared before the lock below so that it is destroyed
  // after the lock is released: ~FileEnvelopeHolder() locks protected_fields_
  // itself.
  auto *raw_holder = static_cast<FileObservationStore::FileEnvelopeHolder *>(envelope.release());
  std::unique_ptr<FileObservationStore::FileEnvelopeHolder> holder(raw_holder);

  auto locked = protected_fields_.lock();
  for (const auto &returned_file : holder->file_names()) {
    locked->files_taken.erase(returned_file);
  }
  // NOTE(review): presumably clear() empties the holder's file list so that its
  // destructor does not delete the returned files; confirm against the header.
  holder->clear();
}

// Returns finalized_bytes plus, if an active file is open, the number of bytes
// reported by its stream.
size_t FileObservationStore::Size() const {
  auto locked = protected_fields_.const_lock();

  auto total_bytes = locked->finalized_bytes;
  VLOG(4) << name_ << "::Size(): finalized_bytes=" << total_bytes;

  if (!locked->active_file) {
    VLOG(4) << name_ << "::Size(): there is no active file.";
  } else {
    total_bytes += locked->active_file->ByteCount();
  }

  VLOG(4) << name_ << "::Size(): total_bytes=" << total_bytes;
  return total_bytes;
}

// Returns true if Size() reports zero bytes.
bool FileObservationStore::Empty() const {
  const size_t current_size = Size();
  return current_size == 0;
}

// Deletes every file still held by this holder and subtracts its size from the
// store's finalized_bytes. Takes the store's protected_fields_ lock.
FileObservationStore::FileEnvelopeHolder::~FileEnvelopeHolder() {
  auto locked = store_->protected_fields_.lock();
  for (const auto &held_file : file_names_) {
    const std::string path = FullPath(held_file);
    // The size has to be read before the file is removed.
    locked->finalized_bytes -= fs_->FileSize(path).ConsumeValueOr(0);
    fs_->Delete(path);
  }
}

// Takes over all file names held by |container| and invalidates the cached
// envelope and cached size, so that both are recomputed on next use.
void FileObservationStore::FileEnvelopeHolder::MergeWith(
    std::unique_ptr<EnvelopeHolder> container) {
  // Take ownership of |container| as its concrete type.
  std::unique_ptr<FileEnvelopeHolder> other(
      static_cast<FileEnvelopeHolder *>(container.release()));

  auto &incoming_files = other->file_names_;
  file_names_.insert(incoming_files.begin(), incoming_files.end());
  // Empty the other holder's list so that its destructor leaves the files alone.
  incoming_files.clear();

  // Anything cached so far only covers the files held before the merge.
  envelope_read_ = false;
  cached_file_size_ = 0;
  envelope_.Clear();
}

// Builds (once) and returns the Envelope made of the records stored in the
// files held by this holder.
//
// Each file is read as a sequence of delimited FileObservationStoreRecord
// messages. A metadata record selects the batch that the following observation
// records are added to; records with identical serialized metadata share one
// batch. Unencrypted observations are encrypted with |encrypter| as they are
// read.
//
// A file that cannot be opened is skipped. If a corrupted message is found,
// reading stops and the envelope read so far is returned. In every case the
// result is cached: later calls return the same envelope without re-reading.
const Envelope &FileObservationStore::FileEnvelopeHolder::GetEnvelope(
    util::EncryptedMessageMaker *encrypter) {
  if (envelope_read_) {
    return envelope_;
  }

  std::string serialized_metadata;
  std::unordered_map<std::string, ObservationBatch *> batch_map;
  // Stays null until the first metadata record has been read, so that an
  // observation record without preceding metadata can be detected instead of
  // dereferencing an indeterminate pointer.
  ObservationBatch *current_batch = nullptr;

  FileObservationStoreRecord stored;

  for (const auto &file_name : file_names_) {
    auto iis_or = fs_->NewProtoInputStream(FullPath(file_name));
    if (!iis_or.ok()) {
      LOG(ERROR) << "WARNING: Trying to open `" << FullPath(file_name)
                 << "` failed with error: " << iis_or.status().error_message();
      continue;
    }
    auto iis = iis_or.ConsumeValueOrDie();

    bool clean_eof = false;
    while (true) {
      // Start every record from an empty message.
      // NOTE(review): ParseDelimitedFromZeroCopyStream appears to merge into
      // the message rather than replace it, which would let fields of the
      // previous record (e.g. contribution_id) leak into this one; confirm
      // against the protobuf version in use.
      stored.Clear();
      if (!google::protobuf::util::ParseDelimitedFromZeroCopyStream(&stored, iis.get(),
                                                                    &clean_eof)) {
        break;
      }

      if (stored.has_meta_data()) {
        std::unique_ptr<ObservationMetadata> current_metadata(stored.release_meta_data());
        current_metadata->SerializeToString(&serialized_metadata);

        auto iter = batch_map.find(serialized_metadata);
        if (iter != batch_map.end()) {
          current_batch = iter->second;
        } else {
          current_batch = envelope_.add_batch();
          current_batch->set_allocated_meta_data(current_metadata.release());
          batch_map[serialized_metadata] = current_batch;
        }
      } else if (current_batch == nullptr) {
        // A non-metadata record before any metadata record: there is no batch
        // to attach it to, so the file is treated as corrupted.
        clean_eof = false;
        break;
      } else if (stored.has_encrypted_observation()) {
        stored.mutable_encrypted_observation()->set_contribution_id(stored.contribution_id());
        current_batch->add_encrypted_observation()->Swap(stored.mutable_encrypted_observation());
      } else if (stored.has_unencrypted_observation()) {
        auto *encrypted = current_batch->add_encrypted_observation();
        if (!encrypter->Encrypt(stored.unencrypted_observation(), encrypted)) {
          LOG_FIRST_N(ERROR, 10) << "ERROR: Unable to encrypt observation on read.";
        }
        encrypted->set_contribution_id(stored.contribution_id());
      } else {
        // A record that is neither metadata nor an observation.
        clean_eof = false;
        break;
      }
    }

    if (!clean_eof) {
      VLOG(1) << "WARNING: Trying to read from `" << file_name
              << "` encountered a corrupted message. Returning the envelope that "
                 "has been read so far.";
      break;
    }
  }

  envelope_read_ = true;
  return envelope_;
}

size_t FileObservationStore::FileEnvelopeHolder::Size() {
  if (cached_file_size_ != 0) {
    return cached_file_size_;
  }

  cached_file_size_ = 0;
  for (const auto &file_name : file_names_) {
    cached_file_size_ += fs_->FileSize(FullPath(file_name)).ConsumeValueOr(0);
  }
  return cached_file_size_;
}

// Discards everything the store holds: finalizes (and thereby closes) the
// active file, resets the bookkeeping fields, and deletes every file listed in
// root_directory_.
void FileObservationStore::DeleteData() {
  LOG(INFO) << "FileObservationStore: Deleting stored data";

  auto locked = protected_fields_.lock();

  // The result is ignored on purpose; the files are deleted below either way.
  FinalizeActiveFile(&locked);

  // Reset the bookkeeping to that of an empty store.
  locked->active_file = nullptr;
  locked->metadata_written = false;
  locked->last_written_metadata = "";
  locked->finalized_bytes = 0;
  locked->files_taken.clear();

  const auto remaining_files = fs_->ListFiles(root_directory_).ConsumeValueOr({});
  for (const auto &remaining_file : remaining_files) {
    fs_->Delete(FullPath(remaining_file));
  }
}

}  // namespace cobalt::observation_store
