[LocalAggregateStorage] Always write before shutdown

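- Makes sure the writeback thread only exits _after_ it has written any
  modified data to disk (the shut_down check in Run() now happens after
  the write step instead of before the sleep).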
- Adds condition variable 'shutdown_complete_notifier' so ShutDown can
  return early (after kMaxShutdownDelay, currently 4 seconds) if the
  shutdown takes too long.
- ~DelayedLocalAggregateStorage() still waits for the thread to join for
  a clean shutdown.

Ran ShutDownIsFast 300 times on a local Fuchsia device with no errors.
Ran ShutDownIsFast 300 times on a standalone Linux build with no errors.

Tested-With: fx test cobalt_core_tests -- --gtest_filter="DelayedLocalAggregateStorageTest.ShutDownIsFast" --gtest_repeat=300
Tested-With: ./cobaltb.py build && out/tests/cpp/cobalt_core_tests --gtest_filter=DelayedLocalAggregateStorageTest.ShutDownIsFast --gtest_repeat=300
Bug: 88444
Bug: 86668
Change-Id: Ic0c898a6ee9b2cea32f97dbad841fd47d2d585ce
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/604944
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
index 90281ec..c589ffb 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
@@ -21,6 +21,7 @@
 
 constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency =
     std::chrono::minutes(5);
+constexpr std::chrono::milliseconds kMaxShutdownDelay = std::chrono::seconds(4);
 
 DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
     std::string base_directory, util::FileSystem *fs,
@@ -36,19 +37,26 @@
   writeback_thread_ = std::move(t);
 }
 
-DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() { ShutDown(); }
+DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
+  ShutDown();
+  if (writeback_thread_.joinable()) {
+    VLOG(4) << "~DelayedLocalAggregateStorage(): Waiting for writeback thread to exit...";
+    writeback_thread_.join();
+  }
+}
 
 void DelayedLocalAggregateStorage::ShutDown() {
+  VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
   {
     auto locked = state_.lock();
     locked->shut_down = true;
     locked->shutdown_notifier.notify_all();
     locked->data_save_notifier.notify_all();
-  }
-  VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
-  if (writeback_thread_.joinable()) {
-    VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): Waiting for writeback thread to exit...";
-    writeback_thread_.join();
+    if (!locked->shutdown_complete_notifier.wait_for(
+            locked, kMaxShutdownDelay, [&locked] { return locked->shut_down_complete; })) {
+      LOG(ERROR) << "DelayedLocalAggregateStorage::ShutDown(): Writeback thread did not shut down "
+                    "in time. Data loss likely.";
+    }
   }
 }
 
@@ -117,10 +125,6 @@
 void DelayedLocalAggregateStorage::Run() {
   while (true) {
     auto locked_state = state_.lock();
-    if (locked_state->shut_down) {
-      return;
-    }
-
     VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for "
             << std::chrono::duration_cast<std::chrono::seconds>(writeback_frequency_).count()
             << " seconds.";
@@ -130,7 +134,7 @@
                                              [&locked_state] { return locked_state->shut_down; });
 
     VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
-            << locked_state->shut_down;
+            << locked_state->shut_down << ", data_modified=" << locked_state->data_modified;
 
     if (locked_state->data_modified) {
       locked_state->data_save_start_notifier.notify_all();
@@ -165,11 +169,17 @@
       VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
       util::Status status = proto_store_.Write(aggregates_);
       if (!status.ok()) {
-        LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status.error_message();
+        LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status;
       }
       locked_state->data_save_notifier.notify_all();
       locked_state->data_modified = false;
     }
+
+    if (locked_state->shut_down) {
+      locked_state->shut_down_complete = true;
+      locked_state->shutdown_complete_notifier.notify_all();
+      return;
+    }
   }
 }
 
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
index 817c8ff..ce92693 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
@@ -116,6 +116,9 @@
     bool shut_down = false;
     // Used to wake up threads when a shutdown has been requested.
     std::condition_variable_any shutdown_notifier;
+    // Used to notify the ShutDown method that the shutdown has completed.
+    bool shut_down_complete = false;
+    std::condition_variable_any shutdown_complete_notifier;
 
     // Set to true whenever SaveMetricAggregate is called.
     bool data_modified = false;
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
index f22cbc2..01ddadd 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
@@ -104,7 +104,7 @@
   storage_->WaitUntilSave(kMaxWait);
 
   auto registry = GetRegistry();
-  google::protobuf::RepeatedPtrField<MetricDefinition>* metrics =
+  google::protobuf::RepeatedPtrField<MetricDefinition> *metrics =
       registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics();
   metrics->erase(metrics->begin() + kOccurrenceMetricMetricIndex);
   InitStorage(std::move(registry));
@@ -146,8 +146,6 @@
   ASSERT_FALSE(storage_->HasMetricAggregate(123, 100, 1));
 }
 
-// TODO(fxbug.dev/88444): Enable this test in fuchsia once failures have been root caused.
-#ifndef __Fuchsia__
 TEST_F(DelayedLocalAggregateStorageTest, ShutDownIsFast) {
   // Construct a storage with an extremely long writeback frequency.
   InitStorage(GetRegistry(), std::chrono::hours(99999));
@@ -171,7 +169,6 @@
     ASSERT_EQ(agg.aggregate()->version(), 100);
   }
 }
-#endif
 
 class DelayedLocalAggregateStorageTestDeadlock : public util::testing::TestWithFiles {};
 TEST_F(DelayedLocalAggregateStorageTestDeadlock, NoDeadlock) {