blob: 57486df1084aef71b232bbbf539758af110c73a1 [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_api
BQUPLOAD_CIPD_PATH = "infra/tools/bqupload/${platform}"
BQUPLOAD_CIPD_VERSION = "git_revision:d85fe78f303c3e969f815121e17c8b08868039ef"
class BquploadApi(recipe_api.RecipeApi):
"""BquploadApi provides support for BigQuery upload."""
@property
def _bqupload_tool(self):
return self.m.cipd.ensure_tool(BQUPLOAD_CIPD_PATH, BQUPLOAD_CIPD_VERSION)
def insert(self, step_name, project, dataset, table, rows, **kwargs):
"""Inserts rows into a BigQuery table.
Args:
step_name (str): Name of the step.
project (str): ID of the gcloud project.
dataset (str): Name of the BigQuery dataset.
table (str): Name of the BigQuery table.
rows (seq(dict)): Rows to insert insert into a BigQuery table.
**kwargs (dict): Passed through to api.step().
"""
full_table_name = "%s.%s.%s" % (project, dataset, table)
# bqupload accepts rows as newline-delimited JSON.
data = "\n".join([self.m.json.dumps(row) for row in rows])
return self.m.step(
step_name,
[self._bqupload_tool, full_table_name, self.m.raw_io.input(data)],
**kwargs
)