| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/uploader/shipping_manager.h" |
| |
| #include <chrono> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gtest/gtest.h> |
| |
| #include "src/lib/clearcut/clearcut.pb.h" |
| #include "src/lib/util/posix_file_system.h" |
| #include "src/logger/fake_logger.h" |
| #include "src/logging.h" |
| #include "src/observation_store/memory_observation_store.h" |
| #include "src/observation_store/observation_store.h" |
| #include "src/pb/clearcut_extensions.pb.h" |
| #include "src/system_data/fake_system_data.h" |
| #include "third_party/gflags/include/gflags/gflags.h" |
| #include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h" |
| |
| namespace cobalt::uploader { |
| |
| using cobalt::clearcut_extensions::LogEventExtension; |
| using lib::statusor::StatusOr; |
| using logger::PerProjectBytesUploadedMetricDimensionStatus::Succeeded; |
| using logger::testing::FakeLogger; |
| using observation_store::MemoryObservationStore; |
| using observation_store::ObservationStore; |
| using util::EncryptedMessageMaker; |
| |
| namespace { |
| |
| constexpr uint32_t kCustomerId = 11; |
| constexpr uint32_t kProjectId = 12; |
| constexpr uint32_t kMetricId = 13; |
| |
| constexpr size_t kMaxBytesPerObservation = 50; |
| constexpr size_t kMaxBytesPerEnvelope = 200; |
| constexpr size_t kMaxBytesTotal = 1000; |
| const std::chrono::seconds kMaxSeconds = UploadScheduler::kMaxSeconds; |
| |
| constexpr int kHttpOk = 200; |
| constexpr int kHttpInternalServerError = 500; |
| |
| class FakeHTTPClient : public lib::HTTPClient { |
| public: |
| std::future<StatusOr<lib::HTTPResponse>> Post( |
| lib::HTTPRequest request, std::chrono::steady_clock::time_point /*ignored*/) override { |
| std::unique_lock<std::mutex> lock(mutex); |
| |
| lib::clearcut::LogRequest req; |
| req.ParseFromString(request.body); |
| EXPECT_GT(req.log_event_size(), 0); |
| for (const auto& event : req.log_event()) { |
| EXPECT_TRUE(event.HasExtension(LogEventExtension::ext)); |
| auto log_event = event.GetExtension(LogEventExtension::ext); |
| Envelope recovered_envelope; |
| EXPECT_TRUE( |
| recovered_envelope.ParseFromString(log_event.cobalt_encrypted_envelope().ciphertext())); |
| EXPECT_EQ(1, recovered_envelope.batch_size()); |
| EXPECT_EQ(kMetricId, recovered_envelope.batch(0).meta_data().metric_id()); |
| observation_count += recovered_envelope.batch(0).encrypted_observation_size(); |
| } |
| send_call_count++; |
| |
| lib::HTTPResponse response; |
| response.http_code = http_response_code_to_return; |
| lib::clearcut::LogResponse resp; |
| resp.SerializeToString(&response.response); |
| |
| std::promise<StatusOr<lib::HTTPResponse>> response_promise; |
| response_promise.set_value(std::move(response)); |
| |
| return response_promise.get_future(); |
| } |
| |
| std::mutex mutex; |
| int http_response_code_to_return = kHttpOk; |
| int send_call_count = 0; |
| int observation_count = 0; |
| }; |
| |
| class FakeActivityListener : public ActivityListenerInterface { |
| public: |
| FakeActivityListener() = default; |
| ~FakeActivityListener() override = default; |
| |
| void Start(const std::function<void(ActivityState)>& callback) override { callback_ = callback; }; |
| ActivityState state() override { return state_; }; |
| |
| void Notify() { |
| if (callback_) { |
| callback_.value()(state_); |
| } |
| } |
| |
| void SetTestState(ActivityState state) { state_ = state; } |
| |
| private: |
| ActivityState state_ = IDLE; |
| std::optional<std::function<void(ActivityState)>> callback_; |
| }; |
| |
| std::unique_ptr<EncryptedMessage> CreateObservationMessage(size_t num_bytes) { |
| CHECK_GT(num_bytes, 4); |
| auto message = std::make_unique<EncryptedMessage>(); |
| // Because the MemoryObservationStore counts the size of an Observation |
| // to be the ciphertext size + 4, we set the ciphertext size to be |
| // num_bytes - 4. |
| message->set_ciphertext(std::string(num_bytes - 4, 'x')); |
| return message; |
| } |
| |
| std::unique_ptr<ObservationMetadata> CreateObservationMetadata() { |
| auto metadata = std::make_unique<ObservationMetadata>(); |
| metadata->set_customer_id(kCustomerId); |
| metadata->set_project_id(kProjectId); |
| metadata->set_metric_id(kMetricId); |
| return metadata; |
| } |
| } // namespace |
| |
| class ShippingManagerTest : public ::testing::Test { |
| public: |
| ShippingManagerTest() |
| : encrypt_to_shuffler_(EncryptedMessageMaker::MakeUnencrypted()), |
| encrypt_to_analyzer_(EncryptedMessageMaker::MakeUnencrypted()), |
| observation_store_(kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal) {} |
| |
| protected: |
| void Init(std::chrono::seconds schedule_interval, std::chrono::seconds min_interval, |
| std::unique_ptr<FakeActivityListener> listener = nullptr) { |
| UploadScheduler upload_scheduler(schedule_interval, min_interval); |
| auto http_client = std::make_unique<FakeHTTPClient>(); |
| http_client_ = http_client.get(); |
| shipping_manager_ = std::make_unique<ClearcutV1ShippingManager>( |
| upload_scheduler, &observation_store_, encrypt_to_shuffler_.get(), |
| encrypt_to_analyzer_.get(), |
| std::make_unique<lib::clearcut::ClearcutUploader>("https://test.com", |
| std::move(http_client)), |
| std::move(listener), |
| /*log_source_id=*/11, /*max_attempts_per_upload=*/1); |
| shipping_manager_->Start(); |
| } |
| |
| ObservationStore::StoreStatus AddObservation(size_t num_bytes) { |
| auto retval = observation_store_.StoreObservation(CreateObservationMessage(num_bytes), |
| CreateObservationMetadata()); |
| shipping_manager_->NotifyObservationsAdded(); |
| return retval; |
| } |
| |
| void DestroyShippingManager() { shipping_manager_ = nullptr; } |
| void DeleteData() { observation_store_.DeleteData(); } |
| |
| void CheckCallCount(int expected_call_count, int expected_observation_count) { |
| ASSERT_NE(nullptr, http_client_); |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| EXPECT_EQ(expected_call_count, http_client_->send_call_count); |
| EXPECT_EQ(expected_observation_count, http_client_->observation_count); |
| } |
| |
| private: |
| std::unique_ptr<EncryptedMessageMaker> encrypt_to_shuffler_; |
| std::unique_ptr<EncryptedMessageMaker> encrypt_to_analyzer_; |
| MemoryObservationStore observation_store_; |
| system_data::FakeSystemData system_data_; |
| |
| protected: |
| std::unique_ptr<ClearcutV1ShippingManager> shipping_manager_; |
| FakeHTTPClient* http_client_ = nullptr; |
| }; |
| |
| // We construct a ShippingManager and destruct it without calling any methods. |
| // This tests that the destructor requests that the worker thread terminate |
| // and then waits for it to terminate. |
| TEST_F(ShippingManagerTest, ConstructAndDestruct) { Init(kMaxSeconds, kMaxSeconds); } |
| |
| // We construct a ShippingManager and add one small Observation to it. |
| // Before the ShippingManager has a chance to send the Observation we |
| // destruct it. We test that the Add() returns OK and the destructor |
| // succeeds. |
| TEST_F(ShippingManagerTest, AddOneObservationAndDestruct) { |
| Init(kMaxSeconds, kMaxSeconds); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| } |
| |
| // We construct a ShippingManager and add one Observation. While the ShippingManager is trying to |
| // send the Observation we destruct it and confirm that it terminates cleanly. |
| TEST_F(ShippingManagerTest, DestructWhileSendingEnvelope) { |
| for (int i = 0; i < 100; i++) { |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero()); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| DestroyShippingManager(); |
| |
| // Empty the ObservationStore so it doesn't fill up. |
| DeleteData(); |
| } |
| } |
| |
| // We add one Observation, confirm that it is not immediately sent, |
| // invoke RequestSendSoon, wait for the Observation to be sent, confirm |
| // that it was sent. |
| TEST_F(ShippingManagerTest, SendOne) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| // Add one Observation(). |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| // Confirm it has not been sent yet. |
| CheckCallCount(0, 0); |
| |
| // Invoke RequestSendSoon. |
| shipping_manager_->RequestSendSoon(); |
| |
| // Wait for it to be sent. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Confirm it has been sent. |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| CheckCallCount(1, 1); |
| } |
| |
| TEST_F(ShippingManagerTest, DisallowUploading) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| shipping_manager_->Disable(true); |
| |
| // Add one Observation(). |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| // Confirm it has not been sent yet. |
| CheckCallCount(0, 0); |
| |
| // Invoke RequestSendSoon. |
| shipping_manager_->RequestSendSoon(); |
| |
| // Wait for it to be sent. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Confirm it has still not been sent. |
| CheckCallCount(0, 0); |
| |
| shipping_manager_->Disable(false); |
| |
| // Invoke RequestSendSoon. |
| shipping_manager_->RequestSendSoon(); |
| |
| // Wait for it to be sent. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Confirm it has been sent. |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| CheckCallCount(1, 1); |
| } |
| |
| // Performs the same test as SendOne but makes sure the right values were logged to the internal |
| // metrics object. |
| TEST_F(ShippingManagerTest, SendOneLoggedMetrics) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| FakeLogger logger; |
| shipping_manager_->ResetInternalMetrics(&logger); |
| |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->RequestSendSoon(); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Same checks as the previous test. |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| CheckCallCount(1, 1); |
| |
| // Attempt and Succeed in the Shipping Manager for a project (2), Attempt and Succeed in Clearcut |
| // Uploader with both 1.0 and 1.1 metrics (4), and a IdleObservationUpload when the envelopes are |
| // sent (1). |
| EXPECT_EQ(7, logger.call_count()); |
| |
| auto event = logger.last_event_logged(); // The last Succeed for an Observation Batch. |
| EXPECT_TRUE(event.has_event_count_event()); |
| EXPECT_EQ(1, event.event_count_event().event_code_size()); |
| EXPECT_EQ(Succeeded, event.event_count_event().event_code(0)); |
| EXPECT_EQ(absl::StrCat(kCustomerId, "/", kProjectId), event.event_count_event().component()); |
| EXPECT_LE(40, event.event_count_event().count()); // This is an estimate. |
| } |
| |
| // We add two Observations, confirm that they are not immediately sent, |
| // invoke RequestSendSoon, wait for the Observations to be sent, confirm |
| // that they were sent together in a single Envelope. |
| TEST_F(ShippingManagerTest, SendTwo) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| CheckCallCount(0, 0); |
| |
| // Request send soon. |
| shipping_manager_->RequestSendSoon(); |
| |
| // Wait for both Observations to be sent. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Confirm the two Observations were sent together in a single Envelope. |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| CheckCallCount(1, 2); |
| } |
| |
| // Performs the same test as SendTwo but makes sure the right values were logged to the internal |
| // metrics object. |
| TEST_F(ShippingManagerTest, SendTwoLoggedMetrics) { |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| FakeLogger logger; |
| shipping_manager_->ResetInternalMetrics(&logger); |
| |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| CheckCallCount(0, 0); |
| shipping_manager_->RequestSendSoon(); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Same checks as the previous test. |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| CheckCallCount(1, 2); |
| |
| // Attempt and Succeed in the Shipping Manager (2), Attempt and Succeed in Clearcut Uploader for |
| // the envelope with both 1.0 and 1.1 metrics (4), and an IdleObservationUpload when the envelopes |
| // are sent (1). |
| EXPECT_EQ(7, logger.call_count()); |
| |
| auto event = logger.last_event_logged(); // The last Succeed for an Observation Batch. |
| EXPECT_TRUE(event.has_event_count_event()); |
| EXPECT_EQ(1, event.event_count_event().event_code_size()); |
| EXPECT_EQ(Succeeded, event.event_count_event().event_code(0)); |
| EXPECT_EQ(absl::StrCat(kCustomerId, "/", kProjectId), event.event_count_event().component()); |
| EXPECT_LE(2 * 40, event.event_count_event().count()); // This is an estimate. |
| } |
| |
| // Tries to add an Observation that is too big. Tests that kObservationTooBig |
| // is returned. |
| TEST_F(ShippingManagerTest, ObservationTooBig) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| // Add one observation that is too big. |
| EXPECT_EQ(ObservationStore::kObservationTooBig, AddObservation(60)); |
| } |
| |
| // Add multiple Observations and allow them to be sent on the regular |
| // schedule. |
| TEST_F(ShippingManagerTest, ScheduledSend) { |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero()); |
| |
| // Add two Observations but do not invoke RequestSendSoon() and do |
| // not add enough Observations to exceed envelope_send_threshold_size_. |
| for (int i = 0; i < 2; i++) { |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| } |
| // Wait for the scheduled send |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // We do not check the number of sends because that depends on the |
| // timing interaction of the test thread and the worker thread and so it |
| // would be flaky. Just check that all 3 Observations were sent. |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| EXPECT_EQ(2, http_client_->observation_count); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| } |
| |
| // Tests the ShippingManager in the case that the ObservationStore returns |
| // kStoreFull. |
| // |
| // kMaxBytesTotal = 1000 and we are using Observations of size 40 bytes. |
| // 40 * 25 = 1000. The MemoryObservationStore will allow the 26th Observation |
| // to be added and then reject the 27th Observation. |
| TEST_F(ShippingManagerTest, ExceedMaxBytesTotal) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| // Configure the HttpClient to fail every time. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpInternalServerError; |
| } |
| |
| // We can add 15 observations without the ObservationStore reporting that |
| // it is almost full. |
| for (int i = 0; i < 15; i++) { // NOLINT readability-magic-numbers |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| } |
| |
| // The ObservationStore was never almost full so the ShippingManager |
| // should not have attempted to do a send. |
| EXPECT_EQ(0u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| |
| // The sixteenth Observation causes the ObservationStore to become almost |
| // full and that causes the ShippingManager to attempt a send. |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| |
| auto num_send_attempts = shipping_manager_->num_send_attempts(); |
| EXPECT_GT(num_send_attempts, 0u); |
| EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::UNKNOWN, shipping_manager_->last_send_status().error_code()); |
| |
| // We can add 10 more Observations bringing the total to 26, |
| // without the ObservationStore being full. |
| for (int i = 0; i < 10; i++) { |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| } |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| |
| // Because the ObservationStore is almost full the ShippingManager has |
| // been attempting additional sends. |
| EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts); |
| num_send_attempts = shipping_manager_->num_send_attempts(); |
| EXPECT_EQ(num_send_attempts, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::UNKNOWN, shipping_manager_->last_send_status().error_code()); |
| |
| // The 27th Observation will be rejected with kStoreFull. |
| EXPECT_EQ(ObservationStore::kStoreFull, AddObservation(40)); |
| |
| // Now configure the HttpClient to start succeeding, |
| // and reset the counts. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpOk; |
| http_client_->send_call_count = 0; |
| http_client_->observation_count = 0; |
| } |
| |
| // Send all 26 of the accumulated Observations. |
| shipping_manager_->RequestSendSoon(); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // All 26 successfully-added Observations should have been sent in six |
| // envelopes |
| CheckCallCount(6, 26); // NOLINT readability-magic-numbers |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| EXPECT_GT(shipping_manager_->num_send_attempts(), num_send_attempts); |
| num_send_attempts = shipping_manager_->num_send_attempts(); |
| EXPECT_GT(num_send_attempts, shipping_manager_->num_failed_attempts()); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| |
| // Now we can add a 27th Observation and send it. |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->RequestSendSoon(); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| CheckCallCount(7, 27); // NOLINT readability-magic-numbers |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| } |
| |
| // Tests that when the total amount of accumulated Observation data exceeds |
| // total_bytes_send_threshold_ then RequestSendSoon() will be invoked. |
| TEST_F(ShippingManagerTest, TotalBytesSendThreshold) { |
| // Init with a very long time for the regular schedule interval but |
| // zero for the minimum interval so the test doesn't have to wait. |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| // Configure the HttpClient to fail every time so that we can |
| // accumulate Observation data in memory. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpInternalServerError; |
| } |
| |
| // total_bytes_send_threshold_ = 0.6 * max_bytes_total_. |
| // kMaxBytesTotal = 1000 so total_bytes_send_threshold_ = 600. |
| // We are using Observations of size 40 and 40 * 15 = 600 so the first |
| // Observation that causes us to exceed total_bytes_send_threshold_ is #16. |
| // |
| // Add 15 Observations. We want to do this in such a way that we don't |
| // exceed max_bytes_per_envelope_. Each time we will invoke |
| // RequestSendSoon() and then WaitUntilWorkerWaiting() so that we know that |
| // between invocations of AddObservtion() the worker thread will complete |
| // one execution of SendAllEnvelopes(). |
| for (int i = 0; i < 15; i++) { // NOLINT readability-magic-numbers |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| if (i < 15) { // NOLINT readability-magic-numbers |
| // After having added 15 observations we have exceeded the |
| // ObservationStore's almost_full_threshold_ and this means that each |
| // invocation of AddEncryptedObservation() followed by a |
| // NotifyObservationsAdded() automatically invokes RequestSendSoon() and |
| // so we don't want to invoke it again here. |
| shipping_manager_->RequestSendSoon(); |
| } |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| } |
| |
| // We expect there to have been 45 calls to SendRetryer::SendToShuffler() in |
| // which the Envelopes sent contained a total of 195 Observations. Here we |
| // explain how this was calculated. See the comments at the top of the file on |
| // kMinEnvelopeSendSize. There it is explained that the ObservationStore will |
| // attempt to bundle together up to 5 observations into a sinle envelope |
| // before sending. None of the sends succeed so the ObservationStore keeps |
| // accumulating more Envelopes containing 5 Observations that failed to send. |
| // Below is the complete pattern of send attempts. Each set in braces |
| // represents one execution of SendAllEnvelopes(). The numbers in each set |
| // represent the invocations of SendEnvelopeToBackend() with an Envelope that |
| // contains that many observations. |
| // |
| // {1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}, {5, 5, 5}, ... |
| // |
| // Thus the total number of send attempts is the total number of numbers: |
| // 3 * 15 = 45 |
| // |
| // And the total number of Observations is the sum of all the numbers: |
| // (1 + 2 + 3 + 4 + 5) * 3 + (5*3*(15-5)) = 195 |
| CheckCallCount(45, 195); // NOLINT readability-magic-numbers |
| |
| // Now configure the HttpClient to start succeeding, |
| // and reset the counts. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpOk; |
| http_client_->send_call_count = 0; |
| http_client_->observation_count = 0; |
| } |
| |
| // Now we send the 16th Observattion. But notice that we do *not* invoke |
| // RequestSendSoon() this time. So the reason the Observations get sent |
| // now is because we are exceeding total_bytes_send_threshold_. |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // All 16 Observations should have been sent in 4 envelopes as {5, 5, 5, 1}. |
| CheckCallCount(4, 16); // NOLINT readability-magic-numbers |
| } |
| |
| // Test the version of the method RequestSendSoon() that takes a callback. |
| // We test that the callback is invoked with success = true when the send |
| // succeeds and with success = false when the send fails. |
| TEST_F(ShippingManagerTest, RequestSendSoonWithCallback) { |
| Init(kMaxSeconds, std::chrono::seconds::zero()); |
| |
| // Invoke RequestSendSoon() with a callback before any Observations are |
| // added. |
| bool captured_success_arg = false; |
| shipping_manager_->RequestSendSoon( |
| [&captured_success_arg](bool success) { captured_success_arg = success; }); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Check that the callback was invoked synchronously with success = true. |
| CheckCallCount(0, 0); |
| EXPECT_EQ(0u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(0u, shipping_manager_->num_failed_attempts()); |
| EXPECT_TRUE(captured_success_arg); |
| |
| // Arrange for the first send to fail. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpInternalServerError; |
| } |
| |
| // Add an Observation, invoke RequestSendSoon() with a callback. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(31)); |
| shipping_manager_->RequestSendSoon( |
| [&captured_success_arg](bool success) { captured_success_arg = success; }); |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| |
| // Check that the callback was invoked with success = false. |
| CheckCallCount(3, 3); |
| EXPECT_EQ(3u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(3u, shipping_manager_->num_failed_attempts()); |
| EXPECT_FALSE(captured_success_arg); |
| |
| // Arrange for the next send to succeed. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpOk; |
| } |
| |
| // Don't add another Observation but invoke RequestSendSoon() with a |
| // callback. |
| shipping_manager_->RequestSendSoon( |
| [&captured_success_arg](bool success) { captured_success_arg = success; }); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Check that the callback was invoked with success = true. |
| CheckCallCount(4, 4); |
| EXPECT_EQ(4u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(3u, shipping_manager_->num_failed_attempts()); |
| EXPECT_TRUE(captured_success_arg); |
| |
| // Arrange for the next send to fail. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpInternalServerError; |
| } |
| |
| // Invoke RequestSendSoon without a callback just so that there is an |
| // Observation cached in the inner EnvelopeMaker. |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(31)); |
| shipping_manager_->RequestSendSoon(); |
| shipping_manager_->WaitUntilWorkerWaiting(kMaxSeconds); |
| CheckCallCount(7, 7); // NOLINT readability-magic-numbers |
| EXPECT_EQ(7u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(6u, shipping_manager_->num_failed_attempts()); |
| |
| // Arrange for the next send to succeed. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| http_client_->http_response_code_to_return = kHttpOk; |
| } |
| |
| // Add an Observation, invoke RequestSendSoon() with a callback. |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(31)); |
| shipping_manager_->RequestSendSoon( |
| [&captured_success_arg](bool success) { captured_success_arg = success; }); |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| // Check that the callback was invoked with success = true. |
| CheckCallCount(8, 9); // NOLINT readability-magic-numbers |
| EXPECT_EQ(8u, shipping_manager_->num_send_attempts()); |
| EXPECT_EQ(6u, shipping_manager_->num_failed_attempts()); |
| EXPECT_TRUE(captured_success_arg); |
| } |
| |
| TEST_F(ShippingManagerTest, NoDelayWhenDeviceStateIdle) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| listener->SetTestState(IDLE); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| // Wait for the scheduled send |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| EXPECT_EQ(0u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| } |
| |
| TEST_F(ShippingManagerTest, NoDelayWhenDeviceStateUnknown) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| listener->SetTestState(UNKNOWN); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| // Wait for the scheduled send |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| EXPECT_EQ(0u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| } |
| |
| // Performs the same test as DelaysUntilDeviceStateIdle but the device state becomes IDLE. |
| TEST_F(ShippingManagerTest, DelaysUntilDeviceStateIdle) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| FakeActivityListener* listener_ptr = listener.get(); |
| listener_ptr->SetTestState(ACTIVE); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| |
| // Ensure that the ShippingManager uploads because the device became idle, not |
| // because it forces upload after two failed attempts. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->WaitUntilWorkerWaitingIdleDevice(kMaxSeconds); |
| |
| EXPECT_EQ(1u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(0u, shipping_manager_->num_send_attempts()); |
| |
| listener_ptr->SetTestState(IDLE); |
| listener_ptr->Notify(); |
| } |
| |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| EXPECT_EQ(1u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| } |
| |
| // Performs the same test as DelaysUntilDeviceStateIdle but the device state becomes UNKNOWN. |
| TEST_F(ShippingManagerTest, DelaysUntilDeviceStateUnknown) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| FakeActivityListener* listener_ptr = listener.get(); |
| listener_ptr->SetTestState(ACTIVE); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| |
| // Ensure that the ShippingManager uploads because the device became idle, not |
| // because it forces upload after two failed attempts. |
| { |
| std::unique_lock<std::mutex> lock(http_client_->mutex); |
| |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->WaitUntilWorkerWaitingIdleDevice(kMaxSeconds); |
| |
| EXPECT_EQ(1u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(0u, shipping_manager_->num_send_attempts()); |
| |
| listener_ptr->SetTestState(UNKNOWN); |
| listener_ptr->Notify(); |
| } |
| |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| EXPECT_EQ(1u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| } |
| |
| TEST_F(ShippingManagerTest, DelaysOnceAtMost) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| listener->SetTestState(ACTIVE); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| EXPECT_EQ(1u, shipping_manager_->num_delayed_sends()); |
| EXPECT_EQ(1u, shipping_manager_->num_send_attempts()); |
| } |
| |
| TEST_F(ShippingManagerTest, DestructWhileWaitingForIdleDevice) { |
| auto listener = std::make_unique<FakeActivityListener>(); |
| FakeActivityListener* listener_ptr = listener.get(); |
| listener_ptr->SetTestState(ACTIVE); |
| |
| // We set both schedule_interval_ and min_interval_ to zero so the test |
| // does not have to wait. |
| Init(std::chrono::seconds::zero(), std::chrono::seconds::zero(), std::move(listener)); |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| shipping_manager_->WaitUntilWorkerWaitingIdleDevice(kMaxSeconds); |
| } |
| |
| constexpr char test_file_base[] = "/tmp/local_shipping_manager_test"; |
| |
| std::string GetTestFileName(const std::string& base) { |
| std::stringstream fname; |
| fname << base << "_" |
| << std::chrono::duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| return fname.str(); |
| } |
| |
| class LocalShippingManagerTest : public ::testing::Test { |
| public: |
| LocalShippingManagerTest() |
| : observation_store_(kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal), |
| test_file_name_(GetTestFileName(test_file_base)) {} |
| |
| protected: |
| void SetUp() override { |
| fs_.Delete(test_file_name_); |
| shipping_manager_ = |
| std::make_unique<LocalShippingManager>(&observation_store_, test_file_name_, &fs_); |
| shipping_manager_->Start(); |
| } |
| |
| ObservationStore::StoreStatus AddObservation(size_t num_bytes) { |
| auto retval = observation_store_.StoreObservation(CreateObservationMessage(num_bytes), |
| CreateObservationMetadata()); |
| shipping_manager_->NotifyObservationsAdded(); |
| return retval; |
| } |
| |
| void CheckObservationCount(int expected_observation_count) { |
| auto ifs = fs_.NewProtoInputStream(test_file_name_); |
| ASSERT_EQ(util::StatusCode::OK, ifs.status().error_code()); |
| int observation_count = 0; |
| Envelope envelope; |
| bool clean_eof = false; |
| while (google::protobuf::util::ParseDelimitedFromZeroCopyStream( |
| &envelope, ifs.ValueOrDie().get(), &clean_eof)) { |
| for (const auto& batch : envelope.batch()) { |
| observation_count += batch.encrypted_observation_size(); |
| } |
| envelope.Clear(); |
| } |
| EXPECT_EQ(true, clean_eof); |
| EXPECT_EQ(expected_observation_count, observation_count); |
| } |
| |
| private: |
| MemoryObservationStore observation_store_; |
| |
| protected: |
| std::string test_file_name_; |
| std::unique_ptr<ShippingManager> shipping_manager_; |
| util::PosixFileSystem fs_; |
| }; |
| |
| // Add multiple Observations and allow them to be saved. |
| TEST_F(LocalShippingManagerTest, ScheduledSave) { |
| // Add two Observations but do not invoke RequestSendSoon() and do |
| // not add enough Observations to exceed envelope_send_threshold_size_. |
| for (int i = 0; i < 2; i++) { |
| EXPECT_EQ(ObservationStore::kOk, AddObservation(40)); |
| } |
| // Wait for the save to disk. |
| shipping_manager_->WaitUntilIdle(kMaxSeconds); |
| |
| CheckObservationCount(2); |
| EXPECT_EQ(util::OK, shipping_manager_->last_send_status().error_code()); |
| } |
| |
| } // namespace cobalt::uploader |