[LocalAggregateStorage] Always write before shutdown
- Ensures that shutdown completes only _after_ the writeback thread has written any pending (modified) data to disk.
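- Adds the condition variable `shutdown_complete_notifier` so that ShutDown()
waits for the writeback thread to finish, but returns after at most
kMaxShutdownDelay (4 seconds), logging an error, if the shutdown takes too long.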
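- ~DelayedLocalAggregateStorage() still joins the writeback thread to
guarantee a clean shutdown.
- Re-enables the DelayedLocalAggregateStorageTest.ShutDownIsFast test on
Fuchsia (previously disabled for fxbug.dev/88444).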
Ran the test 300 times on a local Fuchsia device with no errors.
Ran the test 300 times on a standalone Linux build with no errors.
Tested-With: fx test cobalt_core_tests -- --gtest_filter="DelayedLocalAggregateStorageTest.ShutDownIsFast" --gtest_repeat=300
Tested-With: ./cobaltb.py build && out/tests/cpp/cobalt_core_tests --gtest_filter=DelayedLocalAggregateStorageTest.ShutDownIsFast --gtest_repeat=300
Bug: 88444
Bug: 86668
Change-Id: Ic0c898a6ee9b2cea32f97dbad841fd47d2d585ce
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/604944
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
index 90281ec..c589ffb 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
@@ -21,6 +21,7 @@
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency =
std::chrono::minutes(5);
+constexpr std::chrono::milliseconds kMaxShutdownDelay = std::chrono::seconds(4);
DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
std::string base_directory, util::FileSystem *fs,
@@ -36,19 +37,26 @@
writeback_thread_ = std::move(t);
}
-DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() { ShutDown(); }
+DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
+ ShutDown();
+ if (writeback_thread_.joinable()) {
+ VLOG(4) << "~DelayedLocalAggregateStorage(): Waiting for writeback thread to exit...";
+ writeback_thread_.join();
+ }
+}
void DelayedLocalAggregateStorage::ShutDown() {
+ VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
{
auto locked = state_.lock();
locked->shut_down = true;
locked->shutdown_notifier.notify_all();
locked->data_save_notifier.notify_all();
- }
- VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
- if (writeback_thread_.joinable()) {
- VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): Waiting for writeback thread to exit...";
- writeback_thread_.join();
+ if (!locked->shutdown_complete_notifier.wait_for(
+ locked, kMaxShutdownDelay, [&locked] { return locked->shut_down_complete; })) {
+ LOG(ERROR) << "DelayedLocalAggregateStorage::ShutDown(): Writeback thread did not shut down "
+ "in time. Data loss likely.";
+ }
}
}
@@ -117,10 +125,6 @@
void DelayedLocalAggregateStorage::Run() {
while (true) {
auto locked_state = state_.lock();
- if (locked_state->shut_down) {
- return;
- }
-
VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for "
<< std::chrono::duration_cast<std::chrono::seconds>(writeback_frequency_).count()
<< " seconds.";
@@ -130,7 +134,7 @@
[&locked_state] { return locked_state->shut_down; });
VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
- << locked_state->shut_down;
+ << locked_state->shut_down << ", data_modified=" << locked_state->data_modified;
if (locked_state->data_modified) {
locked_state->data_save_start_notifier.notify_all();
@@ -165,11 +169,17 @@
VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
util::Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
- LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status.error_message();
+ LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status;
}
locked_state->data_save_notifier.notify_all();
locked_state->data_modified = false;
}
+
+ if (locked_state->shut_down) {
+ locked_state->shut_down_complete = true;
+ locked_state->shutdown_complete_notifier.notify_all();
+ return;
+ }
}
}
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
index 817c8ff..ce92693 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
@@ -116,6 +116,9 @@
bool shut_down = false;
// Used to wake up threads when a shutdown has been requested.
std::condition_variable_any shutdown_notifier;
+ // Used to notify the ShutDown method that the shutdown has completed.
+ bool shut_down_complete = false;
+ std::condition_variable_any shutdown_complete_notifier;
// Set to true whenever SaveMetricAggregate is called.
bool data_modified = false;
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
index f22cbc2..01ddadd 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage_test.cc
@@ -104,7 +104,7 @@
storage_->WaitUntilSave(kMaxWait);
auto registry = GetRegistry();
- google::protobuf::RepeatedPtrField<MetricDefinition>* metrics =
+ google::protobuf::RepeatedPtrField<MetricDefinition> *metrics =
registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics();
metrics->erase(metrics->begin() + kOccurrenceMetricMetricIndex);
InitStorage(std::move(registry));
@@ -146,8 +146,6 @@
ASSERT_FALSE(storage_->HasMetricAggregate(123, 100, 1));
}
-// TODO(fxbug.dev/88444): Enable this test in fuchsia once failures have been root caused.
-#ifndef __Fuchsia__
TEST_F(DelayedLocalAggregateStorageTest, ShutDownIsFast) {
// Construct a storage with an extremely long writeback frequency.
InitStorage(GetRegistry(), std::chrono::hours(99999));
@@ -171,7 +169,6 @@
ASSERT_EQ(agg.aggregate()->version(), 100);
}
}
-#endif
class DelayedLocalAggregateStorageTestDeadlock : public util::testing::TestWithFiles {};
TEST_F(DelayedLocalAggregateStorageTestDeadlock, NoDeadlock) {