blob: d7b9fdb4f0453934deb394424a4b9040159a8614 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2016 The Fuchsia Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs all of the analyzers. This file also contains utilities common to
all analyzers.
import csv
import logging
import os
import shutil
import subprocess
import sys
THIS_DIR = os.path.dirname(__file__)
ROOT_DIR = os.path.abspath(os.path.join(THIS_DIR, os.path.pardir))
sys.path.insert(0, ROOT_DIR)
THIRD_PARTY_DIR = os.path.join(ROOT_DIR, "third_party")
ALGORITHMS_DIR = os.path.join(ROOT_DIR, "algorithms")
import algorithms.forculus.forculus as forculus
import algorithms.laplace.laplacian as laplacian
import algorithms.rappor.sum_bits as sum_bits
import city_analyzer
import hour_analyzer
import module_name_analyzer
import help_query_analyzer
import url_analyzer
import as data
import utils.file_util as file_util
import utils.public_key_crypto_helper as crypto_helper
import third_party.rappor.client.python.rappor as rappor
import third_party.rappor.bin.hash_candidates as hash_candidates
_logger = logging.getLogger()
# Should public key encryption be used for communication between the
# Randomizers and the Analyzers via the Shufflers?
def analyzeUsingForculus(input_file, config_file, output_file):
''' A helper function that may be invoked by individual analyzers. It reads
input data in the form of a CSV file (that is generated as output from the
preceeding shuffle operation in the pipeline), analyzes the data using
Forculus, and then writes output to another CSV file to be consumed by the
visualizer at the end of the pipeline.
It uses Forculus to decrypt those entries that occur more than |threshold|
times, where |threshold| is read from the config file.
input_file {string}: The simple name of the CSV file to be read from
the 's_to_a' directory.
config_file {string}: The simple name of the Forculus config file used
for analyzing the data from the input file.
output_file {string}: The simple name of the CSV file to be written in
the 'out' directory, that is consumed by the visualizer.
with file_util.openFileForReading(config_file, file_util.CONFIG_DIR) as cf:
config = forculus.Config.from_csv(cf)
if _use_public_key_encryption:
ch = crypto_helper.CryptoHelper()
with file_util.openForAnalyzerReading(input_file) as input_f:
with file_util.openForWriting(output_file) as output_f:
forculus_evaluator = forculus.ForculusEvaluator(config.threshold, input_f)
def analyzeUsingRAPPOR(input_file, config_file, output_file,
''' Run the RAPPOR analysis using the given inputs.
input_file: {string} The simple name of the CSV file to be read from the
's_to_a' directory.
config_file: {string} The simple name of the RAPPOR config file to be read
from the 'config' directory
output_file: {string} The simple name of the CSV file to be written in
the 'out' directory
metric_name: {string} A short name for the metric being computed. It will
be used to form the name of temporary files and so it should contain
only letters and underscores.
candidates_file_name: {string} The simple name of the RAPPOR candidates
file to be read from the 'config' directory. This must be none-None if and
only if |use_basic_rappor|=False.
basic_map_file_name: {string} The name of the pre-computed map file
to be read from the 'config' directory. This must be none-None if and
only if |use_basic_rappor|=True.
use_basic_rappor {bool}: If True use basic RAPPOR analysis
(no Bloom filters.)
for_private_release {bool}: If True then in addition to doing the RAPPOR
decoding, Laplace noise is also added to the analyzed output. We use
different input, output and config files in this case.
assoc_options {dictionary}: If this parameter is not None it indicates that
a special two-dimensional RAPPOR analysis should be performed. This
analysis uses correlation to compute average scores for the second
dimension as a function of the first dimension. This assumes that the
second dimension uses basic RAPPOR with values being integers in the
range 1 to N for some positive integer N.
In that case this parameter should be a dictionary of options for the
correlation analysis. Also in this case there are additional requirements
on the other parameters of this method:
- The |input_file| must contain the data for *both* variables. The
- |config_file| should contain the RAPPOR params for the *first*
- The |output_file| should be named appropriately for *both* variables.
- The |metric_name| should be named appropriately for *both* variables.
- The |candidates_file_name| should refer to the candidates of
the *first* variable.
The dictionary should contain the following string keys and values:
- "metric_name": The final correlation analysis metric name such as
|CityAverageRating|. Note that this is a different value than the
parameter to this function called |metric_name|. This "metric_name"
refers to a field in the csv file specified by "config_file" below.
- "vars": A list of two variable names used for correlation during
analysis such as ['city_name', 'rating']. These strings refer to
fields in the csv file specified by "config_file" below.
- "config_file": The simple name of the CSV file specifying the
correlation configuration parameters.
# First get the RAPPOR config params.
with file_util.openFileForReading(
config_file, file_util.CONFIG_DIR) as cf:
config_params = rappor.Params.from_csv(cf)
map_file_prefix = metric_name
if assoc_options is not None:
# If we are doing a correlation analysis then the map file corresponds
# only to the first variable.
assert len(assoc_options["vars"]) == 2
map_file_prefix = assoc_options["vars"][0]
map_file_name = '%s_map_file.csv' % map_file_prefix
map_file = os.path.join(file_util.CACHE_DIR, map_file_name)
if use_basic_rappor:
# Because we are using basic RAPPOR the map file is a static file in the
# config directory--we do not have to generate a map file from a candidate
# file. But we copy it into the cache directory because the R script will
# write a .rda file in the same directory and we don't want it to write
# into the config directory.
assert basic_map_file_name is not None
assert candidates_file_name is None
basic_map_file = os.path.join(file_util.CONFIG_DIR, basic_map_file_name)
_copyFileIfNecessary(basic_map_file, map_file)
assert basic_map_file_name is None
assert candidates_file_name is not None
_generateMapFileIfNecessary(map_file, map_file_name, candidates_file_name,
# Decrypt the input file contents from shuffler if necessary.
if _use_public_key_encryption:
ch = crypto_helper.CryptoHelper()
decrypt_on_analyzer = ch.decryptOnAnalyzer
# Build the command string to invoke the appropriate R script.
if assoc_options is None:
# Use generic RAPPOR analysis
# Compute counts per cohort and write into analyzer temp directory.
counts_file_name = '%s_count_file.csv' % metric_name
with file_util.openForAnalyzerReading(input_file) as input_f:
with file_util.openForAnalyzerTempWriting(counts_file_name) as output_f:
sum_bits.sumBits(config_params, input_f, output_f, decrypt_on_analyzer)
cmd = _buildRAPPORAnalysisCmd(counts_file_name, config_file, map_file)
# Use correlation based RAPPOR analysis
if decrypt_on_analyzer is not None:
# Decrypt the input contents into a temporary file before invoking the
# RAPPOR script.
decrypted_input_file = ('%s_decrypted_shuffler_out.csv' % metric_name)
with file_util.openForAnalyzerReading(input_file) as input_f:
with file_util.openForAnalyzerTempWriting(
decrypted_input_file) as tmp_output_f:
reader = csv.reader(input_f)
writer = csv.writer(tmp_output_f)
for i, row in enumerate(reader):
# The tuple read from csv_in represents a cipher text. Pass the
# elements of that tuple as arguments to the decryption function
# receiving back the plaintext which is a single string that is a
# comma-separated list of fields. Split that string into fields and
# use that as the value of the read row.
reports_file = os.path.abspath(os.path.join(
file_util.ANALYZER_TMP_OUT_DIR, decrypted_input_file))
reports_file = os.path.abspath(os.path.join(file_util.S_TO_A_DIR,
cmd = _buildRAPPORCorrelationAnalysisCmd(
# Then we change into the Rappor directory and execute the command.
savedir = os.getcwd()
os.environ["RAPPOR_REPO"] = os.path.join(THIRD_PARTY_DIR, 'rappor/')
# We supress the output from the R script unless it fails.
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
print "\n**** Error running RAPPOR decode script:"
print e.output
raise Exception('Fatal error. Cobalt pipeline terminating.')
if assoc_options or use_basic_rappor or for_private_release == False:
# Simply copy the results file to the out directory and return.
results_file_name = 'assoc-results.csv' if assoc_options else 'results.csv'
src = os.path.abspath(os.path.join(
file_util.ANALYZER_TMP_OUT_DIR, results_file_name))
dst = os.path.abspath(os.path.join(
file_util.OUT_DIR, output_file))
shutil.copyfile(src, dst)
# Read in the results of the RAPPOR analysis
with file_util.openFileForReading('results.csv',
file_util.ANALYZER_TMP_OUT_DIR) as f:
reader = csv.reader(f)
results = [result for result in reader]
# If requested, add Laplacian noise in order to affect differentially
# private release.
if for_private_release:
# Here we are using a Laplacian with lambda = 1. This gives use
# differential privacy with epsilon = 1.
laplacian_distribution = laplacian.Laplacian(1)
# We skip row 1 of results because it is a header row.
for result in results[1:]:
# The estimated count is in column 1 of the RAPPOR results. Since
# the values are counts we round to a whole number.
result[1] = int(round(int(result[1]) + laplacian_distribution.sample()))
# Write the final output file.
with file_util.openForWriting(output_file) as f:
writer = csv.writer(f)
for result in results:
def _buildRAPPORCorrelationAnalysisCmd(assoc_config_file, map1_file,
reports_file, metric_name, vars):
''' Invoke the R script 'decode_assoc_averages.R' for running correlation
analysis using multiple params.
# Create the temp directory.
# First we build the command string.
rappor_avg_decode_script = os.path.join(ALGORITHMS_DIR,
'rappor', 'decode_assoc_averages.R')
schema_file = os.path.abspath(os.path.join(
em_executable_file = os.path.abspath(os.path.join(THIRD_PARTY_DIR,
'rappor', 'analysis',
'cpp', '_tmp', 'fast_em'))
if os.path.isfile(em_executable_file) == False:
exception_str = 'Expect fast_em executable at %s.' % em_executable_file
exception_str += (' Terminating Correlations Analyzer.'
' Please run ./ build to build'
' fast_em executable.\n')
raise Exception('\n****' + exception_str)
cmd = [rappor_avg_decode_script,
'--metric-name', metric_name,
'--schema', schema_file,
'--reports', reports_file,
'--params-dir', file_util.CONFIG_DIR,
'--var1', vars[0],
'--var2', vars[1],
'--map1', map1_file,
'--max-em-iters', "1000",
'--num-cores', "2",
'--output-dir', file_util.ANALYZER_TMP_OUT_DIR,
'--em-executable', em_executable_file
return cmd
def _buildRAPPORAnalysisCmd(counts_file_name, config_file, map_file):
rappor_decode_script = os.path.join(THIRD_PARTY_DIR,
'rappor', 'bin', 'decode_dist.R')
counts_file = os.path.abspath(os.path.join(file_util.ANALYZER_TMP_OUT_DIR,
params_file = os.path.abspath(os.path.join(file_util.CONFIG_DIR,
cmd = [rappor_decode_script, '--map', map_file, '--counts', counts_file,
'--params', params_file, '--output-dir',
return cmd
def _generateMapFileIfNecessary(map_file, map_file_name, candidates_file_name,
''' Generates the map file in the cache directory if there is not already
an up-to-date version.
candidate_file = os.path.join(file_util.CONFIG_DIR, candidates_file_name)
# Generate the map file unless it already exists and has a modified
# timestamp later than that of the candidate file. We allow a buffer of
# 10 seconds in case the timestamps are out of sync for some reason.
if (not os.path.exists(map_file) or
os.path.getmtime(map_file) < os.path.getmtime(candidate_file) + 10):
with file_util.openFileForReading(candidates_file_name,
file_util.CONFIG_DIR) as cand_f:
with file_util.openFileForWriting(map_file_name,
file_util.CACHE_DIR) as map_f:
_logger.debug('Generating a RAPPOR map file at %s based on '
'candidate file at %s' % (map_file, candidate_file))
hash_candidates.HashCandidates(params, cand_f, map_f)
def _copyFileIfNecessary(from_file, to_file):
''' Copies from_file to to_file if from_file has been modified more
recently than to_file.
if (not os.path.exists(to_file) or
os.path.getmtime(to_file) < os.path.getmtime(from_file) + 10):
shutil.copyfile(from_file, to_file)
def runAllAnalyzers(use_public_key_encryption=False):
"""Runs all of the analyzers.
This function does not return anything but it invokes all of the
analyzers each of which will read a file from the 's_to_a' directory
and write a file in the top level out directory.
use_public_key_encryption {boolean}: Should public key encrytpion be
used to encrypt communication between the Randomizers and the Analyzers
via the shufflers?
global _use_public_key_encryption
_use_public_key_encryption = use_public_key_encryption
print "Running the help-query analyzer..."
hq_analyzer = help_query_analyzer.HelpQueryAnalyzer()
print "Running the city/ratings analyzer..."
cr_analyzer = city_analyzer.CityRatingsAnalyzer()
print "Running the module names analyzer..."
mn_analyzer = module_name_analyzer.ModuleNameAnalyzer()
print("Running the module names analyzer with differentially private "
print "Running the hour-of-day analyzer..."
hd_analyzer = hour_analyzer.HourOfDayAnalyzer()
print "Running the url analyzer..."
u_analyzer = url_analyzer.UrlAnalyzer()
def main():
if __name__ == '__main__':