// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "encoder/shipping_manager.h"

#include <chrono>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "./clearcut_extensions.pb.h"
#include "./gtest.h"
#include "./logging.h"
#include "config/client_config.h"
#include "encoder/client_secret.h"
#include "encoder/encoder.h"
#include "encoder/fake_system_data.h"
#include "encoder/memory_observation_store.h"
#include "encoder/observation_store.h"
#include "encoder/project_context.h"
// Generated from shipping_manager_test_config.yaml
#include "encoder/shipping_manager_test_config.h"
#include "third_party/clearcut/clearcut.pb.h"
#include "third_party/gflags/include/gflags/gflags.h"

namespace cobalt {
namespace encoder {

using cobalt::clearcut_extensions::LogEventExtension;
using config::ClientConfig;
using send_retryer::CancelHandle;
using send_retryer::SendRetryer;
using statusor::StatusOr;
using util::EncryptedMessageMaker;

namespace {

// These values must match the values specified in the invocation of
// generate_test_config_h() in CMakeLists.txt. and in the invocation of
// cobalt_config_header("generate_shipping_manager_test_config") in BUILD.gn.
const uint32_t kCustomerId = 1;
const uint32_t kProjectId = 1;

const size_t kNoOpEncodingByteOverhead = 30;
const size_t kMaxBytesPerObservation = 50;
const size_t kMaxBytesPerEnvelope = 200;
const size_t kMaxBytesTotal = 1000;

const std::chrono::seconds kInitialRpcDeadline(10);
const std::chrono::seconds kDeadlinePerSendAttempt(60);
const std::chrono::seconds kMaxSeconds = UploadScheduler::kMaxSeconds;

// Returns a ProjectContext obtained by parsing the configuration specified
// in shipping_manager_test_config.yaml
std::shared_ptr<ProjectContext> GetTestProject() {
  // Parse the base64-encoded, serialized CobaltRegistry in
  // shipping_manager_test_config.h. This is generated from
  // shipping_manager_test_config.yaml. Edit that yaml file to make changes. The
  // variable name below, |kCobaltRegistryBase64|, must match what is
  // specified in the build files.
  std::unique_ptr<ClientConfig> client_config =
      ClientConfig::CreateFromCobaltRegistryBase64(kCobaltRegistryBase64);
  EXPECT_NE(nullptr, client_config);

  return std::shared_ptr<ProjectContext>(new ProjectContext(
      kCustomerId, kProjectId,
      std::shared_ptr<ClientConfig>(client_config.release())));
}

struct FakeSendRetryer : public SendRetryerInterface {
  explicit FakeSendRetryer(uint32_t metric_id = kDefaultMetricId)
      : SendRetryerInterface(), metric_id(metric_id) {}

  grpc::Status SendToShuffler(
      std::chrono::seconds initial_rpc_deadline,
      std::chrono::seconds overerall_deadline,
      send_retryer::CancelHandle* cancel_handle,
      const EncryptedMessage& encrypted_message) override {
    // Decrypt encrypted_message. (No actual decryption is involved since
    // we used the NONE encryption scheme.)
    util::MessageDecrypter decrypter("");
    Envelope recovered_envelope;
    EXPECT_TRUE(
        decrypter.DecryptMessage(encrypted_message, &recovered_envelope));
    EXPECT_EQ(1, recovered_envelope.batch_size());
    EXPECT_EQ(metric_id, recovered_envelope.batch(0).meta_data().metric_id());

    std::unique_lock<std::mutex> lock(mutex);
    send_call_count++;
    observation_count +=
        recovered_envelope.batch(0).encrypted_observation_size();
    // We grab the return value before we block. This allows the test
    // thread to wait for us to block, then change the value of
    // status_to_return for the *next* send without changing it for
    // the currently blocking send.
    grpc::Status status = status_to_return;
    if (should_block) {
      is_blocking = true;
      send_is_blocking_notifier.notify_all();
      send_can_exit_notifier.wait(lock, [this] { return !should_block; });
      is_blocking = false;
    }
    return status;
  }

  std::mutex mutex;
  bool should_block = false;
  std::condition_variable send_can_exit_notifier;
  bool is_blocking = false;
  std::condition_variable send_is_blocking_notifier;
  grpc::Status status_to_return = grpc::Status::OK;
  int send_call_count = 0;
  int observation_count = 0;
  uint32_t metric_id;
};

class FakeHTTPClient : public clearcut::HTTPClient {
 public:
  std::future<StatusOr<clearcut::HTTPResponse>> Post(
      clearcut::HTTPRequest request,
      std::chrono::steady_clock::time_point _ignored) {
    util::MessageDecrypter decrypter("");

    clearcut::LogRequest req;
    req.ParseFromString(request.body);
    EXPECT_GT(req.log_event_size(), 0);
    for (auto event : req.log_event()) {
      EXPECT_TRUE(event.HasExtension(LogEventExtension::ext));
      auto log_event = event.GetExtension(LogEventExtension::ext);
      Envelope recovered_envelope;
      EXPECT_TRUE(decrypter.DecryptMessage(
          log_event.cobalt_encrypted_envelope(), &recovered_envelope));
      EXPECT_EQ(1, recovered_envelope.batch_size());
      EXPECT_EQ(kClearcutMetricId,
                recovered_envelope.batch(0).meta_data().metric_id());
      observation_count +=
          recovered_envelope.batch(0).encrypted_observation_size();
    }
    send_call_count++;

    clearcut::HTTPResponse response;
    response.http_code = 200;
    clearcut::LogResponse resp;
    resp.SerializeToString(&response.response);

    std::promise<StatusOr<clearcut::HTTPResponse>> response_promise;
    response_promise.set_value(std::move(response));

    return response_promise.get_future();
  }

  int send_call_count = 0;
  int observation_count = 0;
};
}  // namespace

class ShippingManagerTest : public ::testing::Test {
 public:
  ShippingManagerTest()
      : encrypt_to_shuffler_("", EncryptedMessage::NONE),
        encrypt_to_analyzer_("", EncryptedMessage::NONE),
        observation_store_(kMaxBytesPerObservation, kMaxBytesPerEnvelope,
                           kMaxBytesTotal),
        project_(GetTestProject()),
        encoder_(project_, ClientSecret::GenerateNewSecret(), &system_data_) {}

 protected:
  void Init(std::chrono::seconds schedule_interval,
            std::chrono::seconds min_interval,
            uint32_t metric_id = kDefaultMetricId) {
    send_retryer_.reset(new FakeSendRetryer(metric_id));
    UploadScheduler upload_scheduler(schedule_interval, min_interval);
    LegacyShippingManager::SendRetryerParams send_retryer_params(
        kInitialRpcDeadline, kDeadlinePerSendAttempt);
    if (metric_id == kDefaultMetricId) {
      shipping_manager_.reset(new LegacyShippingManager(
          upload_scheduler, &observation_store_, &encrypt_to_shuffler_,
          send_retryer_params, send_retryer_.get()));
    } else {
      auto http_client = std::make_unique<FakeHTTPClient>();
      http_client_ = http_client.get();
      shipping_manager_.reset(new ClearcutV1ShippingManager(
          upload_scheduler, &observation_store_, &encrypt_to_shuffler_,
          std::make_unique<clearcut::ClearcutUploader>(
              "https://test.com", std::move(http_client))));
    }
    shipping_manager_->Start();
  }

  ObservationStore::StoreStatus AddObservation(
      size_t num_bytes, uint32_t metric_id = kDefaultMetricId) {
    CHECK(num_bytes > kNoOpEncodingByteOverhead) << " num_bytes=" << num_bytes;
    Encoder::Result result = encoder_.EncodeString(
        metric_id, kNoOpEncodingId,
        std::string(num_bytes - kNoOpEncodingByteOverhead, 'x'));
    auto message = std::make_unique<EncryptedMessage>();
    EXPECT_TRUE(
        encrypt_to_analyzer_.Encrypt(*result.observation, message.get()));
    auto retval = observation_store_.AddEncryptedObservation(
        std::move(message), std::move(result.metadata));
    shipping_manager_->NotifyObservationsAdded();
    return retval;
  }

  void CheckCallCount(int expected_call_count, int expected_observation_count) {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    EXPECT_EQ(expected_call_count, send_retryer_->send_call_count);
    EXPECT_EQ(expected_observation_count, send_retryer_->observation_count);
  }

  void CheckHTTPCallCount(int expected_call_count,
                          int expected_observation_count) {
    ASSERT_NE(nullptr, http_client_);
    EXPECT_EQ(expected_call_count, http_client_->send_call_count);
    EXPECT_EQ(expected_observation_count, http_client_->observation_count);
  }

  EncryptedMessageMaker encrypt_to_shuffler_;
  EncryptedMessageMaker encrypt_to_analyzer_;
  MemoryObservationStore observation_store_;
  FakeSystemData system_data_;
  std::unique_ptr<FakeSendRetryer> send_retryer_;
  std::unique_ptr<ShippingManager> shipping_manager_;
  std::shared_ptr<ProjectContext> project_;
  FakeHTTPClient* http_client_ = nullptr;
  Encoder encoder_;
};

// We construct a ShippingManager and destruct it without calling any methods.
// This tests that the destructor requests that the worker thread terminate
// and then waits for it to terminate.
TEST_F(ShippingManagerTest, ConstructAndDestruct) {
  Init(kMaxSeconds, kMaxSeconds);
}

// We construct a ShippingManager and add one small Observation to it.
// Before the ShippingManager has a chance to send the Observation we
// destruct it. We test that the Add() returns OK and the destructor
// succeeds.
TEST_F(ShippingManagerTest, AddOneObservationAndDestruct) {
  Init(kMaxSeconds, kMaxSeconds);
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
}

// We add one Observation, confirm that it is not immediately sent,
// invoke RequestSendSoon, wait for the Observation to be sent, confirm
// that it was sent.
TEST_F(ShippingManagerTest, SendOne) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero());
  // Add one Observation().
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));

  // Confirm it has not been sent yet.
  CheckCallCount(0, 0);

  // Invoke RequestSendSoon.
  shipping_manager_->RequestSendSoon();

  // Wait for it to be sent.
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Confirm it has been sent.
  EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());
  CheckCallCount(1, 1);
}

// We add two Observations, confirm that they are not immediately sent,
// invoke RequestSendSoon, wait for the Observations to be sent, confirm
// that they were sent together in a single Envelope.
TEST_F(ShippingManagerTest, SendTwo) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero());

  // Add two observations.
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));

  // Confirm they have not been sent.
  CheckCallCount(0, 0);

  // Request send soon.
  shipping_manager_->RequestSendSoon();

  // Wait for both Observations to be sent.
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Confirm the two Observations were sent together in a single Envelope.
  EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());
  CheckCallCount(1, 2);
}

// Trys to add an Observation that is too big. Tests that kObservationTooBig
// is returned.
TEST_F(ShippingManagerTest, ObservationTooBig) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero());

  // Add one observation that is too big.
  EXPECT_EQ(ObservationStore::kObservationTooBig, AddObservation(60));
}

// Add multiple Observations and allow them to be sent on the regular
// schedule.
TEST_F(ShippingManagerTest, ScheduledSend) {
  // We set both schedule_interval_ and min_interval_ to zero so the test
  // does not have to wait.
  Init(std::chrono::seconds::zero(), std::chrono::seconds::zero());

  // Add two Observations but do not invoke RequestSendSoon() and do
  // not add enough Observations to exceed envelope_send_threshold_size_.
  for (int i = 0; i < 2; i++) {
    EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
  }
  // Wait for the scheduled send
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // We do not check the number of sends because that depends on the
  // timing interaction of the test thread and the worker thread and so it
  // would be flaky. Just check that all 3 Observations were sent.
  std::unique_lock<std::mutex> lock(send_retryer_->mutex);
  EXPECT_EQ(2, send_retryer_->observation_count);
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());
}

// Tests the ShippingManager in the case that the ObservationStore returns
// kStoreFull.
//
// kMaxBytesTotal = 1000 and we are using Observations of size 40 bytes.
// 40 * 25 = 1000. The MemoryObservationStore will allow the 26th Observation
// to be added and then reject the 27th Observation.
TEST_F(ShippingManagerTest, ExceedMaxBytesTotal) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero());

  // Configure the FakeSendRetryer to fail every time.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::CANCELLED;
  }

  // We can add 15 observations without the ObservationStore reporting that
  // it is almost full.
  for (int i = 0; i < 15; i++) {
    EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
    shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);
  }

  // The ObservationStore was never almost full so the ShippingManager
  // should not have attempted to do a send.
  EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());

  // The sixteenth Observation causes the ObservationStore to become almost
  // full and that causes the ShippingManager to attempt a send.
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
  shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);
  auto num_send_attempts = shipping_manager_->num_send_attempts();
  EXPECT_GT(num_send_attempts, 0u);
  EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts());
  EXPECT_EQ(grpc::CANCELLED,
            shipping_manager_->last_send_status().error_code());

  // We can add 10 more Observations bringing the total to 26,
  // without the ObservationStore being full.
  for (int i = 0; i < 10; i++) {
    EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
  }
  shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);

  // Because the ObservationStore is almost full the ShippingManager has
  // been attempting additional sends.
  EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts);
  num_send_attempts = shipping_manager_->num_send_attempts();
  EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts());
  EXPECT_EQ(grpc::CANCELLED,
            shipping_manager_->last_send_status().error_code());

  // The 27th Observation will be rejected with kStoreFull.
  EXPECT_EQ(ObservationStore::kStoreFull, AddObservation(40));

  // Now configure the FakeSendRetryer to start succeeding,
  // and reset the counts.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::OK;
    send_retryer_->send_call_count = 0;
    send_retryer_->observation_count = 0;
  }

  // Send all 26 of the accumulated Observations.
  shipping_manager_->RequestSendSoon();
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // All 26 successfully-added Observations should have been sent in six
  // envelopes
  CheckCallCount(6, 26);
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());
  EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts);
  num_send_attempts = shipping_manager_->num_send_attempts();
  EXPECT_GT(num_send_attempts, shipping_manager_->num_failed_attempts());
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());

  // Now we can add a 27th Observation and send it.
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
  shipping_manager_->RequestSendSoon();
  shipping_manager_->WaitUntilIdle(kMaxSeconds);
  CheckCallCount(7, 27);
  EXPECT_EQ(grpc::OK, shipping_manager_->last_send_status().error_code());
}

// Tests that when the total amount of accumulated Observation data exceeds
// total_bytes_send_threshold_  then RequestSendSoon() will be invoked.
TEST_F(ShippingManagerTest, TotalBytesSendThreshold) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero());

  // Configure the FakeSendRetryer to fail every time so that we can
  // accumulate Observation data in memory.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::CANCELLED;
  }

  // total_bytes_send_threshold_ = 0.6 * max_bytes_total_.
  // kMaxBytesTotal = 1000 so total_bytes_send_threshold_ = 600.
  // We are using Observations of size 40 and 40 * 15 = 600 so the first
  // Observation that causes us to exceed total_bytes_send_threshold_ is #16.
  //
  // Add 15 Observations. We want to do this in such a way that we don't
  // exceed max_bytes_per_envelope_.  Each time we will invoke
  // RequestSendSoon() and then WaitUntilWorkerWaiting() so that we know that
  // between invocations of AddObservtion() the worker thread will complete
  // one execution of SendAllEnvelopes().
  for (int i = 0; i < 15; i++) {
    EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
    if (i < 15) {
      // After having added 15 observations we have exceeded the
      // ObservationStore's almost_full_threshold_ and this means that each
      // invocation of AddEncryptedObservation() followed by a
      // NotifyObservationsAdded() automatically invokes RequestSendSoon() and
      // so we don't want to invoke it again here.
      shipping_manager_->RequestSendSoon();
    }
    shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);
  }

  // We expect there to have been 45 calls to SendRetryer::SendToShuffler() in
  // which the Envelopes sent contained a total of 195 Observations. Here we
  // explain how this was calculated. See the comments at the top of the file on
  // kMinEnvelopeSendSize. There it is explained that the ObservationStore will
  // attempt to bundle together up to 5 observations into a sinle envelope
  // before sending. None of the sends succeed so the ObservationStore keeps
  // accumulating more Envelopes containing 5 Observations that failed to send.
  // Below is the complete pattern of send attempts. Each set in braces
  // represents one execution of SendAllEnvelopes(). The numbers in each set
  // represent the invocations of SendEnvelopeToBackend() with an Envelope that
  // contains that many observations.
  //
  // {1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}, {5, 5, 5}, ...
  //
  // Thus the total number of send attempts is the total number of numbers:
  // 3 * 15 = 45
  //
  // And the total number of Observations is the sum of all the numbers:
  // (1 + 2 + 3 + 4 + 5) * 3 + (5*3*(15-5)) = 195
  CheckCallCount(45, 195);

  // Now configure the FakeSendRetryer to start succeeding,
  // and reset the counts.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::OK;
    send_retryer_->send_call_count = 0;
    send_retryer_->observation_count = 0;
  }

  // Now we send the 16th Observattion. But notice that we do *not* invoke
  // RequestSendSoon() this time. So the reason the Observations get sent
  // now is because we are exceeding total_bytes_send_threshold_.
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40));

  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // All 16 Observations should have been sent in 4 envelopes as {5, 5, 5, 1}.
  CheckCallCount(4, 16);
}

// Test the version of the method RequestSendSoon() that takes a callback.
// We test that the callback is invoked with success = true when the send
// succeeds and with success = false when the send fails.
TEST_F(ShippingManagerTest, RequestSendSoonWithCallback) {
  Init(kMaxSeconds, std::chrono::seconds::zero());

  // Invoke RequestSendSoon() with a callback before any Observations are
  // added.
  bool captured_success_arg = false;
  shipping_manager_->RequestSendSoon([&captured_success_arg](bool success) {
    captured_success_arg = success;
  });
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Check that the callback was invoked synchronously with success = true.
  CheckCallCount(0, 0);
  EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
  EXPECT_TRUE(captured_success_arg);

  // Arrange for the first send to fail.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::CANCELLED;
  }

  // Add an Observation, invoke RequestSendSoon() with a callback.
  shipping_manager_->WaitUntilIdle(kMaxSeconds);
  EXPECT_EQ(ObservationStore::kOk,
            AddObservation(kNoOpEncodingByteOverhead + 1));
  shipping_manager_->RequestSendSoon([&captured_success_arg](bool success) {
    captured_success_arg = success;
  });
  shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);

  // Check that the callback was invoked with success = false.
  CheckCallCount(3, 3);
  EXPECT_EQ(3u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(3u, shipping_manager_->num_failed_attempts());
  EXPECT_FALSE(captured_success_arg);

  // Arrange for the next send to succeed.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::OK;
  }

  // Don't add another Observation but invoke RequestSendSoon() with a
  // callback.
  shipping_manager_->RequestSendSoon([&captured_success_arg](bool success) {
    captured_success_arg = success;
  });
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Check that the callback was invoked with success = true.
  CheckCallCount(4, 4);
  EXPECT_EQ(4u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(3u, shipping_manager_->num_failed_attempts());
  EXPECT_TRUE(captured_success_arg);

  // Arrange for the next send to fail.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::CANCELLED;
  }

  // Invoke RequestSendSoon without a callback just so that there is an
  // Observation cached in the inner EnvelopeMaker.
  EXPECT_EQ(ObservationStore::kOk,
            AddObservation(kNoOpEncodingByteOverhead + 1));
  shipping_manager_->RequestSendSoon();
  shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds);
  CheckCallCount(7, 7);
  EXPECT_EQ(7u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(6u, shipping_manager_->num_failed_attempts());

  // Arrange for the next send to succeed.
  {
    std::unique_lock<std::mutex> lock(send_retryer_->mutex);
    send_retryer_->status_to_return = grpc::Status::OK;
  }

  // Add an Observation, invoke RequestSendSoon() with a callback.
  EXPECT_EQ(ObservationStore::kOk,
            AddObservation(kNoOpEncodingByteOverhead + 1));
  shipping_manager_->RequestSendSoon([&captured_success_arg](bool success) {
    captured_success_arg = success;
  });
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Check that the callback was invoked with success = true.
  CheckCallCount(8, 9);
  EXPECT_EQ(8u, shipping_manager_->num_send_attempts());
  EXPECT_EQ(6u, shipping_manager_->num_failed_attempts());
  EXPECT_TRUE(captured_success_arg);
}

TEST_F(ShippingManagerTest, SendObservationToClearcut) {
  // Init with a very long time for the regular schedule interval but
  // zero for the minimum interval so the test doesn't have to wait.
  Init(kMaxSeconds, std::chrono::seconds::zero(), kClearcutMetricId);

  // Add some observations for clearcut
  EXPECT_EQ(ObservationStore::kOk, AddObservation(40, kClearcutMetricId));
  EXPECT_EQ(ObservationStore::kOk, AddObservation(41, kClearcutMetricId));

  // Request send soon.
  shipping_manager_->RequestSendSoon();

  // Wait for both Observations to be sent.
  shipping_manager_->WaitUntilIdle(kMaxSeconds);

  // Ensure we sent stuff to clearcut.
  CheckHTTPCallCount(1, 2);

  // Ensure nothing was sent to legacy.
  CheckCallCount(0, 0);
}

}  // namespace encoder
}  // namespace cobalt
