blob: 572940183a47ea6fbc1cf63cf1dd8f055d147edb [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/uploader/shipping_manager.h"
#include <chrono>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <gtest/gtest.h>
#include "src/lib/clearcut/clearcut.pb.h"
#include "src/lib/util/posix_file_system.h"
#include "src/logger/fake_logger.h"
#include "src/logging.h"
#include "src/observation_store/memory_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/pb/clearcut_extensions.pb.h"
#include "src/system_data/fake_system_data.h"
#include "third_party/gflags/include/gflags/gflags.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"
namespace cobalt::uploader {
using cobalt::clearcut_extensions::LogEventExtension;
using lib::statusor::StatusOr;
using logger::PerProjectBytesUploadedMetricDimensionStatus::Succeeded;
using logger::testing::FakeLogger;
using observation_store::MemoryObservationStore;
using observation_store::ObservationStore;
using util::EncryptedMessageMaker;
namespace {
constexpr uint32_t kCustomerId = 11;
constexpr uint32_t kProjectId = 12;
constexpr uint32_t kMetricId = 13;
constexpr size_t kMaxBytesPerObservation = 50;
constexpr size_t kMaxBytesPerEnvelope = 200;
constexpr size_t kMaxBytesTotal = 1000;
const std::chrono::seconds kMaxSeconds = UploadScheduler::kMaxSeconds;
constexpr int kHttpOk = 200;
constexpr int kHttpInternalServerError = 500;
class FakeHTTPClient : public lib::clearcut::HTTPClient {
std::future<StatusOr<lib::clearcut::HTTPResponse>> Post(
lib::clearcut::HTTPRequest request,
std::chrono::steady_clock::time_point /*ignored*/) override {
std::unique_lock<std::mutex> lock(mutex);
lib::clearcut::LogRequest req;
EXPECT_GT(req.log_event_size(), 0);
for (const auto& event : req.log_event()) {
auto log_event = event.GetExtension(LogEventExtension::ext);
Envelope recovered_envelope;
EXPECT_EQ(1, recovered_envelope.batch_size());
EXPECT_EQ(kMetricId, recovered_envelope.batch(0).meta_data().metric_id());
observation_count += recovered_envelope.batch(0).encrypted_observation_size();
lib::clearcut::HTTPResponse response;
response.http_code = http_response_code_to_return;
lib::clearcut::LogResponse resp;
std::promise<StatusOr<lib::clearcut::HTTPResponse>> response_promise;
return response_promise.get_future();
std::mutex mutex;
int http_response_code_to_return = kHttpOk;
int send_call_count = 0;
int observation_count = 0;
class FakeActivityListener : public ActivityListenerInterface {
FakeActivityListener() = default;
~FakeActivityListener() override = default;
void Start(const std::function<void(ActivityState)>& callback) override { callback_ = callback; };
ActivityState state() override { return state_; };
void Notify() {
if (callback_) {
void SetTestState(ActivityState state) { state_ = state; }
ActivityState state_ = IDLE;
std::optional<std::function<void(ActivityState)>> callback_;
std::unique_ptr<EncryptedMessage> CreateObservationMessage(size_t num_bytes) {
CHECK_GT(num_bytes, 4);
auto message = std::make_unique<EncryptedMessage>();
// Because the MemoryObservationStore counts the size of an Observation
// to be the ciphertext size + 4, we set the ciphertext size to be
// num_bytes - 4.
message->set_ciphertext(std::string(num_bytes - 4, 'x'));
return message;
std::unique_ptr<ObservationMetadata> CreateObservationMetadata() {
auto metadata = std::make_unique<ObservationMetadata>();
return metadata;
} // namespace
class ShippingManagerTest : public ::testing::Test {
: encrypt_to_shuffler_(EncryptedMessageMaker::MakeUnencrypted()),
observation_store_(kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal) {}
void Init(std::chrono::seconds schedule_interval, std::chrono::seconds min_interval,
std::unique_ptr<FakeActivityListener> listener = nullptr) {
UploadScheduler upload_scheduler(schedule_interval, min_interval);
auto http_client = std::make_unique<FakeHTTPClient>();
http_client_ = http_client.get();
shipping_manager_ = std::make_unique<ClearcutV1ShippingManager>(
upload_scheduler, &observation_store_, encrypt_to_shuffler_.get(),
/*log_source_id=*/11, /*max_attempts_per_upload=*/1);
ObservationStore::StoreStatus AddObservation(size_t num_bytes) {
auto retval = observation_store_.StoreObservation(CreateObservationMessage(num_bytes),
return retval;
void DestroyShippingManager() { shipping_manager_ = nullptr; }
void DeleteData() { observation_store_.DeleteData(); }
void CheckCallCount(int expected_call_count, int expected_observation_count) {
ASSERT_NE(nullptr, http_client_);
std::unique_lock<std::mutex> lock(http_client_->mutex);
EXPECT_EQ(expected_call_count, http_client_->send_call_count);
EXPECT_EQ(expected_observation_count, http_client_->observation_count);
std::unique_ptr<EncryptedMessageMaker> encrypt_to_shuffler_;
std::unique_ptr<EncryptedMessageMaker> encrypt_to_analyzer_;
MemoryObservationStore observation_store_;
system_data::FakeSystemData system_data_;
std::unique_ptr<ClearcutV1ShippingManager> shipping_manager_;
FakeHTTPClient* http_client_ = nullptr;
// We construct a ShippingManager and destruct it without calling any methods.
// This tests that the destructor requests that the worker thread terminate
// and then waits for it to terminate.
TEST_F(ShippingManagerTest, ConstructAndDestruct) { Init(kMaxSeconds, kMaxSeconds); }
// We construct a ShippingManager and add one small Observation to it.
// Before the ShippingManager has a chance to send the Observation we
// destruct it. We test that the Add() returns OK and the destructor
// succeeds.
TEST_F(ShippingManagerTest, AddOneObservationAndDestruct) {
Init(kMaxSeconds, kMaxSeconds);
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// We construct a ShippingManager and add one Observation. While the ShippingManager is trying to
// send the Observation we destruct it and confirm that it terminates cleanly.
TEST_F(ShippingManagerTest, DestructWhileSendingEnvelope) {
for (int i = 0; i < 100; i++) {
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero());
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Empty the ObservationStore so it doesn't fill up.
// We add one Observation, confirm that it is not immediately sent,
// invoke RequestSendSoon, wait for the Observation to be sent, confirm
// that it was sent.
TEST_F(ShippingManagerTest, SendOne) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
// Add one Observation().
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Confirm it has not been sent yet.
CheckCallCount(0, 0);
// Invoke RequestSendSoon.
// Wait for it to be sent.
// Confirm it has been sent.
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
CheckCallCount(1, 1);
TEST_F(ShippingManagerTest, DisallowUploading) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
// Add one Observation().
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Confirm it has not been sent yet.
CheckCallCount(0, 0);
// Invoke RequestSendSoon.
// Wait for it to be sent.
// Confirm it has still not been sent.
CheckCallCount(0, 0);
// Invoke RequestSendSoon.
// Wait for it to be sent.
// Confirm it has been sent.
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
CheckCallCount(1, 1);
// Performs the same test as SendOne but makes sure the right values were logged to the internal
// metrics object.
TEST_F(ShippingManagerTest, SendOneLoggedMetrics) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
FakeLogger logger;
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Same checks as the previous test.
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
CheckCallCount(1, 1);
// Attempt and Succeed in the Shipping Manager for a project (2), Attempt and Succeed in Clearcut
// Uploader with both 1.0 and 1.1 metrics (4), and a IdleObservationUpload when the envelopes are
// sent (1).
EXPECT_EQ(7, logger.call_count());
auto event = logger.last_event_logged(); // The last Succeed for an Observation Batch.
EXPECT_EQ(1, event.event_count_event().event_code_size());
EXPECT_EQ(Succeeded, event.event_count_event().event_code(0));
EXPECT_EQ(absl::StrCat(kCustomerId, "/", kProjectId), event.event_count_event().component());
EXPECT_LE(40, event.event_count_event().count()); // This is an estimate.
// We add two Observations, confirm that they are not immediately sent,
// invoke RequestSendSoon, wait for the Observations to be sent, confirm
// that they were sent together in a single Envelope.
TEST_F(ShippingManagerTest, SendTwo) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
CheckCallCount(0, 0);
// Request send soon.
// Wait for both Observations to be sent.
// Confirm the two Observations were sent together in a single Envelope.
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
CheckCallCount(1, 2);
// Performs the same test as SendTwo but makes sure the right values were logged to the internal
// metrics object.
TEST_F(ShippingManagerTest, SendTwoLoggedMetrics) {
Init(kMaxSeconds, std::chrono::seconds::zero());
FakeLogger logger;
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
CheckCallCount(0, 0);
// Same checks as the previous test.
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
CheckCallCount(1, 2);
// Attempt and Succeed in the Shipping Manager (2), Attempt and Succeed in Clearcut Uploader for
// the envelope with both 1.0 and 1.1 metrics (4), and an IdleObservationUpload when the envelopes
// are sent (1).
EXPECT_EQ(7, logger.call_count());
auto event = logger.last_event_logged(); // The last Succeed for an Observation Batch.
EXPECT_EQ(1, event.event_count_event().event_code_size());
EXPECT_EQ(Succeeded, event.event_count_event().event_code(0));
EXPECT_EQ(absl::StrCat(kCustomerId, "/", kProjectId), event.event_count_event().component());
EXPECT_LE(2 * 40, event.event_count_event().count()); // This is an estimate.
// Tries to add an Observation that is too big. Tests that kObservationTooBig
// is returned.
TEST_F(ShippingManagerTest, ObservationTooBig) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
// Add one observation that is too big.
EXPECT_EQ(ObservationStore::kObservationTooBig, AddObservation(60));
// Add multiple Observations and allow them to be sent on the regular
// schedule.
TEST_F(ShippingManagerTest, ScheduledSend) {
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero());
// Add two Observations but do not invoke RequestSendSoon() and do
// not add enough Observations to exceed envelope_send_threshold_size_.
for (int i = 0; i < 2; i++) {
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Wait for the scheduled send
// We do not check the number of sends because that depends on the
// timing interaction of the test thread and the worker thread and so it
// would be flaky. Just check that all 3 Observations were sent.
std::unique_lock<std::mutex> lock(http_client_->mutex);
EXPECT_EQ(2, http_client_->observation_count);
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
// Tests the ShippingManager in the case that the ObservationStore returns
// kStoreFull.
// kMaxBytesTotal = 1000 and we are using Observations of size 40 bytes.
// 40 * 25 = 1000. The MemoryObservationStore will allow the 26th Observation
// to be added and then reject the 27th Observation.
TEST_F(ShippingManagerTest, ExceedMaxBytesTotal) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
// Configure the HttpClient to fail every time.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpInternalServerError;
// We can add 15 observations without the ObservationStore reporting that
// it is almost full.
for (int i = 0; i < 15; i++) { // NOLINT readability-magic-numbers
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// The ObservationStore was never almost full so the ShippingManager
// should not have attempted to do a send.
EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
// The sixteenth Observation causes the ObservationStore to become almost
// full and that causes the ShippingManager to attempt a send.
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
auto num_send_attempts = shipping_manager_->num_send_attempts();
EXPECT_GT(num_send_attempts, 0u);
EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::UNKNOWN, shipping_manager_->last_send_status().error_code());
// We can add 10 more Observations bringing the total to 26,
// without the ObservationStore being full.
for (int i = 0; i < 10; i++) {
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Because the ObservationStore is almost full the ShippingManager has
// been attempting additional sends.
EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts);
num_send_attempts = shipping_manager_->num_send_attempts();
EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::UNKNOWN, shipping_manager_->last_send_status().error_code());
// The 27th Observation will be rejected with kStoreFull.
EXPECT_EQ(ObservationStore::kStoreFull, AddObservation(40));
// Now configure the HttpClient to start succeeding,
// and reset the counts.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpOk;
http_client_->send_call_count = 0;
http_client_->observation_count = 0;
// Send all 26 of the accumulated Observations.
// All 26 successfully-added Observations should have been sent in six
// envelopes
CheckCallCount(6, 26); // NOLINT readability-magic-numbers
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts);
num_send_attempts = shipping_manager_->num_send_attempts();
EXPECT_GT(num_send_attempts, shipping_manager_->num_failed_attempts());
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
// Now we can add a 27th Observation and send it.
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
CheckCallCount(7, 27); // NOLINT readability-magic-numbers
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
// Tests that when the total amount of accumulated Observation data exceeds
// total_bytes_send_threshold_ then RequestSendSoon() will be invoked.
TEST_F(ShippingManagerTest, TotalBytesSendThreshold) {
// Init with a very long time for the regular schedule interval but
// zero for the minimum interval so the test doesn't have to wait.
Init(kMaxSeconds, std::chrono::seconds::zero());
// Configure the HttpClient to fail every time so that we can
// accumulate Observation data in memory.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpInternalServerError;
// total_bytes_send_threshold_ = 0.6 * max_bytes_total_.
// kMaxBytesTotal = 1000 so total_bytes_send_threshold_ = 600.
// We are using Observations of size 40 and 40 * 15 = 600 so the first
// Observation that causes us to exceed total_bytes_send_threshold_ is #16.
// Add 15 Observations. We want to do this in such a way that we don't
// exceed max_bytes_per_envelope_. Each time we will invoke
// RequestSendSoon() and then WaitUntilWorkerWaiting() so that we know that
// between invocations of AddObservtion() the worker thread will complete
// one execution of SendAllEnvelopes().
for (int i = 0; i < 15; i++) { // NOLINT readability-magic-numbers
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
if (i < 15) { // NOLINT readability-magic-numbers
// After having added 15 observations we have exceeded the
// ObservationStore's almost_full_threshold_ and this means that each
// invocation of AddEncryptedObservation() followed by a
// NotifyObservationsAdded() automatically invokes RequestSendSoon() and
// so we don't want to invoke it again here.
// We expect there to have been 45 calls to SendRetryer::SendToShuffler() in
// which the Envelopes sent contained a total of 195 Observations. Here we
// explain how this was calculated. See the comments at the top of the file on
// kMinEnvelopeSendSize. There it is explained that the ObservationStore will
// attempt to bundle together up to 5 observations into a sinle envelope
// before sending. None of the sends succeed so the ObservationStore keeps
// accumulating more Envelopes containing 5 Observations that failed to send.
// Below is the complete pattern of send attempts. Each set in braces
// represents one execution of SendAllEnvelopes(). The numbers in each set
// represent the invocations of SendEnvelopeToBackend() with an Envelope that
// contains that many observations.
// {1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}, {5, 5, 5}, ...
// Thus the total number of send attempts is the total number of numbers:
// 3 * 15 = 45
// And the total number of Observations is the sum of all the numbers:
// (1 + 2 + 3 + 4 + 5) * 3 + (5*3*(15-5)) = 195
CheckCallCount(45, 195); // NOLINT readability-magic-numbers
// Now configure the HttpClient to start succeeding,
// and reset the counts.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpOk;
http_client_->send_call_count = 0;
http_client_->observation_count = 0;
// Now we send the 16th Observattion. But notice that we do *not* invoke
// RequestSendSoon() this time. So the reason the Observations get sent
// now is because we are exceeding total_bytes_send_threshold_.
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// All 16 Observations should have been sent in 4 envelopes as {5, 5, 5, 1}.
CheckCallCount(4, 16); // NOLINT readability-magic-numbers
// Test the version of the method RequestSendSoon() that takes a callback.
// We test that the callback is invoked with success = true when the send
// succeeds and with success = false when the send fails.
TEST_F(ShippingManagerTest, RequestSendSoonWithCallback) {
Init(kMaxSeconds, std::chrono::seconds::zero());
// Invoke RequestSendSoon() with a callback before any Observations are
// added.
bool captured_success_arg = false;
[&captured_success_arg](bool success) { captured_success_arg = success; });
// Check that the callback was invoked synchronously with success = true.
CheckCallCount(0, 0);
EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
// Arrange for the first send to fail.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpInternalServerError;
// Add an Observation, invoke RequestSendSoon() with a callback.
EXPECT_EQ(ObservationStore::kOk, AddObservation(31));
[&captured_success_arg](bool success) { captured_success_arg = success; });
// Check that the callback was invoked with success = false.
CheckCallCount(3, 3);
EXPECT_EQ(3u, shipping_manager_->num_send_attempts());
EXPECT_EQ(3u, shipping_manager_->num_failed_attempts());
// Arrange for the next send to succeed.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpOk;
// Don't add another Observation but invoke RequestSendSoon() with a
// callback.
[&captured_success_arg](bool success) { captured_success_arg = success; });
// Check that the callback was invoked with success = true.
CheckCallCount(4, 4);
EXPECT_EQ(4u, shipping_manager_->num_send_attempts());
EXPECT_EQ(3u, shipping_manager_->num_failed_attempts());
// Arrange for the next send to fail.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpInternalServerError;
// Invoke RequestSendSoon without a callback just so that there is an
// Observation cached in the inner EnvelopeMaker.
EXPECT_EQ(ObservationStore::kOk, AddObservation(31));
CheckCallCount(7, 7); // NOLINT readability-magic-numbers
EXPECT_EQ(7u, shipping_manager_->num_send_attempts());
EXPECT_EQ(6u, shipping_manager_->num_failed_attempts());
// Arrange for the next send to succeed.
std::unique_lock<std::mutex> lock(http_client_->mutex);
http_client_->http_response_code_to_return = kHttpOk;
// Add an Observation, invoke RequestSendSoon() with a callback.
EXPECT_EQ(ObservationStore::kOk, AddObservation(31));
[&captured_success_arg](bool success) { captured_success_arg = success; });
// Check that the callback was invoked with success = true.
CheckCallCount(8, 9); // NOLINT readability-magic-numbers
EXPECT_EQ(8u, shipping_manager_->num_send_attempts());
EXPECT_EQ(6u, shipping_manager_->num_failed_attempts());
TEST_F(ShippingManagerTest, NoDelayWhenDeviceStateIdle) {
auto listener = std::make_unique<FakeActivityListener>();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Wait for the scheduled send
EXPECT_EQ(0u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
TEST_F(ShippingManagerTest, NoDelayWhenDeviceStateUnknown) {
auto listener = std::make_unique<FakeActivityListener>();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Wait for the scheduled send
EXPECT_EQ(0u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
// Performs the same test as DelaysUntilDeviceStateIdle but the device state becomes IDLE.
TEST_F(ShippingManagerTest, DelaysUntilDeviceStateIdle) {
auto listener = std::make_unique<FakeActivityListener>();
FakeActivityListener* listener_ptr = listener.get();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
// Ensure that the ShippingManager uploads because the device became idle, not
// because it forces upload after two failed attempts.
std::unique_lock<std::mutex> lock(http_client_->mutex);
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
EXPECT_EQ(1u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
EXPECT_EQ(1u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
// Performs the same test as DelaysUntilDeviceStateIdle but the device state becomes UNKNOWN.
TEST_F(ShippingManagerTest, DelaysUntilDeviceStateUnknown) {
auto listener = std::make_unique<FakeActivityListener>();
FakeActivityListener* listener_ptr = listener.get();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
// Ensure that the ShippingManager uploads because the device became idle, not
// because it forces upload after two failed attempts.
std::unique_lock<std::mutex> lock(http_client_->mutex);
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
EXPECT_EQ(1u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(0u, shipping_manager_->num_send_attempts());
EXPECT_EQ(1u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
TEST_F(ShippingManagerTest, DelaysOnceAtMost) {
auto listener = std::make_unique<FakeActivityListener>();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
EXPECT_EQ(1u, shipping_manager_->num_delayed_sends());
EXPECT_EQ(1u, shipping_manager_->num_send_attempts());
TEST_F(ShippingManagerTest, DestructWhileWaitingForIdleDevice) {
auto listener = std::make_unique<FakeActivityListener>();
FakeActivityListener* listener_ptr = listener.get();
// We set both schedule_interval_ and min_interval_ to zero so the test
// does not have to wait.
Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener));
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
constexpr char test_file_base[] = "/tmp/local_shipping_manager_test";
std::string GetTestFileName(const std::string& base) {
std::stringstream fname;
fname << base << "_"
<< std::chrono::duration_cast<std::chrono::milliseconds>(
return fname.str();
class LocalShippingManagerTest : public ::testing::Test {
: observation_store_(kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal),
test_file_name_(GetTestFileName(test_file_base)) {}
void SetUp() override {
shipping_manager_ =
std::make_unique<LocalShippingManager>(&observation_store_, test_file_name_, &fs_);
ObservationStore::StoreStatus AddObservation(size_t num_bytes) {
auto retval = observation_store_.StoreObservation(CreateObservationMessage(num_bytes),
return retval;
void CheckObservationCount(int expected_observation_count) {
auto ifs = fs_.NewProtoInputStream(test_file_name_);
ASSERT_EQ(util::StatusCode::OK, ifs.status().error_code());
int observation_count = 0;
Envelope envelope;
bool clean_eof = false;
while (google::protobuf::util::ParseDelimitedFromZeroCopyStream(
&envelope, ifs.ValueOrDie().get(), &clean_eof)) {
for (const auto& batch : envelope.batch()) {
observation_count += batch.encrypted_observation_size();
EXPECT_EQ(true, clean_eof);
EXPECT_EQ(expected_observation_count, observation_count);
MemoryObservationStore observation_store_;
std::string test_file_name_;
std::unique_ptr<ShippingManager> shipping_manager_;
util::PosixFileSystem fs_;
// Add multiple Observations and allow them to be saved.
TEST_F(LocalShippingManagerTest, ScheduledSave) {
// Add two Observations but do not invoke RequestSendSoon() and do
// not add enough Observations to exceed envelope_send_threshold_size_.
for (int i = 0; i < 2; i++) {
EXPECT_EQ(ObservationStore::kOk, AddObservation(40));
// Wait for the save to disk.
EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code());
} // namespace cobalt::uploader