blob: 0444f55e6520514a6fe1428b068e7c928652768b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fidl/fuchsia.inspect/cpp/natural_types.h>
#include <lib/async/cpp/task.h>
#include <lib/component/incoming/cpp/protocol.h>
#include <lib/diagnostics/reader/cpp/archive_reader.h>
#include <lib/diagnostics/reader/cpp/constants.h>
#include <lib/diagnostics/reader/cpp/inspect.h>
#include <lib/fpromise/bridge.h>
#include <lib/inspect/cpp/hierarchy.h>
#include <unistd.h>
#include <algorithm>
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include <rapidjson/document.h>
#include <rapidjson/rapidjson.h>
#include <src/lib/fsl/vmo/strings.h>
#include <src/lib/fxl/strings/join_strings.h>
#include "lib/async/sequence_id.h"
#include "lib/async/task.h"
#include "lib/fpromise/promise.h"
#include "lib/sync/completion.h"
namespace diagnostics::reader {
// Delay, in milliseconds, between successive snapshot attempts while waiting for
// components to show up.
//
// Kept at 250ms so that tests are not overly delayed. It is common for the first
// snapshot to miss a component, since the system needs time to start it and
// receive the events.
constexpr size_t kDelayMs{250};
// Appends the contents of a parsed JSON |document| to |out| as InspectData entries.
//
// A top-level array contributes one InspectData per element; any other document is
// wrapped as a single InspectData.
void EmplaceInspect(rapidjson::Document document, std::vector<InspectData>* out) {
  if (!document.IsArray()) {
    out->emplace_back(InspectData(std::move(document)));
    return;
  }

  for (auto& element : document.GetArray()) {
    // Every element is given a Document of its own, built with that Document's allocator,
    // which may involve copying.
    //
    // It is an error to keep a reference to a Value in a Document after that Document is
    // destroyed, and the input |document| is destroyed as soon as this function returns.
    rapidjson::Document owned;
    rapidjson::Value transferred(element.Move(), owned.GetAllocator());
    owned.Swap(transferred);
    out->emplace_back(InspectData(std::move(owned)));
  }
}
namespace {
// Reads one batch from the BatchIterator |ptr| and either finishes |done| or calls itself
// again for the next batch.
//
// |ptr|  - client for the BatchIterator being drained.
// |done| - bridge whose completer is resolved exactly once on the paths below:
//          with an error string on failure, or with |ret| once an empty batch arrives.
// |ret|  - InspectData accumulated so far; each non-empty batch appends to it.
//
// NOTE(review): the lambda moves |ptr| into its capture while the call itself is made through
// |ptr|. This relies on `ptr->GetNext().Then` being evaluated before the lambda argument
// (C++17 call sequencing), so the lambda must stay inline as the argument.
void InnerReadBatches(fidl::SharedClient<fuchsia_diagnostics::BatchIterator> ptr,
fpromise::bridge<std::vector<InspectData>, std::string> done,
std::vector<InspectData> ret) {
ptr->GetNext().Then(
[ptr = std::move(ptr), done = std::move(done), ret = std::move(ret)](auto& result) mutable {
// A failed GetNext call ends the read with a formatted error.
if (result.is_error()) {
done.completer.complete_error("Batch iterator returned error: " +
result.error_value().FormatDescription());
return;
}
// An empty batch is treated as the end of the stream: hand back everything collected.
if (result->batch().empty()) {
done.completer.complete_ok(std::move(ret));
return;
}
for (auto& content : result->batch()) {
// Only JSON-formatted content is accepted.
if (content.Which() != fuchsia_diagnostics::FormattedContent::Tag::kJson) {
done.completer.complete_error("Received an unexpected content format");
return;
}
// Copy the VMO payload into a string so it can be parsed.
std::string json;
fsl::SizedVmo vmo(std::move(content.json()->vmo()), content.json()->size());
if (!fsl::StringFromVmo(vmo, &json)) {
done.completer.complete_error("Failed to read returned VMO");
return;
}
// NOTE(review): the outcome of Parse is not inspected here; a document that failed to
// parse is still handed to EmplaceInspect. Confirm this is intended.
rapidjson::Document document;
document.Parse(json);
EmplaceInspect(std::move(document), &ret);
}
// More data may follow: request the next batch with the same client, bridge and
// accumulator.
InnerReadBatches(std::move(ptr), std::move(done), std::move(ret));
});
}
// Drains the BatchIterator |ptr| and returns a promise for everything it produced.
//
// The promise resolves with the accumulated InspectData, or with an error string if
// reading fails.
fpromise::promise<std::vector<InspectData>, std::string> ReadBatches(
    fidl::SharedClient<fuchsia_diagnostics::BatchIterator> ptr) {
  fpromise::bridge<std::vector<InspectData>, std::string> bridge;

  // Take the consumer out before the bridge itself is handed to the read loop.
  auto pending = std::move(bridge.consumer);

  std::vector<InspectData> accumulated;
  InnerReadBatches(std::move(ptr), std::move(bridge), std::move(accumulated));

  return pending.promise_or(fpromise::error("Failed to obtain consumer promise"));
}
// Connects to the ArchiveAccessor protocol and returns the client end.
//
// Asserts that the connection attempt succeeded.
fidl::ClientEnd<fuchsia_diagnostics::ArchiveAccessor> ConnectToAccessor() {
  auto connection = component::Connect<fuchsia_diagnostics::ArchiveAccessor>();
  ZX_ASSERT(connection.is_ok());
  return std::move(connection.value());
}
} // namespace
// TODO(b/303304683): This must become the primary API. Currently it is delegating to the
// deprecated FIDL system.
//
// Creates a reader on |dispatcher| restricted to |selectors|, using an ArchiveAccessor
// connection obtained from ConnectToAccessor(). All work is delegated to the
// three-argument constructor.
ArchiveReader::ArchiveReader(async_dispatcher_t* dispatcher, std::vector<std::string> selectors)
    // |selectors| is a by-value sink parameter, so it is moved along rather than copied.
    : ArchiveReader(dispatcher, std::move(selectors), ConnectToAccessor()) {}
// Creates a reader that talks to the ArchiveAccessor behind |archive|.
//
// |dispatcher| - used to bind the accessor client and to construct |executor_|; must not
//                be null.
// |selectors|  - stored in |selectors_|.
// |archive|    - client end that is bound into |archive_|.
//
// NOTE(review): the null check in the body runs after the member initializers have already
// been given |dispatcher|.
ArchiveReader::ArchiveReader(async_dispatcher_t* dispatcher, std::vector<std::string> selectors,
fidl::ClientEnd<fuchsia_diagnostics::ArchiveAccessor> archive)
: archive_(Bind(dispatcher, std::move(archive))),
executor_(dispatcher),
selectors_(std::move(selectors)) {
ZX_ASSERT(dispatcher != nullptr);
}
// Binds |archive| to a new SharedClient that runs on |dispatcher| and returns that client.
//
// |dispatcher| must not be null. This is asserted up front, before the dispatcher is handed
// to the FIDL bindings.
fidl::SharedClient<fuchsia_diagnostics::ArchiveAccessor> ArchiveReader::Bind(
    async_dispatcher_t* dispatcher, fidl::ClientEnd<fuchsia_diagnostics::ArchiveAccessor> archive) {
  ZX_ASSERT(dispatcher != nullptr);

  fidl::SharedClient<fuchsia_diagnostics::ArchiveAccessor> client;
  client.Bind(std::move(archive), dispatcher);
  return client;
}
// Takes a single snapshot of Inspect data through the ArchiveAccessor.
//
// Requests a JSON snapshot stream limited to |selectors_| (or everything when no selectors
// are configured), reads every batch from the resulting BatchIterator on |executor_|, and
// resolves the returned promise with the collected InspectData.
//
// The promise resolves with an error string if the iterator channel cannot be created, if the
// StreamDiagnostics request cannot be sent, or if reading the batches fails.
fpromise::promise<std::vector<InspectData>, std::string> ArchiveReader::GetInspectSnapshot() {
  fpromise::bridge<std::vector<InspectData>, std::string> bridge;

  std::vector<fuchsia_diagnostics::SelectorArgument> selector_args;
  for (const auto& selector : selectors_) {
    selector_args.emplace_back(fuchsia_diagnostics::SelectorArgument::WithRawSelector(selector));
  }

  fuchsia_diagnostics::StreamParameters params;
  params.data_type(fuchsia_diagnostics::DataType::kInspect);
  params.stream_mode(fuchsia_diagnostics::StreamMode::kSnapshot);
  params.format(fuchsia_diagnostics::Format::kJson);
  if (!selector_args.empty()) {
    params.client_selector_configuration(
        fuchsia_diagnostics::ClientSelectorConfiguration::WithSelectors(std::move(selector_args)));
  } else {
    params.client_selector_configuration(
        fuchsia_diagnostics::ClientSelectorConfiguration::WithSelectAll(true));
  }

  // Creating the channel pair can fail; report that through the promise instead of
  // dereferencing an error result.
  auto endpoints = fidl::CreateEndpoints<fuchsia_diagnostics::BatchIterator>();
  if (endpoints.is_error()) {
    bridge.completer.complete_error(std::string("Failed to create BatchIterator endpoints: ") +
                                    endpoints.status_string());
    return bridge.consumer.promise_or(fpromise::error("Failed to obtain consumer promise"));
  }

  fidl::Request<fuchsia_diagnostics::ArchiveAccessor::StreamDiagnostics> request;
  request.result_stream(std::move(endpoints->server));
  request.stream_parameters(std::move(params));

  // Surface a failed send right away with a specific message rather than discarding it.
  auto stream_status = archive_->StreamDiagnostics(std::move(request));
  if (stream_status.is_error()) {
    bridge.completer.complete_error("Failed to request diagnostics stream: " +
                                    stream_status.error_value().FormatDescription());
    return bridge.consumer.promise_or(fpromise::error("Failed to obtain consumer promise"));
  }

  fidl::SharedClient<fuchsia_diagnostics::BatchIterator> client;
  client.Bind(std::move(endpoints->client), this->executor_.dispatcher());

  // Read all batches on the executor and forward the outcome to the bridge.
  executor_.schedule_task(
      ReadBatches(std::move(client))
          .then([completer = std::move(bridge.completer)](
                    fpromise::result<std::vector<InspectData>, std::string>& result) mutable {
            if (result.is_ok()) {
              completer.complete_ok(result.take_value());
            } else {
              completer.complete_error(result.take_error());
            }
          }));

  return bridge.consumer.promise_or(fpromise::error("Failed to obtain consumer promise"));
}
// Repeatedly snapshots Inspect data until every moniker in |monikers| appears in a snapshot.
//
// The returned promise resolves with the first snapshot that contains all requested
// monikers, or with an error string if taking a snapshot fails.
fpromise::promise<std::vector<InspectData>, std::string> ArchiveReader::SnapshotInspectUntilPresent(
    std::vector<std::string> monikers) {
  fpromise::bridge<std::vector<InspectData>, std::string> pipe;

  // The completer travels with the retry loop; the consumer backs the returned promise.
  InnerSnapshotInspectUntilPresent(std::move(pipe.completer), std::move(monikers));

  auto consumer = std::move(pipe.consumer);
  return consumer.promise_or(fpromise::error("Failed to create bridge promise"));
}
// One iteration of the "snapshot until present" loop.
//
// Takes an Inspect snapshot. If it fails, |completer| is completed with the error. If every
// entry of |monikers| matches the moniker of some snapshot result, |completer| is completed
// with that snapshot. Otherwise another iteration is scheduled after kDelayMs milliseconds.
//
// |completer| - resolved once, by this call or by a later iteration.
// |monikers|  - monikers that must all be present before the loop finishes.
void ArchiveReader::InnerSnapshotInspectUntilPresent(
fpromise::completer<std::vector<InspectData>, std::string> completer,
std::vector<std::string> monikers) {
executor_.schedule_task(
GetInspectSnapshot()
.then([this, monikers = std::move(monikers), completer = std::move(completer)](
fpromise::result<std::vector<InspectData>, std::string>& result) mutable {
if (result.is_error()) {
completer.complete_error(result.take_error());
return;
}
auto value = result.take_value();
// Start from the full set of requested monikers and remove each one seen in the snapshot.
std::set<std::string> remaining(monikers.begin(), monikers.end());
for (const auto& val : value) {
remaining.erase(val.moniker());
}
if (remaining.empty()) {
completer.complete_ok(std::move(value));
} else {
// Some monikers are still missing: wait kDelayMs, then try again. The delayed task
// completes |timeout|, which in turn triggers the next iteration below.
fpromise::bridge<> timeout;
async::PostDelayedTask(
executor_.dispatcher(),
[completer = std::move(timeout.completer)]() mutable { completer.complete_ok(); },
zx::msec(kDelayMs));
executor_.schedule_task(
timeout.consumer.promise_or(fpromise::error())
.then([this, completer = std::move(completer),
monikers = std::move(monikers)](fpromise::result<>& res) mutable {
InnerSnapshotInspectUntilPresent(std::move(completer), std::move(monikers));
})
// NOTE(review): presumably |scope_| keeps this continuation, which captures |this|,
// from running after the reader is gone - confirm against the class definition.
.wrap_with(scope_));
}
})
.wrap_with(scope_));
}
// Opens a logs stream in the given |mode| and returns a LogsSubscription built from the
// resulting BatchIterator client and |executor_|.
LogsSubscription ArchiveReader::GetLogs(fuchsia_diagnostics::StreamMode mode) {
  return LogsSubscription(GetBatchIterator(fuchsia_diagnostics::DataType::kLogs, mode),
                          executor_);
}
// Asks the ArchiveAccessor for a JSON stream of |data_type| in |stream_mode| and returns a
// client for the BatchIterator that delivers it.
//
// The stream is limited to |selectors_| when any are configured; otherwise everything is
// selected. The returned client is bound to the executor's dispatcher.
fidl::SharedClient<fuchsia_diagnostics::BatchIterator> ArchiveReader::GetBatchIterator(
    fuchsia_diagnostics::DataType data_type, fuchsia_diagnostics::StreamMode stream_mode) {
  fuchsia_diagnostics::StreamParameters stream_params;
  stream_params.data_type(data_type);
  stream_params.stream_mode(stream_mode);
  stream_params.format(fuchsia_diagnostics::Format::kJson);

  if (selectors_.empty()) {
    stream_params.client_selector_configuration(
        fuchsia_diagnostics::ClientSelectorConfiguration::WithSelectAll(true));
  } else {
    // Wrap every configured selector string as a raw selector argument.
    std::vector<fuchsia_diagnostics::SelectorArgument> raw_selectors;
    for (const auto& text : selectors_) {
      raw_selectors.emplace_back(fuchsia_diagnostics::SelectorArgument::WithRawSelector(text));
    }
    stream_params.client_selector_configuration(
        fuchsia_diagnostics::ClientSelectorConfiguration::WithSelectors(
            std::move(raw_selectors)));
  }

  auto channel = fidl::CreateEndpoints<fuchsia_diagnostics::BatchIterator>();

  fidl::Request<fuchsia_diagnostics::ArchiveAccessor::StreamDiagnostics> stream_request;
  stream_request.stream_parameters(std::move(stream_params));
  stream_request.result_stream(std::move(channel->server));
  // The status of the one-way call is deliberately not acted upon here.
  archive_->StreamDiagnostics(std::move(stream_request)).is_ok();

  fidl::SharedClient<fuchsia_diagnostics::BatchIterator> iterator;
  iterator.Bind(std::move(channel->client), executor_.dispatcher());
  return iterator;
}
// Returns a copy of |moniker| in which every ':' is preceded by a backslash, so the moniker
// can be embedded in a selector string where ':' separates segments.
//
// All other characters are copied through unchanged.
std::string SanitizeMonikerForSelectors(std::string_view moniker) {
  std::string escaped;
  escaped.reserve(moniker.size());
  for (const char ch : moniker) {
    if (ch == ':') {
      escaped.push_back('\\');
    }
    escaped.push_back(ch);
  }
  return escaped;
}
// Builds a selector string of the form
//   <moniker>:[<tree names>]<node path>[:<property>]
//
// |moniker|            - component moniker; its colons are escaped.
// |inspect_tree_names| - tree names to match. Absent or empty selects the default tree name;
//                        a first entry of "..." matches any name.
// |hierarchy|          - node path segments, joined with '/'. Empty selects "root".
// |property|           - optional property name appended after a ':'.
std::string MakeSelector(std::string_view moniker,
                         std::optional<std::vector<std::string>> inspect_tree_names,
                         std::vector<std::string> hierarchy, std::optional<std::string> property) {
  // The trailing ':' is always needed: at least the root node follows.
  std::string out = SanitizeMonikerForSelectors(moniker) + ":";

  // Tree-name filter.
  const bool names_given = inspect_tree_names.has_value() && !inspect_tree_names->empty();
  if (!names_given) {
    out += std::string{"[name=\""} + fuchsia_inspect::kDefaultTreeName + "\"]";
  } else if (inspect_tree_names->front() == "...") {
    // syntax to match any name
    out += "[...]";
  } else {
    out += "[";
    bool first_name = true;
    for (std::string& tree_name : *inspect_tree_names) {
      if (!first_name) {
        out += ", ";
      }
      first_name = false;
      out += "name=\"" + std::move(tree_name) + "\"";
    }
    out += "]";
  }

  // Node path: segments are separated by slashes, with no trailing slash.
  if (hierarchy.empty()) {
    out += "root";
  } else {
    auto segment = hierarchy.begin();
    out += std::move(*segment);
    for (++segment; segment != hierarchy.end(); ++segment) {
      out += "/";
      out += std::move(*segment);
    }
  }

  // Optional property, separated from the node path by a colon.
  if (property) {
    out += ":" + std::move(*property);
  }
  return out;
}
} // namespace diagnostics::reader