// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/user_runner/storage/story_storage.h"

#include <fuchsia/modular/internal/cpp/fidl.h>
#include <lib/fidl/cpp/clone.h>
#include <lib/fxl/functional/make_copyable.h>

#include "peridot/bin/user_runner/storage/constants_and_utils.h"
#include "peridot/bin/user_runner/storage/story_storage_xdr.h"
#include "peridot/lib/fidl/clone.h"
#include "peridot/lib/ledger_client/operations.h"

namespace modular {

namespace {

// TODO(rosswang): replace with |std::string::starts_with| after C++20
bool StartsWith(const std::string& string, const std::string& prefix) {
  return string.compare(0, prefix.size(), prefix) == 0;
}

}  // namespace

StoryStorage::StoryStorage(LedgerClient* ledger_client,
                           fuchsia::ledger::PageId page_id)
    : PageClient("StoryStorage", ledger_client, page_id, "" /* key_prefix */),
      ledger_client_(ledger_client),
      page_id_(page_id),
      weak_ptr_factory_(this) {
  FXL_DCHECK(ledger_client_ != nullptr);
}

FuturePtr<> StoryStorage::WriteModuleData(ModuleData module_data) {
  auto module_path = fidl::Clone(module_data.module_path);
  return UpdateModuleData(
      module_path, fxl::MakeCopyable([module_data = std::move(module_data)](
                                         ModuleDataPtr* module_data_ptr) {
        *module_data_ptr = ModuleData::New();
        module_data.Clone(module_data_ptr->get());
      }));
}

namespace {
struct UpdateModuleDataState {
  fidl::VectorPtr<fidl::StringPtr> module_path;
  std::function<void(ModuleDataPtr*)> mutate_fn;
  OperationQueue sub_operations;
};
}  // namespace

FuturePtr<> StoryStorage::UpdateModuleData(
    const fidl::VectorPtr<fidl::StringPtr>& module_path,
    std::function<void(ModuleDataPtr*)> mutate_fn) {
  auto op_state = std::make_shared<UpdateModuleDataState>();
  op_state->module_path = fidl::Clone(module_path);
  op_state->mutate_fn = std::move(mutate_fn);

  auto key = MakeModuleKey(module_path);
  auto op_body = [this, op_state, key](OperationBase* op) {
    auto did_read =
        Future<ModuleDataPtr>::Create("StoryStorage.UpdateModuleData.did_read");
    op_state->sub_operations.Add(
        new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */,
                                     XdrModuleData, did_read->Completer()));

    auto did_mutate = did_read->AsyncMap(
        [this, op_state, key](ModuleDataPtr current_module_data) {
          auto new_module_data = CloneOptional(current_module_data);
          op_state->mutate_fn(&new_module_data);

          if (!new_module_data && !current_module_data) {
            return Future<>::CreateCompleted(
                "StoryStorage.UpdateModuleData.did_mutate");
          }

          if (current_module_data) {
            FXL_DCHECK(new_module_data)
                << "StoryStorage::UpdateModuleData(): mutate_fn() must not "
                   "set to null an existing ModuleData record.";
          }
          FXL_DCHECK(new_module_data->module_path == op_state->module_path)
              << "StorageStorage::UpdateModuleData(path, ...): mutate_fn() "
                 "must set "
                 "ModuleData.module_path to |path|.";

          // We complete this Future chain when the Ledger gives us the
          // notification that |module_data| has been written. The Ledger
          // won't do that if the current value for |key| won't change, so
          // we have to short-circuit here.
          if (current_module_data && *current_module_data == *new_module_data) {
            return Future<>::CreateCompleted(
                "StoryStorage.UpdateModuleData.did_mutate");
          }

          auto module_data_copy = CloneOptional(new_module_data);
          std::string expected_value;
          XdrWrite(&expected_value, &module_data_copy, XdrModuleData);

          op_state->sub_operations.Add(new WriteDataCall<ModuleData>(
              page(), key, XdrModuleData, std::move(module_data_copy), [] {}));

          return WaitForWrite(key, expected_value);
        });

    return did_mutate;
  };

  auto ret = Future<>::Create("StoryStorage.UpdateModuleData.ret");
  operation_queue_.Add(NewCallbackOperation(
      "StoryStorage::UpdateModuleData", std::move(op_body), ret->Completer()));
  return ret;
}

FuturePtr<ModuleDataPtr> StoryStorage::ReadModuleData(
    const fidl::VectorPtr<fidl::StringPtr>& module_path) {
  auto key = MakeModuleKey(module_path);
  auto ret = Future<ModuleDataPtr>::Create("StoryStorage.ReadModuleData.ret");
  operation_queue_.Add(
      new ReadDataCall<ModuleData>(page(), key, true /* not_found_is_ok */,
                                   XdrModuleData, ret->Completer()));
  return ret;
}

FuturePtr<fidl::VectorPtr<ModuleData>> StoryStorage::ReadAllModuleData() {
  auto ret = Future<fidl::VectorPtr<ModuleData>>::Create(
      "StoryStorage.ReadAllModuleData.ret");
  operation_queue_.Add(new ReadAllDataCall<ModuleData>(
      page(), kModuleKeyPrefix, XdrModuleData, ret->Completer()));
  return ret;
}

StoryStorage::LinkWatcherAutoCancel StoryStorage::WatchLink(
    const LinkPath& link_path, LinkUpdatedCallback callback) {
  auto it = link_watchers_.emplace(MakeLinkKey(link_path), std::move(callback));

  auto auto_remove = [weak_this = GetWeakPtr(), it] {
    if (!weak_this)
      return;

    weak_this->link_watchers_.erase(it);
  };

  return LinkWatcherAutoCancel(std::move(auto_remove));
}

namespace {
constexpr char kJsonNull[] = "null";

class ReadLinkDataCall
    : public Operation<StoryStorage::Status, fidl::StringPtr> {
 public:
  ReadLinkDataCall(PageClient* page_client, fidl::StringPtr key,
                   ResultCall result_call)
      : Operation("StoryStorage::ReadLinkDataCall", std::move(result_call)),
        page_client_(page_client),
        key_(std::move(key)) {}

 private:
  void Run() override {
    FlowToken flow{this, &status_, &value_};
    status_ = StoryStorage::Status::OK;

    page_snapshot_ = page_client_->NewSnapshot([this, weak_ptr = GetWeakPtr()] {
      if (!weak_ptr) {
        return;
      }
      // An error occurred getting the snapshot. Resetting page_snapshot_
      // will ensure that the FlowToken it has captured below while waiting for
      // a connected channel will be destroyed, and the operation will be
      // complete.
      status_ = StoryStorage::Status::LEDGER_ERROR;
      page_snapshot_ = fuchsia::ledger::PageSnapshotPtr();
    });

    page_snapshot_->Get(
        to_array(key_), [this, flow](fuchsia::ledger::Status status,
                                     fuchsia::mem::BufferPtr value) {
          std::string value_as_string;
          switch (status) {
            case fuchsia::ledger::Status::KEY_NOT_FOUND:
              // Leave value_ as a null-initialized StringPtr.
              return;
            case fuchsia::ledger::Status::OK:
              if (!value) {
                value_ = kJsonNull;
                return;
              }

              if (!fsl::StringFromVmo(*value, &value_as_string)) {
                FXL_LOG(ERROR) << trace_name() << " VMO could not be copied.";
                status_ = StoryStorage::Status::VMO_COPY_ERROR;
                return;
              }
              value_ = value_as_string;
              return;
            default:
              FXL_LOG(ERROR) << trace_name() << " PageSnapshot.Get() "
                             << fidl::ToUnderlying(status);
              status_ = StoryStorage::Status::LEDGER_ERROR;
              return;
          }
        });
  }

  // Input parameters.
  PageClient* const page_client_;
  const fidl::StringPtr key_;

  // Intermediate state.
  fuchsia::ledger::PageSnapshotPtr page_snapshot_;

  // Return values.
  StoryStorage::Status status_;
  fidl::StringPtr value_;

  FXL_DISALLOW_COPY_AND_ASSIGN(ReadLinkDataCall);
};
}  // namespace

FuturePtr<StoryStorage::Status, fidl::StringPtr> StoryStorage::GetLinkValue(
    const LinkPath& link_path) {
  auto key = MakeLinkKey(link_path);
  auto ret = Future<Status, fidl::StringPtr>::Create(
      "StoryStorage::GetLinkValue " + key);
  operation_queue_.Add(new ReadLinkDataCall(this, key, ret->Completer()));
  // We use AsyncMap here, even though we could semantically use Map, because
  // we need to return >1 value and only AsyncMap lets you do that.
  return ret->AsyncMap([](StoryStorage::Status status, fidl::StringPtr value) {
    if (value.is_null()) {
      value = kJsonNull;
    }
    return Future<StoryStorage::Status, fidl::StringPtr>::CreateCompleted(
        "StoryStorage.GetLinkValue.AsyncMap", std::move(status),
        std::move(value));
  });
}

namespace {

class WriteLinkDataCall : public Operation<StoryStorage::Status> {
 public:
  WriteLinkDataCall(PageClient* page_client, fidl::StringPtr key,
                    fidl::StringPtr value, ResultCall result_call)
      : Operation("StoryStorage::WriteLinkDataCall", std::move(result_call)),
        page_client_(page_client),
        key_(std::move(key)),
        value_(std::move(value)) {}

 private:
  void Run() override {
    FlowToken flow{this, &status_};
    status_ = StoryStorage::Status::OK;

    fsl::SizedVmo vmo;
    FXL_CHECK(fsl::VmoFromString(*value_, &vmo));
    page_client_->page()->CreateReferenceFromBuffer(
        std::move(vmo).ToTransport(),
        [this, flow, weak_ptr = GetWeakPtr()](
            fuchsia::ledger::Status status,
            std::unique_ptr<fuchsia::ledger::Reference> reference) {
          if (weak_ptr && reference) {
            PutReference(std::move(reference), flow);
          }
        });
  }

  void PutReference(std::unique_ptr<fuchsia::ledger::Reference> reference,
                    FlowToken flow) {
    page_client_->page()->PutReference(
        to_array(key_), std::move(*reference), fuchsia::ledger::Priority::EAGER,
        [this, flow, weak_ptr = GetWeakPtr()](fuchsia::ledger::Status status) {
          if (!weak_ptr) {
            return;
          }
          if (status != fuchsia::ledger::Status::OK) {
            FXL_LOG(ERROR) << "StoryStorage.WriteLinkDataCall " << key_ << " "
                           << " Page.Put() " << fidl::ToUnderlying(status);
            status_ = StoryStorage::Status::LEDGER_ERROR;
          }
        });
  }

  PageClient* const page_client_;
  fidl::StringPtr key_;
  fidl::StringPtr value_;

  StoryStorage::Status status_;

  FXL_DISALLOW_COPY_AND_ASSIGN(WriteLinkDataCall);
};

// Returns the status of the mutation and the new value. If no mutation
// happened, returns Status::OK and a nullptr.
class UpdateLinkCall : public Operation<StoryStorage::Status, fidl::StringPtr> {
 public:
  UpdateLinkCall(
      PageClient* page_client, std::string key,
      std::function<void(fidl::StringPtr*)> mutate_fn,
      std::function<FuturePtr<>(const std::string&, const std::string&)>
          wait_for_write_fn,
      ResultCall done)
      : Operation("StoryStorage::UpdateLinkCall", std::move(done)),
        page_client_(page_client),
        key_(std::move(key)),
        mutate_fn_(std::move(mutate_fn)),
        wait_for_write_fn_(std::move(wait_for_write_fn)) {}

 private:
  void Run() override {
    FlowToken flow{this, &status_, &new_value_};

    operation_queue_.Add(
        new ReadLinkDataCall(page_client_, key_,
                             [this, flow](StoryStorage::Status status,
                                          fidl::StringPtr current_value) {
                               status_ = status;
                               if (status != StoryStorage::Status::OK) {
                                 return;
                               }

                               Mutate(flow, std::move(current_value));
                             }));
  }

  void Mutate(FlowToken flow, fidl::StringPtr current_value) {
    new_value_ = current_value;
    mutate_fn_(&new_value_);
    rapidjson::Document doc;
    doc.Parse(new_value_);
    if (new_value_.is_null() || doc.HasParseError()) {
      if (!new_value_.is_null()) {
        FXL_LOG(ERROR) << "StoryStorage.UpdateLinkCall.Mutate " << key_
                       << " invalid json: " << doc.GetParseError() << " "
                       << new_value_ << ";";
      }
      status_ = StoryStorage::Status::LINK_INVALID_JSON;
      return;
    }

    if (new_value_ == current_value) {
      // Set the returned new value to null so the caller knows we succeeded
      // but didn't write anything.
      new_value_ = nullptr;
      return;
    }

    operation_queue_.Add(new WriteLinkDataCall(
        page_client_, key_, new_value_,
        [this, flow](StoryStorage::Status status) {
          status_ = status;

          // If we succeeded AND we set a new value, we need to wait for
          // confirmation from the ledger.
          if (status == StoryStorage::Status::OK && new_value_) {
            wait_for_write_fn_(key_, new_value_)->Then([this, flow] {
              Done(std::move(status_), std::move(new_value_));
            });
          }
        }));
  }

  // Input parameters.
  PageClient* const page_client_;
  const std::string key_;
  std::function<void(fidl::StringPtr*)> mutate_fn_;
  std::function<FuturePtr<>(const std::string&, const std::string&)>
      wait_for_write_fn_;

  // Operation runtime state.
  OperationQueue operation_queue_;

  // Return values.
  StoryStorage::Status status_;
  fidl::StringPtr new_value_;

  FXL_DISALLOW_COPY_AND_ASSIGN(UpdateLinkCall);
};

}  // namespace

FuturePtr<StoryStorage::Status> StoryStorage::UpdateLinkValue(
    const LinkPath& link_path, std::function<void(fidl::StringPtr*)> mutate_fn,
    const void* context) {
  // nullptr is reserved for updates that came from other instances of
  // StoryStorage.
  FXL_DCHECK(context != nullptr)
      << "StoryStorage::UpdateLinkValue(..., context) of nullptr is reserved.";

  auto key = MakeLinkKey(link_path);
  auto did_update = Future<Status, fidl::StringPtr>::Create(
      "StoryStorage.UpdateLinkValue.did_update");
  operation_queue_.Add(new UpdateLinkCall(
      this, key, std::move(mutate_fn),
      std::bind(&StoryStorage::WaitForWrite, this, std::placeholders::_1,
                std::placeholders::_2),
      did_update->Completer()));

  // We can't chain this call to the parent future chain because we do
  // not want it to happen at all in the case of errors.
  return did_update->WeakMap(
      GetWeakPtr(), [this, key, context](StoryStorage::Status status,
                                         fidl::StringPtr new_value) {
        // if |new_value| is null, it means we didn't write any new data, even
        // if |status| == OK.
        if (status == StoryStorage::Status::OK && !new_value.is_null()) {
          NotifyLinkWatchers(key, new_value, context);
        }

        return status;
      });
}

FuturePtr<> StoryStorage::Sync() {
  auto ret = Future<>::Create("StoryStorage::Sync.ret");
  operation_queue_.Add(NewCallbackOperation("StoryStorage::Sync",
                                            [](OperationBase* op) {
                                              return Future<>::CreateCompleted(
                                                  "StoryStorage::Sync");
                                            },
                                            ret->Completer()));
  return ret;
}

void StoryStorage::OnPageChange(const std::string& key,
                                const std::string& value) {
  // If there are any operations waiting on this particular write
  // having happened, tell them to continue.
  auto it = pending_writes_.find(std::make_pair(key, value));
  if (it != pending_writes_.end()) {
    auto local_futures = std::move(it->second);
    for (auto fut : local_futures) {
      fut->Complete();
    }

    // Since the above write originated from this StoryStorage instance,
    // we do not notify any listeners.
    return;
  }

  if (StartsWith(key, kLinkKeyPrefix)) {
    NotifyLinkWatchers(key, value, nullptr /* context */);
  } else if (StartsWith(key, kModuleKeyPrefix)) {
    if (on_module_data_updated_) {
      auto module_data = ModuleData::New();
      if (!XdrRead(value, &module_data, XdrModuleData)) {
        FXL_LOG(ERROR) << "Unable to parse ModuleData " << key << " " << value;
        return;
      }
      on_module_data_updated_(std::move(*module_data));
    }
  } else {
    // TODO(thatguy): We store some Link data on the root page (where StoryData
    // is stored) for the user shell to make use of. This means we get notified
    // in that instance of changes we don't care about.
    //
    // Consider putting all story-scoped data under a shared prefix, and use
    // that when initializing the PageClient.
    FXL_LOG(ERROR) << "Unexpected StoryStorage Ledger key prefix: " << key;
  }
}

void StoryStorage::OnPageDelete(const std::string& key) {
  // ModuleData and Link values are never deleted, although it is
  // theoretically possible that conflict resolution results in a key
  // disappearing. We do not currently do this.
}

void StoryStorage::OnPageConflict(Conflict* conflict) {
  // TODO(thatguy): Add basic conflict resolution. We can force a conflict for
  // link data in tests by using Page.StartTranscation() in UpdateLinkValue().
  FXL_LOG(WARNING) << "StoryStorage::OnPageConflict() for link key "
                   << to_string(conflict->key);
}

void StoryStorage::NotifyLinkWatchers(const std::string& link_key,
                                      fidl::StringPtr value,
                                      const void* context) {
  auto range = link_watchers_.equal_range(link_key);
  for (auto it = range.first; it != range.second; ++it) {
    it->second(value, context);
  }
}

FuturePtr<> StoryStorage::WaitForWrite(const std::string& key,
                                       const std::string& value) {
  // TODO(thatguy): It is possible that through conflict resolution, the write
  // we expect to get will never arrive.  We must have the conflict resolver
  // update |pending_writes_| with the result of conflict resolution.
  auto did_see_write =
      Future<>::Create("StoryStorage.WaitForWrite.did_see_write");
  pending_writes_[std::make_pair(key, value)].push_back(did_see_write);
  return did_see_write;
}

fxl::WeakPtr<StoryStorage> StoryStorage::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}

}  // namespace modular
