blob: be1027c363368472d9d1ed02fa05b2416c28f375 [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_api
class BquploadApi(recipe_api.RecipeApi):
"""BquploadApi provides support for BigQuery upload."""
@property
def _bqupload_tool(self):
return self.m.ensure_tool("bqupload", self.resource("tool_manifest.json"))
def insert(
self, step_name, project, dataset, table, rows, alert_emails=(), **kwargs
):
"""Inserts rows into a BigQuery table.
Args:
step_name (str): Name of the step.
project (str): ID of the gcloud project.
dataset (str): Name of the BigQuery dataset.
table (str): Name of the BigQuery table.
rows (seq(dict)): Rows to insert insert into a BigQuery table.
**kwargs (dict): Passed through to api.step().
"""
kwargs.setdefault("infra_step", True)
full_table_name = f"{project}.{dataset}.{table}"
# bqupload accepts rows as newline-delimited JSON.
data = "\n".join([self.m.json.dumps(row) for row in rows])
step = self.m.step(
step_name,
[self._bqupload_tool, full_table_name, self.m.raw_io.input(data)],
**kwargs,
)
if alert_emails:
step.presentation.tags["fuchsia.failure_alert_emails"] = ",".join(
alert_emails
)
return step