blob: b23ffbf86a2ac0c227c70e3f95af1b81f276a6d7 [file] [log] [blame]
// Copyright 2025 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resultstore_streamer.h"
#include <cassert>
#include <cstdarg>
#include <cstdlib>
#include <chrono>
#include <numeric>
#include "graph.h"
#include "metrics.h"
#include "persistent_mode.h"
#include "proto.h"
#include "util.h"
#include "version.h"
#include "output_stream.h"
// On Windows, a system header #defines OPTIONAL, which conflicts with
// an enum value of the same name in resultstore_upload.proto.
// Un-define it as a workaround.
#ifdef OPTIONAL
#undef OPTIONAL
#endif
#include "resultstore.pb.h"
namespace rspb = ::rs::google::devtools::resultstore::v2;
// Ninja itself is not configuration-aware, though other tools
// that generate ninja plans may have configuration.
// For now, it is acceptable to use a single default configuration.
static constexpr char kDefaultConfigurationName[] = "default";
// Targets are displayed in the ResultStore UI.
// Ideally, this should contain some subset of non-phony targets
// that are built, but for now we can start with a single generic
// all-encompassing target.
static constexpr char kDefaultTargetName[] = ":default";
static int64_t _compute_epoch_offset_millis() {
const auto system_now = std::chrono::system_clock::now();
const auto steady_now = std::chrono::steady_clock::now();
return (
std::chrono::duration_cast<std::chrono::milliseconds>(
system_now.time_since_epoch()) -
std::chrono::duration_cast<std::chrono::milliseconds>(
steady_now.time_since_epoch())
).count();
}
// The times from functions like GetTimeMillis() in "metrics.h"
// are based on chrono::steady_clock (monotonic, implementation-defined epoch),
// but ResultStore wants timestamps based on UNIX epoch time,
// which most chrono::system_clock implementations use.
// Compute the offset once, and use for timestamp conversions.
static const int64_t epoch_offset_millis = _compute_epoch_offset_millis();
// The is an implememtation-change-detector that verifies the assumption that
// GetTimeMillis() in "metrics.cc" is based on chromo::steady_clock.
// Should this fail, time calculations in this unit should be reviewed
// and updated.
static int _verify_GetTimeMillis_range() {
const auto steady_now = std::chrono::steady_clock::now();
const auto future = steady_now + std::chrono::seconds(5);
const int64_t now_millis =
std::chrono::duration_cast<std::chrono::milliseconds>(
steady_now.time_since_epoch()).count();
const int64_t future_millis =
std::chrono::duration_cast<std::chrono::milliseconds>(
future.time_since_epoch()).count();
// GetTimeMillis() should be close to steady_now.
const int64_t get_time_millis = GetTimeMillis();
assert(get_time_millis >= now_millis);
assert(get_time_millis <= future_millis);
return 0; // value is inconsequential
}
static const int _assert_time_range = _verify_GetTimeMillis_range();
static void millis_to_timestamp(
int64_t time_millis,
::rs::google::protobuf::Timestamp* timestamp) {
// recover epoch time from steady_clock.
const auto epoch_time_millis = time_millis + epoch_offset_millis;
const ldiv_t t = std::ldiv(epoch_time_millis, 1000);
timestamp->set_seconds(t.quot);
timestamp->set_nanos(t.rem * 1e6);
}
static void millis_to_duration(
int64_t time_millis,
::rs::google::protobuf::Duration* duration) {
const ldiv_t t = std::ldiv(time_millis, 1000);
duration->set_seconds(t.quot);
duration->set_nanos(t.rem * 1e6);
}
static const char* username() {
const char* user = std::getenv("USER");
if (user != nullptr) return user;
return "unknown-user";
}
static const char* hostname() {
const char* user = std::getenv("HOSTNAME");
if (user != nullptr) return user;
return "unknown-host";
}
// Populate invocation parameters to be used in a Create request.
static void initialize_invocation(
rspb::Invocation* invocation,
const std::vector<std::string>& ninja_command,
const std::string& build_id, int64_t start_time_millis,
const BuildMetadataMap& metadata) {
if (build_id != "") {
invocation->mutable_id()->set_invocation_id(build_id); // redundant?
}
// otherwise, let proxy tool generate an invocation ID
millis_to_timestamp(start_time_millis,
invocation->mutable_timing()->mutable_start_time());
// InvocationAttributes
rspb::InvocationAttributes* attr = invocation->mutable_invocation_attributes();
// project_id need not be set here, it can be set by a ResultStore proxy process
attr->set_project_id("unknown-project");
attr->add_users(username());
attr->add_labels("ninja");
// TODO: add a tool sub-command to labels, or default to "build"?
attr->set_description(std::string("Invocation ID ") + build_id);
// WorkspaceInfo
rspb::WorkspaceInfo* ws = invocation->mutable_workspace_info();
ws->set_hostname(hostname());
ws->set_working_directory(GetCurrentDir());
ws->set_tool_tag("ninja");
rspb::CommandLine cl;
cl.set_args(ninja_command);
ws->add_command_lines(cl);
// Properties
std::vector<rspb::Property>* props = invocation->mutable_properties();
for (const auto& md : metadata) {
rspb::Property prop;
prop.set_key(md.first);
prop.set_value(md.second);
props->push_back(prop);
}
}
// Serializes a single message to an output stream.
// The message payload is preceded by its size.
template <typename StreamType, typename MessageType>
void WriteCodedOutput(StreamType* output, const MessageType& m) {
WriteFixed32NoTag(output, static_cast<uint32_t>(m.ByteSizeLong()));
// TODO: consider using Varint32 encoding for the size.
m.SerializeToOstream(output);
}
/*
// This might not be needed if we plan to use UploadRequest instead.
static std::unique_ptr<rspb::CreateInvocationRequest>
make_create_invocation_request(
const std::vector<std::string>& ninja_command,
const std::string& build_id, int64_t start_time_millis,
const BuildMetadataMap& metadata) {
auto req = std::make_unique<rspb::CreateInvocationRequest>();
rspb::Invocation* invocation = req->mutable_invocation();
initialize_invocation(invocation, ninja_command, build_id, start_time_millis,metadata);
return req;
}
*/
ResultStoreStreamer::ResultStoreStreamer(
const std::vector<std::string>& ninja_command,
const std::string& config_string, const BuildMetadataMap& metadata,
const std::string& build_id, int64_t start_time_millis, OutputStream* out)
: ninja_command_(ninja_command), config_string_(config_string),
metadata_(metadata), // keep extra copy
start_time_millis_(start_time_millis),
initial_invocation_(std::make_unique<rspb::Invocation>()), out_(out) {
initialize_invocation(initial_invocation_.get(), ninja_command,
build_id, start_time_millis, metadata);
}
ResultStoreStreamer::~ResultStoreStreamer() = default;
void ResultStoreStreamer::PostRequest(const UploadRequest& req) {
WriteCodedOutput(out_, req);
}
const std::string& ResultStoreStreamer::invocation_id() const {
return initial_invocation_->id_.invocation_id_;
}
void ResultStoreStreamer::EdgeAddedToPlan(const Edge*) {
// No ResultStore upload to do.
}
void ResultStoreStreamer::EdgeRemovedFromPlan(const Edge*) {
// No ResultStore upload to do.
}
static bool char_is_unreserved(char c) {
// Note: std::isalnum is locale-dependent and slower, so spell this out:
if ((c >= 'a' && c <= 'z') ||
(c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9')) {
return true;
}
switch (c) {
case '-':
case '.':
case '_':
case '~':
return true;
default:
break;
}
return false;
}
// See https://datatracker.ietf.org/doc/html/rfc3986 about
// url encoding.
static std::string url_encode(const std::string& in) {
std::string result;
for (unsigned char c : in) {
if (char_is_unreserved(c)) {
result.push_back(c);
} else {
StringAppendFormat(result, "%%%02X", int(c));
}
}
return result;
}
static std::string encode_action_name(
const std::string& invocation_id,
const char target_id[],
const char configuration_id[],
const std::string& action_id) {
// The resource name must be formatted:
// invocations/${INVOCATION_ID}/targets/${url_encode(TARGET_ID)}/configuredTargets/url_encode(${CONFIG_ID})/actions/${url_encode(ACTION_ID)}
return StringFormat(
"invocations/%s/targets/%s/configuredTargets/%s/actions/%s",
invocation_id.c_str(),
url_encode(target_id).c_str(),
url_encode(configuration_id).c_str(),
url_encode(action_id).c_str());
}
// Get an action_id from an Edge.
static std::string edge_action_id(const Edge* edge) {
// Use the primary output?
// assert that there is at least one output node
return edge->outputs_.front()->path();
}
void ResultStoreStreamer::BuildEdgeStarted(const Edge* edge,
int64_t start_time_millis) {
if (out_ == nullptr) return;
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::CREATE);
rspb::Action* action = req.mutable_action();
auto* id = action->mutable_id();
id->set_invocation_id(invocation_id());
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultConfigurationName);
const std::string action_id(edge_action_id(edge)); // needs to be url_encoded
id->set_action_id(action_id);
action->set_name(
encode_action_name(
invocation_id(), kDefaultTargetName, kDefaultConfigurationName, action_id));
rspb::BuildAction* build_action = action->mutable_build_action();
build_action->set_type(edge->GetBinding("description"));
if (!edge->inputs_.empty()) {
build_action->set_primary_input_path(edge->inputs_.front()->path());
}
if (!edge->outputs_.empty()) {
build_action->set_primary_output_path(edge->outputs_.front()->path());
}
// rspb::ActionAttributes* action_attrs = action->mutable_action_attributes();
rspb::StatusAttributes* status_attrs = action->mutable_status_attributes();
status_attrs->set_status(rspb::Status::BUILDING);
// auto* deps = action->mutable_action_dependencies();
// iterate over edge inputs
for (const auto* input : edge->inputs_) {
rspb::Dependency dep;
auto* dep_id = dep.mutable_id();
dep_id->set_target_id(kDefaultTargetName);
dep_id->set_configuration_id(kDefaultConfigurationName);
dep_id->set_action_id(input->path());
action->add_action_dependencies(dep);
// dep.set_label(...); // description?
}
// auto* properties = action->mutable_properties();
// auto* files = action->mutable_files();
millis_to_timestamp(start_time_millis,
action->mutable_timing()->mutable_start_time());
PostRequest(req);
}
void ResultStoreStreamer::BuildEdgeFinished(
Edge* edge, int64_t start_time_millis, int64_t end_time_millis,
ExitStatus exit_code, const std::string& output) {
if (out_ == nullptr) return;
const std::string action_id(edge_action_id(edge));
{
// use initial_invocation_ in UploadRequest for batching
// UpdateActionRequest, as a generic UploadRequest
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::UPDATE);
rspb::Action* action = req.mutable_action();
auto* id = action->mutable_id();
id->set_invocation_id(invocation_id());
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultConfigurationName);
id->set_action_id(action_id);
// update status and timing
auto* update_mask = req.mutable_update_mask();
update_mask->add_paths("status_attributes");
update_mask->add_paths("timing.duration");
const rspb::Status status = (exit_code == ExitSuccess ?
rspb::Status::BUILT :
rspb::Status::FAILED_TO_BUILD);
action->mutable_status_attributes()->set_status(status);
const int64_t action_duration = end_time_millis - start_time_millis;
millis_to_duration(action_duration,
action->mutable_timing()->mutable_duration());
PostRequest(req);
}
{
// FinalizeActionRequest, as a generic UploadRequest
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::FINALIZE);
rspb::Action* action = req.mutable_action();
auto* id = action->mutable_id();
id->set_invocation_id(invocation_id());
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultConfigurationName);
id->set_action_id(action_id);
PostRequest(req);
}
}
void ResultStoreStreamer::BuildStarted() {
if (out_ == nullptr) return;
{
// Prepare a CreateInvocationRequest that fits into an UploadRequest.
// The consumer (proxy process) is expected to unpack this into a proper
// CreateInvocationRequest, while uploading subsequent requests
// in batches.
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::CREATE);
rspb::Invocation* invocation = req.mutable_invocation();
*invocation = *initial_invocation_;
PostRequest(req);
}
{
// Define a default "configuration" that can be referenced by
// all targets. Technically, ninja is not configuration-aware,
// but in the future, we could inform it about configuration
// from tools like GN or cmake. For now, a single "default"
// configuration suffices for prototyping.
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::CREATE);
rspb::Configuration* configuration = req.mutable_configuration();
auto* id = configuration->mutable_id();
id->set_invocation_id(invocation_id());
id->set_configuration_id(kDefaultConfigurationName);
// path-formatted resource name
configuration->set_name(
StringFormat("invocations/%s/configs/%s",
invocation_id().c_str(), kDefaultConfigurationName));
// status attributes. what does a configuration status even mean?
auto* status_attrs = configuration->mutable_status_attributes();
status_attrs->set_status(rspb::Status::BUILDING);
// status_attrs->set_description(); // only needed if non-obvious
// This will probably need to be finalized as BUILT or FAILED_TO_BUILD or
// CANCELLED during BuildFinished().
// configuration attributes
// auto* config_attrs = configuration->mutable_configuration_attributes();
// config_attrs.set_cpu(); // presumably the target cpu? is this required?
// properties
// auto* properties = target->mutable_properties();
// just use the default configuration id.
// id->set_display_name();
PostRequest(req);
}
{
// For now, define a "default" target that covers all actions by default.
// Eventually, this should be expanded based on command-line arguments.
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::CREATE);
rspb::Target* target = req.mutable_target();
auto* id = target->mutable_id();
id->set_invocation_id(invocation_id());
id->set_target_id(kDefaultTargetName);
// path-formatted resource name
target->set_name(
StringFormat("invocations/%s/targets/%s",
invocation_id().c_str(),
url_encode(kDefaultTargetName).c_str()));
// Use the build start time the target's start time.
rspb::Timing* timing = target->mutable_timing();
millis_to_timestamp(start_time_millis_, timing->mutable_start_time());
// various attributes
// rspb::TargetAttributes* target_attrs =
// target->mutable_target_attributes();
// target_attrs->set_type();
// target_attrs->set_language();
// auto* tags = target_attrs->mutable_tags();
// rspb::TestAttributes* test_attrs = target->mutable_test_attributes();
// auto* properties = target->mutable_properties();
// auto* files = target->mutable_files();
target->set_visible(true);
PostRequest(req);
}
{
// CreateConfiguredTarget
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::CREATE);
rspb::ConfiguredTarget* ctarget = req.mutable_configured_target();
auto* id = ctarget->mutable_id();
id->set_invocation_id(invocation_id());
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultTargetName);
// path-formatted resource name
ctarget->set_name(StringFormat(
"invocations/%s/targets/%s/configuredTarget/%s",
invocation_id().c_str(),
url_encode(kDefaultTargetName).c_str(),
url_encode(kDefaultConfigurationName).c_str()));
// rspb::StatusAttributes* status_attrs =
// ctarget->mutable_status_attributes();
// Use the build start time the target's start time.
rspb::Timing* timing = ctarget->mutable_timing();
millis_to_timestamp(start_time_millis_, timing->mutable_start_time());
// no configured test attributes
// auto* properties = ctarget->mutable_properties();
// auto* files = ctarget->mutable_files();
PostRequest(req);
}
}
void ResultStoreStreamer::BuildFinished(ExitStatus exit_status) {
if (out_ == nullptr) return;
// Since there is only one configuration, target, configuredTarget,
// use the same status for all of them.
const rspb::Status common_status = (exit_status == ExitSuccess ?
rspb::Status::BUILT :
rspb::Status::FAILED_TO_BUILD);
const int64_t common_duration = GetTimeMillis() - start_time_millis_;
// UpdateConfiguredTarget
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::UPDATE);
auto* update_mask = req.mutable_update_mask();
update_mask->add_paths("status_attributes");
update_mask->add_paths("timing.duration");
rspb::ConfiguredTarget* ctarget = req.mutable_configured_target();
auto* id = ctarget->mutable_id();
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultConfigurationName);
ctarget->mutable_status_attributes()->set_status(common_status);
millis_to_duration(common_duration,
ctarget->mutable_timing()->mutable_duration());
PostRequest(req);
}
// FinalizeConfiguredTarget
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::FINALIZE);
rspb::ConfiguredTarget* ctarget = req.mutable_configured_target();
auto* id = ctarget->mutable_id();
id->set_target_id(kDefaultTargetName);
id->set_configuration_id(kDefaultConfigurationName);
PostRequest(req);
}
// UpdateTarget
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::UPDATE);
// status_attributes is deprecated here
auto* update_mask = req.mutable_update_mask();
update_mask->add_paths("timing.duration");
rspb::Target* target = req.mutable_target();
auto* id = target->mutable_id();
id->set_target_id(kDefaultTargetName);
millis_to_duration(common_duration,
target->mutable_timing()->mutable_duration());
PostRequest(req);
}
// FinalizeTarget
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::FINALIZE);
rspb::Target* target = req.mutable_target();
auto* id = target->mutable_id();
id->set_target_id(kDefaultTargetName);
PostRequest(req);
}
// UpdateConfiguration
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::UPDATE);
// status_attributes is deprecated here
auto* update_mask = req.mutable_update_mask();
update_mask->add_paths("status_attributes");
rspb::Configuration* config = req.mutable_configuration();
auto* id = config->mutable_id();
id->set_configuration_id(kDefaultConfigurationName);
config->mutable_status_attributes()->set_status(common_status);
PostRequest(req);
}
// FinalizeConfiguration
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::FINALIZE);
rspb::Configuration* target = req.mutable_configuration();
auto* id = target->mutable_id();
id->set_configuration_id(kDefaultConfigurationName);
PostRequest(req);
}
// UpdateInvocation, using equivalent UploadRequest
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::UPDATE);
auto* update_mask = req.mutable_update_mask();
update_mask->add_paths("status_attributes");
update_mask->add_paths("timing.duration");
update_mask->add_paths("invocation_attributes.exit_code");
// record exit status and finish time
rspb::Invocation* invocation = req.mutable_invocation();
invocation->mutable_status_attributes()->set_status(common_status);
millis_to_duration(common_duration,
invocation->mutable_timing()->mutable_duration());
invocation->mutable_invocation_attributes()->set_exit_code(exit_status);
PostRequest(req);
}
// FinalizeInvocation, using equivalent UploadRequest
{
rspb::UploadRequest req;
req.set_upload_operation(rspb::UploadRequest::UploadOperation::FINALIZE);
req.mutable_invocation(); // no need to re-populate
// Expect proxy tool to supply the necessary batch parameters
// corresponding to this invocation.
PostRequest(req);
}
}
void ResultStoreStreamer::SetExplanations(Explanations*) {
// No ResultStore upload to do.
}
void ResultStoreStreamer::WarningV(const char* msg, va_list args) {
// TODO: implement this
}
void ResultStoreStreamer::ErrorV(const char* msg, va_list args) {
// TODO: implement this
}
void ResultStoreStreamer::InfoV(const char* msg, va_list args) {
// TODO: implement this
}