blob: d09d053c6c7293992ec9ef632a20d3145f3be7ec [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "analyzer/report_master/histogram_analysis_engine.h"
#include <memory>
#include <string>
#include <vector>
#include "./observation.pb.h"
#include "algorithms/forculus/forculus_analyzer.h"
#include "algorithms/rappor/basic_rappor_analyzer.h"
#include "glog/logging.h"
namespace cobalt {
namespace analyzer {
using config::AnalyzerConfig;
using forculus::ForculusAnalyzer;
using rappor::BasicRapporAnalyzer;
using store::ObservationStore;
using store::ReportStore;
namespace {
// Checks that the type of encoding used by the observation_part is the
// one specified by the encoding_config.
bool CheckConsistentEncoding(const EncodingConfig& encoding_config,
const ObservationPart& observation_part,
const ReportId& report_id) {
bool consistent = true;
switch (observation_part.value_case()) {
case ObservationPart::kForculus:
consistent = encoding_config.has_forculus();
break;
case ObservationPart::kBasicRappor:
consistent = encoding_config.has_basic_rappor();
break;
case ObservationPart::kRappor:
consistent = encoding_config.has_rappor();
break;
case ObservationPart::VALUE_NOT_SET:
consistent = false;
break;
default:
LOG(FATAL) << "Unexpected case " << observation_part.value_case();
}
if (!consistent) {
LOG(ERROR) << "Bad ObservationPart! Value uses encoding "
<< observation_part.value_case() << " but "
<< encoding_config.config_case() << " expected."
<< " For report_id=" << ReportStore::ToString(report_id);
}
return consistent;
}
} // namespace
////////////////////////////////////////////////////////////////////////////
/// class ForculusAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a ForculsAnalyzer.
///////////////////////////////////////////////////////////////////////////
class ForculusAdapter : public DecoderAdapter {
public:
ForculusAdapter(const ReportId& report_id,
const cobalt::ForculusConfig& config)
: report_id_(report_id), analyzer_(new ForculusAnalyzer(config)) {}
bool ProcessObservationPart(uint32_t day_index,
const ObservationPart& obs) override {
return analyzer_->AddObservation(day_index, obs.forculus());
}
grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
auto result_map = analyzer_->TakeResults();
results->clear();
for (const auto& pair : result_map) {
ValuePart value_part;
if (!value_part.ParseFromString(pair.first)) {
LOG(ERROR) << "Bad value. Could not parse as ValuePart: " << pair.first
<< "report_id=" << ReportStore::ToString(report_id_);
continue;
}
results->emplace_back();
HistogramReportRow* row = results->back().mutable_histogram();
row->mutable_value()->Swap(&value_part);
row->set_count_estimate(pair.second->total_count);
// TODO(rudominer) We are not using some of the data that the
// ForculusAnalyzer can return to us such as observation_errors().
// Consider adding monitoring around this.
}
return grpc::Status::OK;
}
private:
ReportId report_id_;
std::unique_ptr<ForculusAnalyzer> analyzer_;
};
////////////////////////////////////////////////////////////////////////////
/// class RapporAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a
// StringRapporAnalyzer.
//
// NOTE: String RAPPOR analysis is not yet implemented in Cobalt.
///////////////////////////////////////////////////////////////////////////
class RapporAdapter : public DecoderAdapter {
public:
bool ProcessObservationPart(uint32_t day_index,
const ObservationPart& obs) override {
return false;
}
grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
return grpc::Status(grpc::UNIMPLEMENTED,
"String RAPPOR analysis in not yet implemented.");
}
};
////////////////////////////////////////////////////////////////////////////
/// class BasicRapporAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a BasicRapporAnalyzer.
///////////////////////////////////////////////////////////////////////////
class BasicRapporAdapter : public DecoderAdapter {
public:
BasicRapporAdapter(const ReportId& report_id,
const cobalt::BasicRapporConfig& config,
const IndexLabels* index_labels)
: report_id_(report_id),
analyzer_(new BasicRapporAnalyzer(config)),
index_labels(index_labels) {}
bool ProcessObservationPart(uint32_t day_index,
const ObservationPart& obs) override {
return analyzer_->AddObservation(obs.basic_rappor());
}
grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
auto category_results = analyzer_->Analyze();
for (auto& category_result : category_results) {
results->emplace_back();
HistogramReportRow* row = results->back().mutable_histogram();
row->mutable_value()->Swap(&category_result.category);
// If the value is of type INDEX, meaning that it represents an index
// into some enumerated set defined outside of the Cobalt configuration,
// then check whether we were given an index label for this index and
// if so attach the label to the report row.
if (index_labels != nullptr &&
row->value().data_case() == ValuePart::kIndexValue) {
auto iter = index_labels->labels().find(row->value().index_value());
if (iter != index_labels->labels().end()) {
row->set_label(iter->second);
}
}
row->set_count_estimate(category_result.count_estimate);
row->set_std_error(category_result.std_error);
}
// TODO(rudominer) We are not using some of the data that the
// BasicRapporAnalyzer can return to us such as observation_errors().
// Consider adding monitoring around this.
return grpc::Status::OK;
}
private:
ReportId report_id_;
std::unique_ptr<BasicRapporAnalyzer> analyzer_;
const IndexLabels* index_labels; // not owned.
};
////////////////////////////////////////////////////////////////////////////
/// HistogramAnalysisEngine methods.
///////////////////////////////////////////////////////////////////////////
HistogramAnalysisEngine::HistogramAnalysisEngine(
const ReportId& report_id, const ReportVariable* report_variable,
std::shared_ptr<AnalyzerConfig> analyzer_config)
: report_id_(report_id),
report_variable_(report_variable),
analyzer_config_(analyzer_config) {}
bool HistogramAnalysisEngine::ProcessObservationPart(
uint32_t day_index, const ObservationPart& obs) {
DecoderAdapter* decoder = GetDecoder(obs);
if (!decoder) {
return false;
}
return decoder->ProcessObservationPart(day_index, obs);
}
// Note that despite the comments in histogram_analysis_engine.h, version 0.1 of
// Cobalt does not yet support reports that are heterogeneous with respect
// to encoding. In this version the purpose of the HistogramAnalysisEngine is to
// ensure that in fact the set of observations is not heterogeneous.
grpc::Status HistogramAnalysisEngine::PerformAnalysis(
std::vector<ReportRow>* results) {
CHECK(results);
if (decoders_.size() > 1) {
std::ostringstream stream;
stream << "Analysis aborted because more than one encoding_config_id was "
"found among the observations: ";
bool first = true;
for (const auto& pair : decoders_) {
if (!first) {
stream << ", ";
}
stream << pair.first;
first = false;
}
stream << ". This version of Cobalt does not "
"support heterogeneous reports. report_id="
<< ReportStore::ToString(report_id_);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::UNIMPLEMENTED, message);
}
if (decoders_.size() == 0) {
std::ostringstream stream;
stream << "Analysis failed. No valid observations were added. report_id="
<< ReportStore::ToString(report_id_);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
return decoders_.begin()->second->PerformAnalysis(results);
}
DecoderAdapter* HistogramAnalysisEngine::GetDecoder(
const ObservationPart& observation_part) {
uint32_t encoding_config_id = observation_part.encoding_config_id();
const EncodingConfig* encoding_config = analyzer_config_->EncodingConfig(
report_id_.customer_id(), report_id_.project_id(), encoding_config_id);
if (!encoding_config) {
LOG(ERROR) << "Bad ObservationPart! Contains invalid encoding_config_id "
<< encoding_config_id
<< " for report_id=" << ReportStore::ToString(report_id_);
return nullptr;
}
if (!CheckConsistentEncoding(*encoding_config, observation_part,
report_id_)) {
return nullptr;
}
auto iter = decoders_.find(encoding_config_id);
if (iter != decoders_.end()) {
return iter->second.get();
}
// This is the first time we have seen the |encoding_config_id|. Make
// a new decoder/analyzer for it.
decoders_[encoding_config_id] = NewDecoder(encoding_config);
return decoders_[encoding_config_id].get();
}
std::unique_ptr<DecoderAdapter> HistogramAnalysisEngine::NewDecoder(
const EncodingConfig* encoding_config) {
switch (encoding_config->config_case()) {
case EncodingConfig::kForculus:
return std::unique_ptr<DecoderAdapter>(
new ForculusAdapter(report_id_, encoding_config->forculus()));
case EncodingConfig::kRappor:
return std::unique_ptr<DecoderAdapter>(new RapporAdapter);
case EncodingConfig::kBasicRappor: {
const IndexLabels* index_labels = nullptr;
if (report_variable_->has_index_labels()) {
index_labels = &(report_variable_->index_labels());
}
return std::unique_ptr<DecoderAdapter>(new BasicRapporAdapter(
report_id_, encoding_config->basic_rappor(), index_labels));
}
default:
LOG(FATAL) << "Unexpected EncodingConfig type "
<< encoding_config->config_case();
}
}
} // namespace analyzer
} // namespace cobalt