// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <memory>

#include "gtest/gtest.h"
#include "lib/fxl/macros.h"
#include "peridot/lib/fidl/array_to_string.h"
#include "peridot/lib/ledger_client/ledger_client.h"
#include "peridot/lib/ledger_client/page_client.h"
#include "peridot/lib/ledger_client/page_id.h"
#include "peridot/lib/testing/test_with_ledger.h"

namespace modular {
namespace testing {
namespace {

// A PageClient used by the tests below. It remembers the latest value reported
// for every key through OnPageChange(), counts change and conflict
// notifications, and answers every conflict by requesting a merge to the fixed
// value "value3".
//
// PageClient is inherited privately (the default for |class|), so the one base
// accessor the tests need, page(), is re-exposed explicitly below.
class PageClientImpl : PageClient {
 public:
  // Forwards |ledger_client|, |page_id| and |prefix| to PageClient, using
  // "PageClientImpl" as the first constructor argument. |prefix| defaults to
  // the empty string.
  PageClientImpl(LedgerClient* ledger_client,
                 LedgerPageId page_id,
                 std::string prefix = "")
      : PageClient("PageClientImpl",
                   ledger_client,
                   std::move(page_id),
                   std::move(prefix)) {}

  ~PageClientImpl() override = default;

  // Records |value| as the latest value of |key|, bumps the change counter and
  // logs the notification.
  void OnPageChange(const std::string& key, const std::string& value) override {
    ++change_count_;

    values_[key] = value;

    FXL_LOG(INFO) << "OnPageChange \"" << prefix() << "\""
                  << " " << change_count_ << " " << key << " " << value;
  }

  // Bumps the conflict counter, logs both sides of |conflict| and resolves it
  // by asking for a merge with the value "value3". The tests wait for that
  // value to show up through OnPageChange().
  void OnPageConflict(Conflict* const conflict) override {
    ++conflict_count_;

    FXL_LOG(INFO) << "OnPageConflict " << prefix() << " " << conflict_count_
                  << " " << to_string(conflict->key) << " " << conflict->left
                  << " " << conflict->right;

    conflict->resolution = MERGE;
    conflict->merged = "value3";
  }

  // Number of OnPageChange() / OnPageConflict() calls seen so far.
  int change_count() const { return change_count_; }
  int conflict_count() const { return conflict_count_; }

  // Returns the last value recorded for |key|, or an empty string if no change
  // for |key| was seen yet. This is a pure lookup: polling for a key that has
  // not arrived yet does not add an entry to the map.
  const std::string& value(const std::string& key) const {
    const auto it = values_.find(key);
    if (it == values_.end()) {
      return empty_value_;
    }
    return it->second;
  }

  // Re-exposes the privately inherited PageClient::page().
  ledger::Page* page() { return PageClient::page(); }

 private:
  // Latest value per key, as reported by OnPageChange().
  std::map<std::string, std::string> values_;

  // What value() returns a reference to when a key is unknown.
  const std::string empty_value_{};

  int change_count_{};
  int conflict_count_{};
};

class PageClientTest : public TestWithLedger {
 public:
  PageClientTest() : page_id_(MakePageId("0123456789123456")) {}

  ~PageClientTest() = default;

  void SetUp() override {
    TestWithLedger::SetUp();
    // We only handle one conflict resolution per test case for now.
    ledger_client()->add_watcher([this] { resolved_ = true; });

    // TODO(mesch): Registration order matters for overlapping prefixes. This
    // should not be like that.
    page_client_a_.reset(new PageClientImpl(ledger_client(), page_id_, "a/"));
    page_client_b_.reset(new PageClientImpl(ledger_client(), page_id_, "b/"));
    page_client_.reset(new PageClientImpl(ledger_client(), page_id_));

    ledger_client()->ledger()->GetPage(
        std::make_unique<ledger::PageId>(page_id_), page_.NewRequest(),
        [](ledger::Status status) { ASSERT_EQ(ledger::Status::OK, status); });
  }

  void TearDown() override {
    page_client_.reset();
    page_client_a_.reset();
    page_client_b_.reset();

    TestWithLedger::TearDown();
  }

  PageClientImpl* page_client() { return page_client_.get(); }
  ledger::Page* page() { return page_.get(); }

  PageClientImpl* page_client_a() { return page_client_a_.get(); }
  PageClientImpl* page_client_b() { return page_client_b_.get(); }

  // Shorthand for the two connections to the page, and the two clients on the
  // first connection.
  ledger::Page* page1() { return page_client_->page(); }
  ledger::Page* page1a() { return page_client_a_->page(); }
  ledger::Page* page1b() { return page_client_b_->page(); }
  ledger::Page* page2() { return page_.get(); }

  // Factory for a ledger callback function that just logs errors.
  std::function<void(ledger::Status)> log(std::string context) {
    return [context](ledger::Status status) {
      EXPECT_EQ(ledger::Status::OK, status) << context;
    };
  }

  void Finish() { finished_ = true; }

  // Runs the message loop until after Finish() completes, then optionally until
  // conflicts are resolved, then until the observed |condition| becomes true.
  void Run(const bool wait_for_resolved, std::function<bool()> condition) {
    RunLoopUntilWithTimeout(
        [ this, wait_for_resolved, condition = std::move(condition) ] {
          // First wait for Finish() to be called.
          if (!finished_) {
            return false;
          }

          // Then wait for conflict resolution to complete.
          if (wait_for_resolved && !resolved_) {
            return false;
          }

          // Finally, wait for the observed condition. TODO(mesch): This is
          // lame, because test failure now requires us to time out, however the
          // actual condition to wait for -- that the conflict is resolved and
          // watcher notifications are dispatched -- is not observable from
          // here.
          if (!condition()) {
            return false;
          }

          return true;
        },

        // NOTE(mesch): Test cases here take about 300ms when running in CI.
        // Occasionally they take much longer, presumably because of load on
        // shared machines. With the default timeout, we see flakiness. Cf.
        // FW-287.
        fxl::TimeDelta::FromSeconds(10));
  }

 private:
  const LedgerPageId page_id_;

  // Separate page clients for different prefixes all for the same page. They
  // share the same page connection since they are created on the same ledger
  // client.
  std::unique_ptr<PageClientImpl> page_client_;
  std::unique_ptr<PageClientImpl> page_client_a_;
  std::unique_ptr<PageClientImpl> page_client_b_;

  // A separate connection to the same page as page_client_, used to create
  // conflicts.
  ledger::PagePtr page_;

  // Used by Finish() and Run();
  bool finished_{};
  bool resolved_{};

  FXL_DISALLOW_COPY_AND_ASSIGN(PageClientTest);
};

// Writes a single key through the page client's own connection and expects the
// unprefixed client to see the value without any conflict being reported.
//
// This test is flaky. https://fuchsia.atlassian.net/browse/MI4-797
TEST_F(PageClientTest, DISABLED_SimpleWrite) {
  // True once the written value has been delivered to the page client.
  const auto value_arrived = [this] {
    return page_client()->value("key") == "value";
  };

  page1()->Put(to_array("key"), to_array("value"), log("Put"));
  Finish();
  Run(/* wait_for_resolved = */ false, value_arrived);

  PageClientImpl* const client = page_client();
  EXPECT_EQ(0, client->conflict_count());
  EXPECT_EQ("value", client->value("key"));
}

// Writes one key under each of the "a/" and "b/" prefixes through the second
// page connection and expects each prefixed client to see its own key, with no
// conflict reported to any of the three clients.
TEST_F(PageClientTest, PrefixWrite) {
  // True once both prefixed clients have received their value. The "b/" client
  // is only consulted after the "a/" client has its value.
  const auto both_prefixes_updated = [this] {
    if (page_client_a()->value("a/key") != "value") {
      return false;
    }
    return page_client_b()->value("b/key") == "value";
  };

  page2()->Put(to_array("a/key"), to_array("value"), log("Put"));
  page2()->Put(to_array("b/key"), to_array("value"), log("Put"));
  Finish();
  Run(/* wait_for_resolved = */ false, both_prefixes_updated);

  PageClientImpl* const client_a = page_client_a();
  PageClientImpl* const client_b = page_client_b();
  EXPECT_EQ(0, page_client()->conflict_count());
  EXPECT_EQ(0, client_a->conflict_count());
  EXPECT_EQ(0, client_b->conflict_count());
  EXPECT_EQ("value", client_a->value("a/key"));
  EXPECT_EQ("value", client_b->value("b/key"));
}

// Writes two different keys, one through each page connection, and expects the
// unprefixed client to see both values without a conflict.
TEST_F(PageClientTest, ConcurrentWrite) {
  // True once both writes have been delivered to the page client. "key2" is
  // only checked after "key1" has its value.
  const auto both_keys_arrived = [this] {
    PageClientImpl* const observed = page_client();
    if (observed->value("key1") != "value1") {
      return false;
    }
    return observed->value("key2") == "value2";
  };

  page1()->Put(to_array("key1"), to_array("value1"), log("Put key1"));
  page2()->Put(to_array("key2"), to_array("value2"), log("Put key2"));
  Finish();
  Run(/* wait_for_resolved = */ false, both_keys_arrived);

  PageClientImpl* const client = page_client();
  EXPECT_EQ(0, client->conflict_count());
  EXPECT_EQ("value1", client->value("key1"));
  EXPECT_EQ("value2", client->value("key2"));
}

// Writes the same key with different values in two transactions, one on each
// page connection, and expects exactly one conflict on the unprefixed client,
// resolved to "value3" (the merge value PageClientImpl::OnPageConflict() sets).
//
// Every step is issued from the callback of the previous one, so the order is:
// page2 StartTransaction, page2 Put, page1 StartTransaction, page1 Put,
// page2 Commit, page1 Commit, Finish().
TEST_F(PageClientTest, ConflictWrite) {
  page2()->StartTransaction([this](ledger::Status status) {
    EXPECT_EQ(ledger::Status::OK, status);
    page2()->Put(
        to_array("key"), to_array("value2"), [this](ledger::Status status) {
          EXPECT_EQ(ledger::Status::OK, status);
          page1()->StartTransaction([this](ledger::Status status) {
            EXPECT_EQ(ledger::Status::OK, status);
            page1()->Put(to_array("key"), to_array("value1"),
                         [this](ledger::Status status) {
                           EXPECT_EQ(ledger::Status::OK, status);
                           // Both transactions now hold a write to "key";
                           // commit page2 first, then page1.
                           page2()->Commit([this](ledger::Status status) {
                             EXPECT_EQ(ledger::Status::OK, status);
                             page1()->Commit([this](ledger::Status status) {
                               EXPECT_EQ(ledger::Status::OK, status);
                               Finish();
                             });
                           });
                         });
          });
        });
  });

  // Wait for Finish(), for the resolution watcher, and for the merged value.
  Run(true, [this] { return page_client()->value("key") == "value3"; });

  EXPECT_EQ(1, page_client()->conflict_count());
  EXPECT_EQ("value3", page_client()->value("key"));
}

// Same sequence as ConflictWrite, but on a key under the "a/" prefix. Expects
// the conflict to be reported to the "a/" client only (one conflict there, none
// on the "b/" client) and the "a/" client to end up with the merged "value3".
//
// Every step is issued from the callback of the previous one, so the order is:
// page2 StartTransaction, page2 Put, page1 StartTransaction, page1 Put,
// page2 Commit, page1 Commit, Finish().
TEST_F(PageClientTest, ConflictPrefixWrite) {
  page2()->StartTransaction([this](ledger::Status status) {
    EXPECT_EQ(ledger::Status::OK, status);
    page2()->Put(
        to_array("a/key"), to_array("value2"), [this](ledger::Status status) {
          EXPECT_EQ(ledger::Status::OK, status);
          page1()->StartTransaction([this](ledger::Status status) {
            EXPECT_EQ(ledger::Status::OK, status);
            page1()->Put(to_array("a/key"), to_array("value1"),
                         [this](ledger::Status status) {
                           EXPECT_EQ(ledger::Status::OK, status);
                           // Both transactions now hold a write to "a/key";
                           // commit page2 first, then page1.
                           page2()->Commit([this](ledger::Status status) {
                             EXPECT_EQ(ledger::Status::OK, status);
                             page1()->Commit([this](ledger::Status status) {
                               EXPECT_EQ(ledger::Status::OK, status);
                               Finish();
                             });
                           });
                         });
          });
        });
  });

  // Wait for Finish(), for the resolution watcher, and for the merged value.
  Run(true, [this] { return page_client_a()->value("a/key") == "value3"; });

  EXPECT_EQ(1, page_client_a()->conflict_count());
  EXPECT_EQ(0, page_client_b()->conflict_count());
  EXPECT_EQ("value3", page_client_a()->value("a/key"));
}

// Like ConflictWrite, but each transaction additionally writes a key of its
// own ("key2" on page2, "key1" on page1). Expects exactly one conflict on the
// unprefixed client, "key" resolved to the merged "value3", and the two
// non-overlapping keys to keep the values they were written with.
//
// The chain is driven by the callbacks of the writes to "key"; the writes to
// "key1" and "key2" only log their status through log().
TEST_F(PageClientTest, ConcurrentConflictWrite) {
  page2()->StartTransaction([this](ledger::Status status) {
    EXPECT_EQ(ledger::Status::OK, status);
    // Written only by page2's transaction.
    page2()->Put(to_array("key2"), to_array("value2"), log("Put 2 key2"));
    page2()->Put(
        to_array("key"), to_array("value2"), [this](ledger::Status status) {
          EXPECT_EQ(ledger::Status::OK, status);
          page1()->StartTransaction([this](ledger::Status status) {
            EXPECT_EQ(ledger::Status::OK, status);
            // Written only by page1's transaction.
            page1()->Put(to_array("key1"), to_array("value1"),
                         log("Put 1 key1"));
            page1()->Put(to_array("key"), to_array("value1"),
                         [this](ledger::Status status) {
                           EXPECT_EQ(ledger::Status::OK, status);
                           // Both transactions now hold a write to "key";
                           // commit page2 first, then page1.
                           page2()->Commit([this](ledger::Status status) {
                             EXPECT_EQ(ledger::Status::OK, status);
                             page1()->Commit([this](ledger::Status status) {
                               EXPECT_EQ(ledger::Status::OK, status);
                               Finish();
                             });
                           });
                         });
          });
        });
  });

  // Wait for Finish(), for the resolution watcher, and for all three values.
  Run(true, [this] {
    return page_client()->value("key") == "value3" &&
           page_client()->value("key1") == "value1" &&
           page_client()->value("key2") == "value2";
  });

  EXPECT_EQ(1, page_client()->conflict_count());
  EXPECT_EQ("value1", page_client()->value("key1"));
  EXPECT_EQ("value2", page_client()->value("key2"));
  EXPECT_EQ("value3", page_client()->value("key"));
}

}  // namespace
}  // namespace testing
}  // namespace modular
