blob: be14c0e522d81a6cf23f2a329d0d935e9f815311 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "analyzer/store/observation_store.h"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "./observation.pb.h"
#include "analyzer/store/data_store.h"
#include "analyzer/store/observation_store_internal.h"
#include "glog/logging.h"
#include "util/crypto_util/hash.h"
#include "util/crypto_util/random.h"
namespace cobalt {
namespace analyzer {
namespace store {
using crypto::byte;
using crypto::hash::DIGEST_SIZE;
using crypto::hash::Hash;
using internal::DayIndexFromRowKey;
using internal::GenerateNewRowKey;
using internal::ParseEncryptedObservationPart;
using internal::ParseEncryptedSystemProfile;
using internal::RangeLimitKey;
using internal::RangeStartKey;
namespace {
// The name of the column in which we store the serialized SystemProfile
// of each Observation. This column cannot be confused with a metric part
// column because metric parts names are not allowed to begin with an
// underscore.
static const char kSystemProfileColumnName[] = "_CobaltSystemProfile";
} // namespace
// The internal namespace contains private implementation functions that need
// to be accessible to unit tests. The functions are declared in
// observation_store_internal.h.
namespace internal {
// Returns the row key that encapsulates the given data. In the current version
// of Cobalt we are using human-readable row keys for the sake of debugability.
// In the future we should replace this with a more compact representation.
// The row key is 75 bytes long and is an ASCII string of the form:
// <customer>:<project>:<metric>:<day>:<random>:<hash> where
// customer: The customer id as a positive 10 digit decimal string
// project: The project id as a positive 10 digit decimal string
// metric: The metric id as a positive 10 digit decimal string
// random: A random 64-bit number as a positive 20 digit decimal string
// hash: A 32-bit hash of the Observation as a positive 10 digit decimal string.
//
// The first four components of the row key
// <customer>:<project>:<metric>:<day>
// represent the metric-day. This unit is important because it is the unit on
// which a report runs. The QueryObservations() method operates on whole
// metric days.
//
// The remainder of the row key
// <random>:<hash>
// serves to form, along with the metric-day, a unique identifier for the
// Observation. The random 64-bit number is generated on the client. This allows
// the add-observation operation to be idempotent: If the client sends us
// the identical Observation twice we will only store it once. The hash is
// computed on the server. It reduces the probability of collision while
// maintaining idempotency. That is, using the <hash> maintains the property
// that if the client sends us the identical Observation twice we will only
// store it once, and it reduces the probability that we will receive what is
// supposed to be two different Observations but store only one and discard
// the other.
//
// In an earlier version of this code the last two components of the row key
// were different:
// <time>:<random> where
// time: Arrival time in millis as a positive 20 digit decimal string
// random: A random 32-bit number as a positive 10 digit decimal string.
// Both the arrival time and the random were generated on the server. This older
// scheme sufficed for obtaining unique identifiers but did not give us the
// idempotency of the add-observation operation.
//
// Our Observation store will contain a mixture of both the old and the new
// row keys. The new row key format was chosen in a way to make this harmless.
// Because the last two components of the row key are never interpreted (in fact
// they are never even parsed) they are only used for the purpose of having
// unique row keys, there is no harm in replacing the old format with the new
// one.
std::string RowKey(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t day_index, uint64_t random,
uint32_t hash) {
// We write five ten-digit numbers, plus one twenty-digit number plus five
// colons. The string has size 76 to accommodate a trailing null character.
std::string out(76, 0);
// TODO(rudominer): Replace human-readable row key with smaller more efficient
// representation.
std::snprintf(&out[0], out.size(), "%.10u:%.10u:%.10u:%.10u:%.20lu:%.10u",
customer_id, project_id, metric_id, day_index, random, hash);
// Discard the trailing null character.
out.resize(75);
return out;
}
// Returns a 32-bit hash of (|observation|, |metadata|) appropriate for use as
// the <hash> component of a row key. See comments on RowKey() above.
uint32_t HashObservation(const Observation& observation,
const ObservationMetadata metadata) {
std::string serialized_observation;
observation.SerializeToString(&serialized_observation);
std::string serialized_metadta;
metadata.SerializeToString(&serialized_metadta);
serialized_observation += serialized_metadta;
byte hash_bytes[DIGEST_SIZE];
Hash(reinterpret_cast<const byte*>(serialized_observation.data()),
serialized_observation.size(), hash_bytes);
uint32_t return_value = 0;
std::memcpy(&return_value, hash_bytes,
std::min(DIGEST_SIZE, sizeof(return_value)));
return return_value;
}
// Returns the common prefix of all rows keys for the given metric.
std::string RowKeyPrefix(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id) {
// TODO(rudominer) This length corresponds to our current, temporary,
// human-readable row-keys built in RowKey() above. This function needs
// to change when the implementation changes. The prefix we return
// includes three ten-digit numbers plus three colons.
static const size_t kPrefixLength = 33;
std::string row_key = RowKey(customer_id, project_id, metric_id, 0, 0, 0);
row_key.resize(kPrefixLength);
return row_key;
}
// Returns the day_index encoded by |row_key|.
uint32_t DayIndexFromRowKey(const std::string& row_key) {
uint32_t day_index = 0;
// Parse the string produced by the RowKey() function above. We skip three
// ten-digit integers and three colons and then parse 10 digits.
CHECK_GT(row_key.size(), 33u);
std::sscanf(&row_key[33], "%10u", &day_index);
return day_index;
}
// Returns the lexicographically least row key for rows with the given
// data.
std::string RangeStartKey(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t day_index) {
return RowKey(customer_id, project_id, metric_id, day_index, 0, 0);
}
// Returns the lexicographically least row key that is greater than all row
// keys for rows with the given metadata, if day_index < UINT32_MAX. In the case
// that |day_index| = UINT32_MAX, returns the lexicographically least row key
// that is greater than all row keys for rows with the given values of
// the other parameters.
std::string RangeLimitKey(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t day_index) {
if (day_index < UINT32_MAX) {
return RowKey(customer_id, project_id, metric_id, day_index + 1, 0, 0);
} else {
// UINT32_MAX is already greater than all valid values of day_index.
return RowKey(customer_id, project_id, metric_id, UINT32_MAX, 0, 0);
}
}
// Generates a new row key for a row for the given Observation.
std::string GenerateNewRowKey(const ObservationMetadata& metadata,
const Observation& observation) {
uint64_t random;
if (observation.random_id().size() > 0 &&
observation.random_id().size() != sizeof(random)) {
LOG(WARNING) << "Unexpected size for random_id field: "
<< observation.random_id().size();
}
if (observation.random_id().size() >= sizeof(random)) {
std::memcpy(&random, observation.random_id().data(), sizeof(random));
VLOG(5) << "ObservationStore: received random_id from client: " << random;
} else {
// If the client did not send us a random we will generate one.
cobalt::crypto::Random rand;
random = rand.RandomUint64();
VLOG(5) << "ObservationStore: No random_id from client.";
}
return RowKey(metadata.customer_id(), metadata.project_id(),
metadata.metric_id(), metadata.day_index(), random,
HashObservation(observation, metadata));
}
bool ParseEncryptedObservationPart(ObservationPart* observation_part,
std::string bytes) {
// TODO(rudominer) Arrange for ObservationParts to be encrypted.
return observation_part->ParseFromString(bytes);
}
bool ParseEncryptedSystemProfile(SystemProfile* system_profile,
std::string bytes) {
// TODO(rudominer) Arrange for SystemProfiles to be encrypted.
return system_profile->ParseFromString(bytes);
}
// Copies SystemProfile fields from |src| to |dst| based on the list of
// SystemProfileFields.
void WriteFilteredSystemProfile(SystemProfile* dst,
std::unique_ptr<SystemProfile> src,
const SystemProfileFields& fields) {
if (src != nullptr && dst != nullptr) {
for (int field : fields) {
switch (field) {
case SystemProfileField::OS:
dst->set_os(src->os());
break;
case SystemProfileField::ARCH:
dst->set_arch(src->arch());
break;
case SystemProfileField::BOARD_NAME:
dst->set_allocated_board_name(src->release_board_name());
break;
case SystemProfileField::PRODUCT_NAME:
dst->set_allocated_product_name(src->release_product_name());
break;
case SystemProfileField::SYSTEM_VERSION:
dst->set_allocated_system_version(src->release_system_version());
break;
case SystemProfileField::CHANNEL:
dst->set_allocated_channel(src->release_channel());
break;
}
}
}
}
} // namespace internal
ObservationStore::ObservationStore(std::shared_ptr<DataStore> store)
: store_(store) {}
Status ObservationStore::AddObservation(const ObservationMetadata& metadata,
const Observation& observation) {
std::vector<Observation> observations;
observations.emplace_back(observation);
return AddObservationBatch(metadata, observations);
}
Status ObservationStore::AddObservationBatch(
const ObservationMetadata& metadata,
const std::vector<Observation>& observations) {
std::string serialized_system_profile;
if (metadata.has_system_profile()) {
metadata.system_profile().SerializeToString(&serialized_system_profile);
}
std::vector<DataStore::Row> rows;
for (const Observation& observation : observations) {
DataStore::Row row;
row.key = GenerateNewRowKey(metadata, observation);
for (const auto& pair : observation.parts()) {
std::string serialized_observation_part;
pair.second.SerializeToString(&serialized_observation_part);
// TODO(rudominer) Consider ways to avoid having so many copies of the
// part names.
row.column_values[pair.first] = std::move(serialized_observation_part);
}
if (!serialized_system_profile.empty()) {
row.column_values[kSystemProfileColumnName] = serialized_system_profile;
}
rows.emplace_back(std::move(row));
}
return store_->WriteRows(DataStore::kObservations, std::move(rows));
}
ObservationStore::QueryResponse ObservationStore::QueryObservations(
uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
uint32_t start_day_index, uint32_t end_day_index,
std::vector<std::string> parts,
const SystemProfileFields& system_profile_fields, size_t max_results,
std::string pagination_token) {
ObservationStore::QueryResponse query_response;
std::string start_row;
bool inclusive = true;
std::string range_start_key =
RangeStartKey(customer_id, project_id, metric_id, start_day_index);
if (!pagination_token.empty()) {
// The pagination token should be the row key of the last row returned the
// previous time this method was invoked.
if (pagination_token < range_start_key) {
query_response.status = kInvalidArguments;
return query_response;
}
start_row.swap(pagination_token);
inclusive = false;
} else {
start_row.swap(range_start_key);
}
std::string limit_row =
RangeLimitKey(customer_id, project_id, metric_id, end_day_index);
if (limit_row <= start_row) {
query_response.status = kInvalidArguments;
return query_response;
}
if (!parts.empty() && system_profile_fields.size() > 0) {
// If parts is empty this will indicate to the underlying DataStore that
// we wish to retrieve all columns and so we don't want to append the
// column name for SystemProfile because this would change the meaning
// of the query to indicate that we want to retrieve that column
// *only*, which is not what we want.
parts.emplace_back(kSystemProfileColumnName);
}
DataStore::ReadResponse read_response = store_->ReadRows(
DataStore::kObservations, std::move(start_row), inclusive,
std::move(limit_row), std::move(parts), max_results);
query_response.status = read_response.status;
if (query_response.status != kOK) {
return query_response;
}
for (const DataStore::Row& row : read_response.rows) {
// For each row of the read_response we add a query_result to the
// query_response.
query_response.results.emplace_back();
auto& query_result = query_response.results.back();
query_result.metadata.set_customer_id(customer_id);
query_result.metadata.set_project_id(project_id);
query_result.metadata.set_metric_id(metric_id);
query_result.metadata.set_day_index(DayIndexFromRowKey(row.key));
for (auto& pair : row.column_values) {
const std::string& column_name = pair.first;
const std::string& column_value = pair.second;
if (column_name == kSystemProfileColumnName) {
if (system_profile_fields.size() > 0) {
auto profile = std::make_unique<SystemProfile>();
if (!ParseEncryptedSystemProfile(profile.get(), column_value)) {
query_response.status = kOperationFailed;
return query_response;
}
internal::WriteFilteredSystemProfile(
query_result.metadata.mutable_system_profile(),
std::move(profile), system_profile_fields);
}
} else {
// The column name is a metric part name so we add an ObservationPart
// with this metric part name as the key to the map.
// The insert_result is a pair of the form < <key, value>, bool> where
// the bool indicates whether or not the key was newly added to the map.
auto insert_result = query_result.observation.mutable_parts()->insert(
google::protobuf::Map<std::string, ObservationPart>::value_type(
column_name, ObservationPart()));
// The column names should all be unique so each insert should return
// true.
DCHECK(insert_result.second);
// The ObservationPart is the value and so the second element of the
// first element of insert_result.
auto& observation_part = insert_result.first->second;
// We deserialize the ObservationPart from the column value.
if (!ParseEncryptedObservationPart(&observation_part, column_value)) {
query_response.status = kOperationFailed;
return query_response;
}
}
}
}
if (read_response.more_available) {
// If the underlying store says that there are more rows available, then
// we return the row key of the last row as the pagination_token.
if (read_response.rows.empty()) {
// There Read operation indicated that there were more rows available yet
// it did not return even one row. In this pathological case we return
// an error.
query_response.status = kOperationFailed;
return query_response;
}
size_t last_index = read_response.rows.size() - 1;
query_response.pagination_token.swap(read_response.rows[last_index].key);
}
return query_response;
}
Status ObservationStore::DeleteAllForMetric(uint32_t customer_id,
uint32_t project_id,
uint32_t metric_id) {
return store_->DeleteRowsWithPrefix(
DataStore::kObservations,
internal::RowKeyPrefix(customer_id, project_id, metric_id));
}
} // namespace store
} // namespace analyzer
} // namespace cobalt