#!/usr/bin/env python3
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| """Helper to upload Jenkins test results to BQ""" |
| |
| from __future__ import print_function |
| |
| import os |
| import sys |
| import time |
| import uuid |
| |
| import six |
| |
| gcp_utils_dir = os.path.abspath( |
| os.path.join(os.path.dirname(__file__), "../../gcp/utils") |
| ) |
| sys.path.append(gcp_utils_dir) |
| import big_query_utils |
| |
| _DATASET_ID = "jenkins_test_results" |
| _DESCRIPTION = "Test results from master job run on Jenkins" |
| # 365 days in milliseconds |
| _EXPIRATION_MS = 365 * 24 * 60 * 60 * 1000 |
| _PARTITION_TYPE = "DAY" |
| _PROJECT_ID = "grpc-testing" |
| _RESULTS_SCHEMA = [ |
| ("job_name", "STRING", "Name of Jenkins job"), |
| ("build_id", "INTEGER", "Build ID of Jenkins job"), |
| ("build_url", "STRING", "URL of Jenkins job"), |
| ("test_name", "STRING", "Individual test name"), |
| ("language", "STRING", "Language of test"), |
| ("platform", "STRING", "Platform used for test"), |
| ("config", "STRING", "Config used for test"), |
| ("compiler", "STRING", "Compiler used for test"), |
| ("iomgr_platform", "STRING", "Iomgr used for test"), |
| ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"), |
| ("timestamp", "TIMESTAMP", "Timestamp of test run"), |
| ("elapsed_time", "FLOAT", "How long test took to run"), |
| ("cpu_estimated", "FLOAT", "Estimated CPU usage of test"), |
| ("cpu_measured", "FLOAT", "Actual CPU usage of test"), |
| ("return_code", "INTEGER", "Exit code of test"), |
| ] |
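
# For reference, upload_results_to_bq() below builds one flat dict per test
# result, keyed by the column names above. A sketch of such a row, with
# purely illustrative (hypothetical) values:
#
#   {
#       "job_name": "grpc/core/master/linux",
#       "build_id": "1234",
#       "build_url": "https://source.cloud.google.com/results/invocations/<id>",
#       "test_name": "h2_full_test:simple_request",
#       "language": "c",
#       "platform": "linux",
#       "config": "opt",
#       "compiler": "default",
#       "iomgr_platform": "native",
#       "result": "PASSED",
#       "timestamp": "2017-01-01 12:00:00",
#       "elapsed_time": "0.52",  # formatted with "%.2f" before upload
#       "cpu_estimated": 1.0,
#       "cpu_measured": 0.8,
#       "return_code": 0,
#   }
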
_INTEROP_RESULTS_SCHEMA = [
    ("job_name", "STRING", "Name of Jenkins/Kokoro job"),
    ("build_id", "INTEGER", "Build ID of Jenkins/Kokoro job"),
    ("build_url", "STRING", "URL of Jenkins/Kokoro job"),
    (
        "test_name",
        "STRING",
        "Unique test name combining client, server, and test_name",
    ),
    (
        "suite",
        "STRING",
        "Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth",
    ),
    ("client", "STRING", "Client language"),
    ("server", "STRING", "Server host name"),
    ("test_case", "STRING", "Name of test case"),
    ("result", "STRING", "Test result: PASSED, TIMEOUT, FAILED, or SKIPPED"),
    ("timestamp", "TIMESTAMP", "Timestamp of test run"),
    ("elapsed_time", "FLOAT", "How long test took to run"),
]


def _get_build_metadata(test_results):
    """Add Kokoro build metadata to test_results based on environment
    variables set by Kokoro.

    Args:
        test_results: dict of row fields; build_id, build_url and job_name
            are added to it in place.
    """
    build_id = os.getenv("KOKORO_BUILD_NUMBER")
    build_url = (
        "https://source.cloud.google.com/results/invocations/%s"
        % os.getenv("KOKORO_BUILD_ID")
    )
    job_name = os.getenv("KOKORO_JOB_NAME")

    if build_id:
        test_results["build_id"] = build_id
    if build_url:
        test_results["build_url"] = build_url
    if job_name:
        test_results["job_name"] = job_name

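# Illustrative sketch of what _get_build_metadata() above fills in, assuming
# hypothetical values KOKORO_BUILD_NUMBER="1234", KOKORO_BUILD_ID="inv-id"
# and KOKORO_JOB_NAME="grpc/core/master/linux" in the environment:
#
#   test_results["build_id"] = "1234"
#   test_results["build_url"] = (
#       "https://source.cloud.google.com/results/invocations/inv-id"
#   )
#   test_results["job_name"] = "grpc/core/master/linux"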

def _insert_rows_with_retries(bq, bq_table, bq_rows):
    """Insert rows into a BigQuery table in batches, retrying on error."""
    # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
    for i in range((len(bq_rows) // 1000) + 1):
        max_retries = 3
        for attempt in range(max_retries):
            if big_query_utils.insert_rows(
                bq,
                _PROJECT_ID,
                _DATASET_ID,
                bq_table,
                bq_rows[i * 1000 : (i + 1) * 1000],
            ):
                break
            else:
                if attempt < max_retries - 1:
                    print("Error uploading result to BigQuery, will retry.")
                else:
                    print(
                        "Error uploading result to BigQuery, all attempts"
                        " failed."
                    )
                    sys.exit(1)
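# A small worked example of the batching above (hypothetical row count): with
# 2,500 rows, range((2500 // 1000) + 1) gives three batches covering rows
# 0-999, 1000-1999 and 2000-2499, and each batch gets up to max_retries (3)
# insert attempts before the script gives up and exits. When the row count is
# an exact multiple of 1,000, the final slice is simply empty.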


def upload_results_to_bq(resultset, bq_table, extra_fields):
    """Upload test results to a BigQuery table.

    Args:
        resultset: dict mapping test shortnames to lists of results, as
            generated by jobset.run
        bq_table: name of the BigQuery table to create (if needed) and
            upload results to
        extra_fields: dict of extra values that are uploaded along with
            every result row
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        bq_table,
        _RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS,
    )

    bq_rows = []
    for shortname, results in six.iteritems(resultset):
        for result in results:
            test_results = {}
            _get_build_metadata(test_results)
            test_results["cpu_estimated"] = result.cpu_estimated
            test_results["cpu_measured"] = result.cpu_measured
            test_results["elapsed_time"] = "%.2f" % result.elapsed_time
            test_results["result"] = result.state
            test_results["return_code"] = result.returncode
            test_results["test_name"] = shortname
            test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
            for field_name, field_value in six.iteritems(extra_fields):
                test_results[field_name] = field_value
            row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
            bq_rows.append(row)
    _insert_rows_with_retries(bq, bq_table, bq_rows)
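# A minimal usage sketch (hypothetical table name and extra_fields; real
# callers pass their own values, with extra_fields keys that typically match
# _RESULTS_SCHEMA columns such as language, platform, config, compiler and
# iomgr_platform):
#
#   upload_results_to_bq(
#       resultset,  # as returned by jobset.run(...)
#       "aggregate_results",
#       {
#           "language": "c++",
#           "platform": "linux",
#           "config": "opt",
#           "compiler": "default",
#           "iomgr_platform": "native",
#       },
#   )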


def upload_interop_results_to_bq(resultset, bq_table):
    """Upload interop test results to a BigQuery table.

    Args:
        resultset: dict mapping test shortnames to lists of results, as
            generated by jobset.run. Each shortname is expected to have the
            form "<suite>:<client>:<server>:<test_case>".
        bq_table: name of the BigQuery table to create (if needed) and
            upload results to
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        bq_table,
        _INTEROP_RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS,
    )

    bq_rows = []
    for shortname, results in six.iteritems(resultset):
        for result in results:
            test_results = {}
            _get_build_metadata(test_results)
            test_results["elapsed_time"] = "%.2f" % result.elapsed_time
            test_results["result"] = result.state
            test_results["test_name"] = shortname
            test_results["suite"] = shortname.split(":")[0]
            test_results["client"] = shortname.split(":")[1]
            test_results["server"] = shortname.split(":")[2]
            test_results["test_case"] = shortname.split(":")[3]
            test_results["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
            row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
            bq_rows.append(row)
    _insert_rows_with_retries(bq, bq_table, bq_rows)
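# A minimal usage sketch (hypothetical shortname and table name): the
# "<suite>:<client>:<server>:<test_case>" layout of each shortname is what
# the split(":") calls above rely on.
#
#   resultset = {
#       # jobset result objects exposing .state and .elapsed_time
#       "cloud_to_prod:c++:grpc-test.example.com:empty_unary": [result],
#   }
#   upload_interop_results_to_bq(resultset, "interop_results")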