// Copyright 2018 The Fuchsia Authors. All rights reserved. Use of this source
// code is governed by a BSD-style license that can be found in the LICENSE
// file.
#include "src/observation_store/file_observation_store.h"
#include <ctime>
#include <iomanip>
#include <regex>
#include <utility>
#include "src/logger/internal_metrics.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/tracing.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"
namespace cobalt::observation_store {
using lib::statusor::StatusOr;
using util::FileSystem;
constexpr char kActiveFileName[] = "in_progress.data";
// The first part of the filename is 13 digits representing the milliseconds
// since the unix epoch. The millisecond timestamp became 13 digits in
// September 2001, and won't be 14 digits until 2286.
//
// The second part of the filename is 10 digits, which is a random number in the
// range 1000000000-9999999999.
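//
// For example: "1536192000000-4821937465.data".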
const std::regex kFinalizedFileRegex(R"(\d{13}-\d{10}\.data)");
constexpr uint32_t kTimestampWidth = 13;
constexpr uint64_t kMinRandomNumber = 1000000000;
constexpr uint64_t kMaxRandomNumber = 9999999999;
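// Constructs the store rooted at root_directory_, creating that directory if
// it does not already exist. The sizes of any finalized files left over from a
// previous run are counted toward finalized_bytes, and a leftover in-progress
// file (if any) is finalized so that its observations can still be taken.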
FileObservationStore::FileObservationStore(size_t max_bytes_per_observation,
size_t max_bytes_per_envelope, size_t max_bytes_total,
FileSystem &fs, DiagnosticsInterface *diagnostics,
std::string root_directory, std::string name,
logger::InternalMetrics *internal_metrics)
: ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total),
fs_(fs),
diagnostics_(diagnostics),
root_directory_(std::move(root_directory)),
active_file_name_(FullPath(kActiveFileName)),
name_(std::move(name)) {
ResetInternalMetrics(internal_metrics);
// Check if root_directory_ already exists.
if (!fs_.ListFiles(root_directory_).ok()) {
// If it doesn't exist, create it here.
// TODO(fxbug.dev/3752): If MakeDirectory doesn't work, we should fail over to
// MemoryObservationStore.
CHECK(fs_.MakeDirectory(root_directory_));
}
{
auto fields = protected_fields_.lock();
fields->finalized_bytes = 0;
for (const auto &file : ListFinalizedFiles()) {
fields->finalized_bytes += fs_.FileSize(FullPath(file)).ConsumeValueOr(0);
}
// If there exists an active file, it likely means that the process
// terminated unexpectedly last time. In this case, the file should be
// finalized in order to be Taken from the store.
//
// For simplicity's sake, we attempt to finalize the active_file_name_ while
// ignoring the result. If the operation succeeds, then we rescued the
// active file. Otherwise, there probably was no active file in the first
// place.
FinalizeActiveFile(&fields);
}
}
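// Appends the observation to the in-progress file as a length-delimited
// FileObservationStoreRecord. Observations larger than
// max_bytes_per_observation_, or that would push the store past
// max_bytes_total_, are rejected. A metadata record is written first whenever
// the metadata differs from the last metadata written to this file, and the
// in-progress file is finalized once it reaches max_bytes_per_envelope_ bytes.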
Status FileObservationStore::StoreObservation(std::unique_ptr<StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (IsDisabled()) {
return Status::OkStatus();
}
TRACE_DURATION("cobalt_core", "FileObservationStore::StoreObservation");
auto fields = protected_fields_.lock();
size_t obs_size = observation->ByteSizeLong();
internal_metrics_->BytesStored(
logger::PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, obs_size,
lib::ProjectIdentifier(lib::CustomerIdentifier(metadata->customer_id()),
metadata->project_id()));
google::protobuf::io::ZeroCopyOutputStream *active_file = GetActiveFile(&fields);
if (active_file == nullptr) {
std::stringstream err;
err << "Failed to open " << kActiveFileName;
return Status(StatusCode::UNAVAILABLE, err.str());
}
auto metadata_str = metadata->SerializeAsString();
if (obs_size > max_bytes_per_observation_) {
std::stringstream err;
err << "An observation that was too big was passed in to "
"FileObservationStore::StoreObservation(): "
<< obs_size;
LOG(WARNING) << err.str();
return Status(StatusCode::FAILED_PRECONDITION, err.str());
}
VLOG(6) << name_ << ": StoreObservation() metric=(" << metadata->customer_id() << ","
<< metadata->project_id() << "," << metadata->metric_id() << "), size=" << obs_size
<< ".";
size_t estimated_new_byte_count = fields->finalized_bytes + active_file->ByteCount() + obs_size;
if (estimated_new_byte_count > max_bytes_total_) {
std::stringstream err;
err << name_
<< ": The observation store is full. estimated_new_byte_count=" << estimated_new_byte_count
<< " > " << max_bytes_total_ << ".";
VLOG(4) << err.str();
return Status(StatusCode::RESOURCE_EXHAUSTED, err.str());
}
if (!fields->metadata_written || metadata_str != fields->last_written_metadata) {
VLOG(5) << name_ << ": Writing observation metadata.";
FileObservationStoreRecord stored_metadata;
stored_metadata.mutable_meta_data()->Swap(metadata.get());
if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(stored_metadata, active_file)) {
std::stringstream err;
err << name_ << ": Unable to write metadata to `" << active_file_name_ << "`";
LOG(WARNING) << err.str();
return Status(StatusCode::DATA_LOSS, err.str());
}
// Swap needed to report the customer id and project id to the internal metrics.
metadata->Swap(stored_metadata.mutable_meta_data());
fields->metadata_written = true;
fields->last_written_metadata = metadata_str;
}
FileObservationStoreRecord stored_message;
stored_message.set_contribution_id(observation->contribution_id());
if (observation->has_encrypted()) {
stored_message.mutable_encrypted_observation()->Swap(observation->mutable_encrypted());
} else if (observation->has_unencrypted()) {
stored_message.mutable_unencrypted_observation()->Swap(observation->mutable_unencrypted());
} else {
std::stringstream err;
    err << "Received StoredObservation of unexpected type.";
LOG(ERROR) << err.str();
return Status(StatusCode::DATA_LOSS, err.str());
}
if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(stored_message, active_file)) {
std::stringstream err;
err << "Unable to write encrypted_observation to `" << active_file_name_ << "`";
LOG(WARNING) << err.str();
return Status(StatusCode::DATA_LOSS, err.str());
}
if (active_file->ByteCount() >= static_cast<int64_t>(max_bytes_per_envelope_)) {
VLOG(4) << name_ << ": In-progress file contains " << active_file->ByteCount()
<< " bytes (>= " << max_bytes_per_envelope_ << "). Finalizing it.";
if (!FinalizeActiveFile(&fields)) {
std::stringstream err;
err << "Unable to finalize `" << active_file_name_;
LOG(WARNING) << err.str();
return Status(StatusCode::DATA_LOSS, err.str());
}
}
if (diagnostics_ != nullptr) {
diagnostics_->ObservationStoreUpdated(num_obs_per_report_,
static_cast<int64_t>(estimated_new_byte_count),
static_cast<int64_t>(max_bytes_total_));
}
num_obs_per_report_[lib::ReportIdentifier(*metadata)]++;
internal_metrics_->BytesStored(
logger::PerProjectBytesStoredMigratedMetricDimensionStatus::Succeeded, obs_size,
lib::ProjectIdentifier(lib::CustomerIdentifier(metadata->customer_id()),
metadata->project_id()));
// Cannot cause a loop, since TrackDiskUsage is handled asynchronously.
internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
static_cast<int64_t>(estimated_new_byte_count),
static_cast<int64_t>(max_bytes_total_));
return Status::OkStatus();
}
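// Closes the in-progress file and renames it to a timestamped finalized name,
// adding its size to finalized_bytes. Returns false if there was no active
// file to finalize, if the file was empty (in which case it is deleted), or if
// the rename failed.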
bool FileObservationStore::FinalizeActiveFile(
util::ProtectedFields<Fields>::LockedFieldsPtr *fields) {
VLOG(6) << name_ << ": FinalizeActiveFile()";
auto &f = *fields;
// Close the current file (if it is open).
f->active_file = nullptr;
f->metadata_written = false;
auto filesize = fs_.FileSize(active_file_name_);
if (filesize.ok()) {
if (filesize.value() == 0) {
// File exists, but is empty. Let's just delete it instead of renaming.
fs_.Delete(active_file_name_);
return false;
}
} else {
    // If !filesize.ok(), the file likely doesn't exist.
return false;
}
auto new_name = FullPath(filename_generator_.GenerateFilename());
if (!fs_.Rename(active_file_name_, new_name)) {
return false;
}
f->finalized_bytes += fs_.FileSize(new_name).ConsumeValueOr(0);
return true;
}
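// The default FilenameGenerator timestamps filenames with the current number
// of milliseconds since the unix epoch; the second constructor allows the time
// source to be replaced.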
FileObservationStore::FilenameGenerator::FilenameGenerator()
: FilenameGenerator([]() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}) {}
FileObservationStore::FilenameGenerator::FilenameGenerator(std::function<int64_t()> now)
: now_(std::move(now)), random_int_(kMinRandomNumber, kMaxRandomNumber) {}
std::string FileObservationStore::FilenameGenerator::GenerateFilename() const {
std::stringstream date;
std::stringstream fname;
date << std::setfill('0') << std::setw(kTimestampWidth) << now_();
fname << date.str().substr(0, kTimestampWidth) << "-" << random_int_(random_dev_) << ".data";
return fname.str();
}
std::string FileObservationStore::FullPath(const std::string &filename) const {
return root_directory_ + "/" + filename;
}
std::string FileObservationStore::FileEnvelopeHolder::FullPath(const std::string &filename) const {
return root_directory_ + "/" + filename;
}
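// Returns the output stream for the in-progress file, lazily opening it on
// first use. Returns nullptr if the file could not be opened (e.g. the disk is
// full).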
google::protobuf::io::ZeroCopyOutputStream *FileObservationStore::GetActiveFile(
util::ProtectedFields<Fields>::LockedFieldsPtr *fields) {
auto &f = *fields;
if (f->active_file == nullptr) {
auto stream_or = fs_.NewProtoOutputStream(active_file_name_);
if (!stream_or.ok()) {
LOG_FIRST_N(ERROR, 10) << "Failed to open file. (Perhaps the disk is full): "
<< stream_or.status().error_message();
return nullptr;
}
f->active_file = std::move(stream_or).value();
}
return f->active_file.get();
}
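// Returns the names of all files in root_directory_ that match the finalized
// file naming pattern.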
std::vector<std::string> FileObservationStore::ListFinalizedFiles() const {
auto files = fs_.ListFiles(root_directory_).ConsumeValueOr({});
std::vector<std::string> retval;
for (const auto &file : files) {
if (std::regex_match(file, kFinalizedFileRegex)) {
retval.push_back(file);
}
}
return retval;
}
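// Returns the name of the oldest finalized file that has not already been
// taken by an EnvelopeHolder, or NOT_FOUND if there is no such file.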
StatusOr<std::string> FileObservationStore::GetOldestFinalizedFile(
util::ProtectedFields<Fields>::LockedFieldsPtr *fields) const {
auto &f = *fields;
std::string found_file_name;
for (const auto &file : ListFinalizedFiles()) {
if (f->files_taken.find(file) == f->files_taken.end()) {
if (found_file_name.empty()) {
found_file_name = file;
} else {
// We compare the file names to try to find the oldest one. This works
// because file names are prefixed with the timestamp when they were
// finalized and that timestamp is always 13 digits long.
//
        // A lexicographic ordering of fixed-length number strings is identical
        // to ordering their numerical values.
        if (file < found_file_name) {
          found_file_name = file;
        }
}
}
}
if (found_file_name.empty()) {
return Status(StatusCode::NOT_FOUND, "No finalized file");
}
return found_file_name;
}
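// Hands out the oldest untaken finalized file wrapped in a FileEnvelopeHolder.
// If no finalized file is available, the non-empty in-progress file (if any)
// is finalized first; returns nullptr if there is still nothing to take.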
std::unique_ptr<ObservationStore::EnvelopeHolder> FileObservationStore::TakeNextEnvelopeHolder() {
auto fields = protected_fields_.lock();
auto oldest_file_name_or = GetOldestFinalizedFile(&fields);
if (!oldest_file_name_or.ok()) {
if (!fields->active_file || fields->active_file->ByteCount() == 0) {
// Active file isn't open or is empty. Return nullptr.
return nullptr;
}
if (!FinalizeActiveFile(&fields)) {
// Finalizing the active file failed, no envelope to return.
return nullptr;
}
oldest_file_name_or = GetOldestFinalizedFile(&fields);
if (!oldest_file_name_or.ok()) {
return nullptr;
}
}
auto oldest_file_name = oldest_file_name_or.value();
fields->files_taken.insert(oldest_file_name);
return std::make_unique<FileEnvelopeHolder>(fs_, *this, root_directory_, oldest_file_name);
}
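// Marks the holder's files as no longer taken so they can be handed out again,
// then clears the holder so that destroying it does not delete the files from
// disk.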
void FileObservationStore::ReturnEnvelopeHolder(
std::unique_ptr<ObservationStore::EnvelopeHolder> envelope) {
std::unique_ptr<FileObservationStore::FileEnvelopeHolder> env(
static_cast<FileObservationStore::FileEnvelopeHolder *>(envelope.release()));
auto fields = protected_fields_.lock();
for (const auto &file_name : env->file_names()) {
fields->files_taken.erase(file_name);
}
env->clear();
}
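// Returns the number of bytes currently stored: the finalized files plus
// whatever has been written to the in-progress file.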
size_t FileObservationStore::Size() const {
auto fields = protected_fields_.const_lock();
auto bytes = fields->finalized_bytes;
VLOG(4) << name_ << "::Size(): finalized_bytes=" << bytes;
if (fields->active_file) {
bytes += fields->active_file->ByteCount();
} else {
VLOG(4) << name_ << "::Size(): there is no active file.";
}
VLOG(4) << name_ << "::Size(): total_bytes=" << bytes;
return bytes;
}
bool FileObservationStore::Empty() const { return Size() == 0; }
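// Destroying a FileEnvelopeHolder deletes its files from disk and subtracts
// their sizes from the store's finalized_bytes count.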
FileObservationStore::FileEnvelopeHolder::~FileEnvelopeHolder() {
auto fields = store_.protected_fields_.lock();
for (const auto &file_name : file_names_) {
fields->finalized_bytes -= fs_.FileSize(FullPath(file_name)).ConsumeValueOr(0);
fs_.Delete(FullPath(file_name));
}
}
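// Takes ownership of the other holder's files and resets this holder's cached
// envelope and size so they will be rebuilt on the next read.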
void FileObservationStore::FileEnvelopeHolder::MergeWith(
std::unique_ptr<EnvelopeHolder> container) {
std::unique_ptr<FileEnvelopeHolder> file_container(
static_cast<FileEnvelopeHolder *>(container.release()));
file_names_.insert(file_container->file_names_.begin(), file_container->file_names_.end());
file_container->file_names_.clear();
envelope_.Clear();
cached_file_size_ = 0;
envelope_read_ = false;
}
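// Lazily assembles the envelope by reading each file's length-delimited
// records. Metadata records select (or create) the ObservationBatch, keyed by
// the serialized metadata, and observation records are appended to the current
// batch, encrypting any unencrypted observations with encrypter on the way.
// Reading stops at the first corrupted record and returns whatever has been
// assembled so far.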
const Envelope &FileObservationStore::FileEnvelopeHolder::GetEnvelope(
util::EncryptedMessageMaker *encrypter) {
if (envelope_read_) {
return envelope_;
}
std::string serialized_metadata;
std::unordered_map<std::string, ObservationBatch *> batch_map;
  // Set once the first metadata record in a file has been read.
  ObservationBatch *current_batch = nullptr;
FileObservationStoreRecord stored;
for (const auto &file_name : file_names_) {
auto iis_or = fs_.NewProtoInputStream(FullPath(file_name));
if (!iis_or.ok()) {
LOG(ERROR) << "WARNING: Trying to open `" << FullPath(file_name)
<< "` failed with error: " << iis_or.status().error_message();
continue;
}
auto iis = std::move(iis_or).value();
bool clean_eof;
while (
google::protobuf::util::ParseDelimitedFromZeroCopyStream(&stored, iis.get(), &clean_eof)) {
if (stored.has_meta_data()) {
std::unique_ptr<ObservationMetadata> current_metadata(stored.release_meta_data());
current_metadata->SerializeToString(&serialized_metadata);
auto iter = batch_map.find(serialized_metadata);
if (iter != batch_map.end()) {
current_batch = iter->second;
} else {
current_batch = envelope_.add_batch();
current_batch->set_allocated_meta_data(current_metadata.release());
batch_map[serialized_metadata] = current_batch;
}
      } else if (stored.has_encrypted_observation() || stored.has_unencrypted_observation()) {
        if (current_batch == nullptr) {
          // An observation record appeared before any metadata record, so
          // there is no batch to add it to. Treat the file as corrupted.
          clean_eof = false;
          break;
        }
        if (stored.has_encrypted_observation()) {
          stored.mutable_encrypted_observation()->set_contribution_id(stored.contribution_id());
          current_batch->add_encrypted_observation()->Swap(stored.mutable_encrypted_observation());
        } else {
          auto *encrypted = current_batch->add_encrypted_observation();
          if (!encrypter->Encrypt(stored.unencrypted_observation(), encrypted)) {
            LOG_FIRST_N(ERROR, 10) << "ERROR: Unable to encrypt observation on read.";
          }
          encrypted->set_contribution_id(stored.contribution_id());
        }
} else {
clean_eof = false;
break;
}
}
if (!clean_eof) {
VLOG(1) << "WARNING: Trying to read from `" << file_name
<< "` encountered a corrupted message. Returning the envelope that "
"has been read so far.";
break;
}
}
envelope_read_ = true;
return envelope_;
}
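// Returns the combined on-disk size of the holder's files, caching the result
// after the first computation.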
size_t FileObservationStore::FileEnvelopeHolder::Size() {
if (cached_file_size_ != 0) {
return cached_file_size_;
}
cached_file_size_ = 0;
for (const auto &file_name : file_names_) {
cached_file_size_ += fs_.FileSize(FullPath(file_name)).ConsumeValueOr(0);
}
return cached_file_size_;
}
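// Clears the store's in-memory state and deletes every file in
// root_directory_.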
void FileObservationStore::DeleteData() {
LOG(INFO) << "FileObservationStore: Deleting stored data";
auto fields = protected_fields_.lock();
FinalizeActiveFile(&fields);
fields->metadata_written = false;
fields->last_written_metadata = "";
fields->active_file = nullptr;
fields->files_taken = {};
fields->finalized_bytes = 0;
auto files = fs_.ListFiles(root_directory_).ConsumeValueOr({});
for (const auto &file : files) {
fs_.Delete(FullPath(file));
}
}
} // namespace cobalt::observation_store