[export] Automated rollback of changelist 596804297.
FUTURE_COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35434 from yashykt:OTelPluginOption 1db870bed4ba55cad5515c3586200fa17f5ce226
----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/596967071](http://cl/596967071) [cl/596966190](http://cl/596966190) [cl/596966091](http://cl/596966091) [cl/596958525](http://cl/596958525) [cl/596804297](http://cl/596804297) [cl/596795876](http://cl/596795876)
PiperOrigin-RevId: 596967071
diff --git a/include/grpcpp/ext/csm_observability.h b/include/grpcpp/ext/csm_observability.h
index 1212ddf..58a9e96 100644
--- a/include/grpcpp/ext/csm_observability.h
+++ b/include/grpcpp/ext/csm_observability.h
@@ -28,6 +28,8 @@
#include "absl/strings/string_view.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
+#include <grpcpp/ext/otel_plugin.h>
+
namespace grpc {
namespace internal {
@@ -88,6 +90,18 @@
std::unique_ptr<grpc::internal::OpenTelemetryPluginBuilderImpl> builder_;
};
+class OpenTelemetryPluginOption;
+
+/// Creates an OpenTelemetryPluginOption that would add additional labels on
+/// gRPC metrics to enhance observability for CSM users.
+///
+/// Sample Usage -
+/// OpenTelemetryPluginBuilder()
+/// .SetMeterProvider(provider)
+/// .AddPluginOption(MakeCsmOpenTelemetryPluginOption())
+/// .BuildAndRegisterGlobal();
+std::unique_ptr<OpenTelemetryPluginOption> MakeCsmOpenTelemetryPluginOption();
+
} // namespace experimental
} // namespace grpc
diff --git a/include/grpcpp/ext/otel_plugin.h b/include/grpcpp/ext/otel_plugin.h
index bdf681e..acb3532 100644
--- a/include/grpcpp/ext/otel_plugin.h
+++ b/include/grpcpp/ext/otel_plugin.h
@@ -38,6 +38,11 @@
namespace experimental {
+class OpenTelemetryPluginOption {
+ public:
+ virtual ~OpenTelemetryPluginOption() = default;
+};
+
/// The most common way to use this API is -
///
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
@@ -76,6 +81,7 @@
"grpc.server.call.rcvd_total_compressed_message_size";
OpenTelemetryPluginBuilder();
+ ~OpenTelemetryPluginBuilder();
/// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
@@ -95,6 +101,12 @@
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
+ /// Add a plugin option to add to the opentelemetry plugin being built. At
+ /// present, this type is an opaque type. Ownership of \a option is
+ /// transferred when `AddPluginOption` is invoked. A maximum of 64 plugin
+ /// options can be added.
+ OpenTelemetryPluginBuilder& AddPluginOption(
+ std::unique_ptr<OpenTelemetryPluginOption> option);
/// Registers a global plugin that acts on all channels and servers running on
/// the process.
void BuildAndRegisterGlobal();
diff --git a/src/core/BUILD b/src/core/BUILD
index 8ee2f30..776a390 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -6237,6 +6237,7 @@
external_deps = [
"absl/status",
"absl/status:statusor",
+ "absl/strings",
],
deps = [
"bitset",
diff --git a/src/core/ext/transport/chaotic_good/frame_header.cc b/src/core/ext/transport/chaotic_good/frame_header.cc
index 06f9d14..6b1ec01 100644
--- a/src/core/ext/transport/chaotic_good/frame_header.cc
+++ b/src/core/ext/transport/chaotic_good/frame_header.cc
@@ -19,6 +19,7 @@
#include <cstdint>
#include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index e5a6282..dea2844 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -617,8 +617,17 @@
RefCountedPtr<HandshakingState> handshaking_state_ref;
listener_ = std::move(listener);
{
- MutexLock lock(&mu_);
- if (shutdown_) return;
+ ReleasableMutexLock lock(&mu_);
+ if (shutdown_) {
+ lock.Release();
+ // If the Connection is already shutdown at this point, it implies the
+ // owning Chttp2ServerListener and all associated ActiveConnections have
+ // been orphaned. The generated endpoints need to be shutdown here to
+ // ensure the tcp connections are closed appropriately.
+ grpc_endpoint_shutdown(endpoint, absl::OkStatus());
+ grpc_endpoint_destroy(endpoint);
+ return;
+ }
// Hold a ref to HandshakingState to allow starting the handshake outside
// the critical region.
handshaking_state_ref = handshaking_state_->Ref();
diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h
index 78fd7fb..3f2b10e 100644
--- a/src/core/lib/channel/call_tracer.h
+++ b/src/core/lib/channel/call_tracer.h
@@ -175,7 +175,8 @@
virtual ~ServerCallTracerFactory() {}
- virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0;
+ virtual ServerCallTracer* CreateNewServerCallTracer(
+ Arena* arena, const ChannelArgs& channel_args) = 0;
// Returns true if a server is to be traced, false otherwise.
virtual bool IsServerTraced(const ChannelArgs& /*args*/) { return true; }
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 2702bed..d6d6a85 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -859,7 +859,7 @@
args->server->server_call_tracer_factory() != nullptr) {
auto* server_call_tracer =
args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
- arena);
+ arena, args->server->channel_args());
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
@@ -3426,7 +3426,7 @@
if (args->server->server_call_tracer_factory() != nullptr) {
auto* server_call_tracer =
args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
- arena);
+ arena, args->server->channel_args());
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
diff --git a/src/cpp/ext/csm/csm_observability.cc b/src/cpp/ext/csm/csm_observability.cc
index 625120a..de55e1e 100644
--- a/src/cpp/ext/csm/csm_observability.cc
+++ b/src/cpp/ext/csm/csm_observability.cc
@@ -42,6 +42,58 @@
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
+
+namespace internal {
+
+bool CsmServerSelector(const grpc_core::ChannelArgs& args) {
+ return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
+}
+
+bool CsmChannelTargetSelector(absl::string_view target) {
+ auto uri = grpc_core::URI::Parse(target);
+ if (!uri.ok()) {
+ gpr_log(GPR_ERROR, "Failed to parse URI: %s", std::string(target).c_str());
+ return false;
+ }
+ // CSM channels should have an "xds" scheme
+ if (uri->scheme() != "xds") {
+ return false;
+ }
+ // If set, the authority should be TD
+ if (!uri->authority().empty() &&
+ uri->authority() != "traffic-director-global.xds.googleapis.com") {
+ return false;
+ }
+ return true;
+}
+
+class CsmOpenTelemetryPluginOption
+ : public grpc::internal::InternalOpenTelemetryPluginOption {
+ public:
+ CsmOpenTelemetryPluginOption()
+ : labels_injector_(std::make_unique<internal::ServiceMeshLabelsInjector>(
+ google::cloud::otel::MakeResourceDetector()
+ ->Detect()
+ .GetAttributes())) {}
+
+ bool IsActiveOnClientChannel(absl::string_view target) const override {
+ return CsmChannelTargetSelector(target);
+ }
+
+ bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const override {
+ return CsmServerSelector(args);
+ }
+
+ const grpc::internal::LabelsInjector* labels_injector() const override {
+ return labels_injector_.get();
+ }
+
+ private:
+ std::unique_ptr<internal::ServiceMeshLabelsInjector> labels_injector_;
+};
+
+} // namespace internal
+
namespace experimental {
//
@@ -78,9 +130,7 @@
}
absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
- builder_->SetServerSelector([](const grpc_core::ChannelArgs& args) {
- return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
- });
+ builder_->SetServerSelector(internal::CsmServerSelector);
builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
builder_->SetLabelsInjector(
std::make_unique<internal::ServiceMeshLabelsInjector>(
@@ -91,27 +141,10 @@
return CsmObservability();
}
-} // namespace experimental
-
-namespace internal {
-
-bool CsmChannelTargetSelector(absl::string_view target) {
- auto uri = grpc_core::URI::Parse(target);
- if (!uri.ok()) {
- gpr_log(GPR_ERROR, "Failed to parse URI: %s", std::string(target).c_str());
- return false;
- }
- // CSM channels should have an "xds" scheme
- if (uri->scheme() != "xds") {
- return false;
- }
- // If set, the authority should be TD
- if (!uri->authority().empty() &&
- uri->authority() != "traffic-director-global.xds.googleapis.com") {
- return false;
- }
- return true;
+std::unique_ptr<OpenTelemetryPluginOption> MakeCsmOpenTelemetryPluginOption() {
+ return std::make_unique<grpc::internal::CsmOpenTelemetryPluginOption>();
}
-} // namespace internal
+} // namespace experimental
+
} // namespace grpc
diff --git a/src/cpp/ext/csm/metadata_exchange.cc b/src/cpp/ext/csm/metadata_exchange.cc
index c1beadd..e0a3482 100644
--- a/src/cpp/ext/csm/metadata_exchange.cc
+++ b/src/cpp/ext/csm/metadata_exchange.cc
@@ -404,7 +404,7 @@
}
std::unique_ptr<LabelsIterable> ServiceMeshLabelsInjector::GetLabels(
- grpc_metadata_batch* incoming_initial_metadata) {
+ grpc_metadata_batch* incoming_initial_metadata) const {
auto peer_metadata =
incoming_initial_metadata->Take(grpc_core::XEnvoyPeerMetadata());
return std::make_unique<MeshLabelsIterable>(
@@ -414,7 +414,7 @@
void ServiceMeshLabelsInjector::AddLabels(
grpc_metadata_batch* outgoing_initial_metadata,
- LabelsIterable* labels_from_incoming_metadata) {
+ LabelsIterable* labels_from_incoming_metadata) const {
// On the server, if the labels from incoming metadata did not have a
// non-empty base64 encoded "x-envoy-peer-metadata", do not perform metadata
// exchange.
diff --git a/src/cpp/ext/csm/metadata_exchange.h b/src/cpp/ext/csm/metadata_exchange.h
index 9205b39..c2c24ff 100644
--- a/src/cpp/ext/csm/metadata_exchange.h
+++ b/src/cpp/ext/csm/metadata_exchange.h
@@ -43,12 +43,12 @@
// Read the incoming initial metadata to get the set of labels to be added to
// metrics.
std::unique_ptr<LabelsIterable> GetLabels(
- grpc_metadata_batch* incoming_initial_metadata) override;
+ grpc_metadata_batch* incoming_initial_metadata) const override;
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer.
void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
- LabelsIterable* labels_from_incoming_metadata) override;
+ LabelsIterable* labels_from_incoming_metadata) const override;
private:
std::vector<std::pair<absl::string_view, std::string>> local_labels_;
diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc
index 20971ab..33534b9 100644
--- a/src/cpp/ext/filters/census/server_call_tracer.cc
+++ b/src/cpp/ext/filters/census/server_call_tracer.cc
@@ -265,7 +265,7 @@
grpc_core::ServerCallTracer*
OpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
- grpc_core::Arena* arena) {
+ grpc_core::Arena* arena, const grpc_core::ChannelArgs& /*args*/) {
return arena->ManagedNew<OpenCensusServerCallTracer>();
}
diff --git a/src/cpp/ext/filters/census/server_call_tracer.h b/src/cpp/ext/filters/census/server_call_tracer.h
index 7eb95c6..4a11b4e 100644
--- a/src/cpp/ext/filters/census/server_call_tracer.h
+++ b/src/cpp/ext/filters/census/server_call_tracer.h
@@ -31,7 +31,7 @@
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
- grpc_core::Arena* arena) override;
+ grpc_core::Arena* arena, const grpc_core::ChannelArgs& /*args*/) override;
};
} // namespace internal
diff --git a/src/cpp/ext/otel/key_value_iterable.h b/src/cpp/ext/otel/key_value_iterable.h
index 9395326..89d0e6a 100644
--- a/src/cpp/ext/otel/key_value_iterable.h
+++ b/src/cpp/ext/otel/key_value_iterable.h
@@ -50,9 +50,13 @@
public:
explicit KeyValueIterable(
LabelsIterable* injected_labels_iterable,
+ const std::vector<std::unique_ptr<LabelsIterable>>&
+ injected_labels_from_plugin_options,
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels)
: injected_labels_iterable_(injected_labels_iterable),
+ injected_labels_from_plugin_options_(
+ injected_labels_from_plugin_options),
additional_labels_(additional_labels) {}
bool ForEachKeyValue(opentelemetry::nostd::function_ref<
@@ -68,6 +72,18 @@
}
}
}
+ for (const auto& plugin_option_injected_iterable :
+ injected_labels_from_plugin_options_) {
+ if (plugin_option_injected_iterable != nullptr) {
+ plugin_option_injected_iterable->ResetIteratorPosition();
+ while (const auto& pair = plugin_option_injected_iterable->Next()) {
+ if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
+ AbslStrViewToOpenTelemetryStrView(pair->second))) {
+ return false;
+ }
+ }
+ }
+ }
for (const auto& pair : additional_labels_) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair.first),
AbslStrViewToOpenTelemetryStrView(pair.second))) {
@@ -78,14 +94,23 @@
}
size_t size() const noexcept override {
- return (injected_labels_iterable_ != nullptr
- ? injected_labels_iterable_->Size()
- : 0) +
- additional_labels_.size();
+ size_t size = injected_labels_iterable_ != nullptr
+ ? injected_labels_iterable_->Size()
+ : 0;
+ for (const auto& plugin_option_injected_iterable :
+ injected_labels_from_plugin_options_) {
+ if (plugin_option_injected_iterable != nullptr) {
+ size += plugin_option_injected_iterable->Size();
+ }
+ }
+ size += additional_labels_.size();
+ return size;
}
private:
LabelsIterable* injected_labels_iterable_;
+ const std::vector<std::unique_ptr<LabelsIterable>>&
+ injected_labels_from_plugin_options_;
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels_;
};
diff --git a/src/cpp/ext/otel/otel_call_tracer.h b/src/cpp/ext/otel/otel_call_tracer.h
index 7544bc6..8b90c92 100644
--- a/src/cpp/ext/otel/otel_call_tracer.h
+++ b/src/cpp/ext/otel/otel_call_tracer.h
@@ -98,6 +98,8 @@
// Start time (for measuring latency).
absl::Time start_time_;
std::unique_ptr<LabelsIterable> injected_labels_;
+ std::vector<std::unique_ptr<LabelsIterable>>
+ injected_labels_from_plugin_options_;
};
explicit OpenTelemetryCallTracer(OpenTelemetryClientFilter* parent,
diff --git a/src/cpp/ext/otel/otel_client_filter.cc b/src/cpp/ext/otel/otel_client_filter.cc
index 640699d..4587b38 100644
--- a/src/cpp/ext/otel/otel_client_filter.cc
+++ b/src/cpp/ext/otel/otel_client_filter.cc
@@ -73,14 +73,8 @@
absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
- std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
- // Use the original target string only if a filter on the attribute is not
- // registered or if the filter returns true, otherwise use "other".
- if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
- OpenTelemetryPluginState().target_attribute_filter(target)) {
- return OpenTelemetryClientFilter(std::move(target));
- }
- return OpenTelemetryClientFilter("other");
+ return OpenTelemetryClientFilter(
+ args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or(""));
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
@@ -106,6 +100,19 @@
return next_promise_factory(std::move(call_args));
}
+OpenTelemetryClientFilter::OpenTelemetryClientFilter(std::string target)
+ : active_plugin_options_view_(
+ ActivePluginOptionsView::MakeForClient(target)) {
+ // Use the original target string only if a filter on the attribute is not
+ // registered or if the filter returns true, otherwise use "other".
+ if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
+ OpenTelemetryPluginState().target_attribute_filter(target)) {
+ filtered_target_ = std::move(target);
+ } else {
+ filtered_target_ = "other";
+ }
+}
+
//
// OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer
//
@@ -120,11 +127,11 @@
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
- {OpenTelemetryTargetKey(), parent_->parent_->target()}}};
+ {OpenTelemetryTargetKey(), parent_->parent_->filtered_target()}}};
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OpenTelemetryPluginState().client.attempt.started->Add(
- 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
+ 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
additional_labels));
}
}
@@ -135,6 +142,15 @@
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
+ parent_->parent_->active_plugin_options_view().ForEach(
+ [&](const InternalOpenTelemetryPluginOption& plugin_option,
+ size_t /*index*/) {
+ auto* labels_injector = plugin_option.labels_injector();
+ if (labels_injector != nullptr) {
+ injected_labels_from_plugin_options_.push_back(
+ labels_injector->GetLabels(recv_initial_metadata));
+ }
+ });
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
@@ -143,6 +159,14 @@
OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
+ parent_->parent_->active_plugin_options_view().ForEach(
+ [&](const InternalOpenTelemetryPluginOption& plugin_option,
+ size_t /*index*/) {
+ auto* labels_injector = plugin_option.labels_injector();
+ if (labels_injector != nullptr) {
+ labels_injector->AddLabels(send_initial_metadata, nullptr);
+ }
+ });
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordSendMessage(
@@ -178,11 +202,13 @@
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
- {OpenTelemetryTargetKey(), parent_->parent_->target()},
+ {OpenTelemetryTargetKey(), parent_->parent_->filtered_target()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
- KeyValueIterable labels(injected_labels_.get(), additional_labels);
+ KeyValueIterable labels(injected_labels_.get(),
+ injected_labels_from_plugin_options_,
+ additional_labels);
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
diff --git a/src/cpp/ext/otel/otel_client_filter.h b/src/cpp/ext/otel/otel_client_filter.h
index 9205da4..7c0ddcc 100644
--- a/src/cpp/ext/otel/otel_client_filter.h
+++ b/src/cpp/ext/otel/otel_client_filter.h
@@ -32,6 +32,7 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/transport.h"
+#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
@@ -48,13 +49,17 @@
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
- absl::string_view target() const { return target_; }
+ absl::string_view filtered_target() const { return filtered_target_; }
+
+ const ActivePluginOptionsView& active_plugin_options_view() const {
+ return active_plugin_options_view_;
+ }
private:
- explicit OpenTelemetryClientFilter(std::string target)
- : target_(std::move(target)) {}
+ explicit OpenTelemetryClientFilter(std::string target);
- std::string target_;
+ std::string filtered_target_;
+ ActivePluginOptionsView active_plugin_options_view_;
};
} // namespace internal
diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc
index 3a7c8bd..51b5061 100644
--- a/src/cpp/ext/otel/otel_plugin.cc
+++ b/src/cpp/ext/otel/otel_plugin.cc
@@ -89,6 +89,8 @@
OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
: metrics_(BaseMetrics()) {}
+OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default;
+
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
@@ -153,6 +155,14 @@
return *this;
}
+OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
+ std::unique_ptr<InternalOpenTelemetryPluginOption> option) {
+ // We allow a limit of 64 plugin options to be registered at this time.
+ GPR_ASSERT(plugin_options_.size() < 64);
+ plugin_options_.push_back(std::move(option));
+ return *this;
+}
+
void OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
@@ -239,6 +249,7 @@
g_otel_plugin_state_->generic_method_attribute_filter =
std::move(generic_method_attribute_filter_);
g_otel_plugin_state_->meter_provider = std::move(meter_provider);
+ g_otel_plugin_state_->plugin_options = std::move(plugin_options_);
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory());
grpc_core::CoreConfiguration::RegisterBuilder(
@@ -287,6 +298,8 @@
OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
: impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {}
+OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default;
+
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
impl_->SetMeterProvider(std::move(meter_provider));
@@ -310,6 +323,15 @@
return *this;
}
+OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption(
+ std::unique_ptr<OpenTelemetryPluginOption> option) {
+ impl_->AddPluginOption(
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>(
+ static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>(
+ option.release())));
+ return *this;
+}
+
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
impl_->BuildAndRegisterGlobal();
}
diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h
index 3476850..0aa3177 100644
--- a/src/cpp/ext/otel/otel_plugin.h
+++ b/src/cpp/ext/otel/otel_plugin.h
@@ -24,6 +24,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <bitset>
#include <memory>
#include <string>
#include <utility>
@@ -36,6 +37,8 @@
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"
+#include <grpcpp/ext/otel_plugin.h>
+
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/metadata_batch.h"
@@ -67,14 +70,27 @@
// Read the incoming initial metadata to get the set of labels to be added to
// metrics.
virtual std::unique_ptr<LabelsIterable> GetLabels(
- grpc_metadata_batch* incoming_initial_metadata) = 0;
+ grpc_metadata_batch* incoming_initial_metadata) const = 0;
// Modify the outgoing initial metadata with metadata information to be sent
// to the peer. On the server side, \a labels_from_incoming_metadata returned
// from `GetLabels` should be provided as input here. On the client side, this
// should be nullptr.
- virtual void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
- LabelsIterable* labels_from_incoming_metadata) = 0;
+ virtual void AddLabels(
+ grpc_metadata_batch* outgoing_initial_metadata,
+ LabelsIterable* labels_from_incoming_metadata) const = 0;
+};
+
+class InternalOpenTelemetryPluginOption
+ : public grpc::experimental::OpenTelemetryPluginOption {
+ public:
+ ~InternalOpenTelemetryPluginOption() override = default;
+ // Determines whether a plugin option is active on a given channel target
+ virtual bool IsActiveOnClientChannel(absl::string_view target) const = 0;
+ // Determines whether a plugin option is active on a given server
+ virtual bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const = 0;
+ // Returns the LabelsInjector used by this plugin option, nullptr if none.
+ virtual const grpc::internal::LabelsInjector* labels_injector() const = 0;
};
struct OpenTelemetryPluginState {
@@ -107,6 +123,8 @@
generic_method_attribute_filter;
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector;
+ std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
+ plugin_options;
};
const struct OpenTelemetryPluginState& OpenTelemetryPluginState();
@@ -119,6 +137,7 @@
class OpenTelemetryPluginBuilderImpl {
public:
OpenTelemetryPluginBuilderImpl();
+ ~OpenTelemetryPluginBuilderImpl();
// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilderImpl& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
@@ -166,6 +185,8 @@
OpenTelemetryPluginBuilderImpl& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
+ OpenTelemetryPluginBuilderImpl& AddPluginOption(
+ std::unique_ptr<InternalOpenTelemetryPluginOption> option);
void BuildAndRegisterGlobal();
private:
@@ -179,6 +200,52 @@
generic_method_attribute_filter_;
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector_;
+ std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
+ plugin_options_;
+};
+
+// Creates a convenience wrapper to help iterate over only those plugin options
+// that are active over a given channel/server.
+class ActivePluginOptionsView {
+ public:
+ static ActivePluginOptionsView MakeForClient(absl::string_view target) {
+ return ActivePluginOptionsView(
+ [target](const InternalOpenTelemetryPluginOption& plugin_option) {
+ return plugin_option.IsActiveOnClientChannel(target);
+ });
+ }
+
+ static ActivePluginOptionsView MakeForServer(
+ const grpc_core::ChannelArgs& args) {
+ return ActivePluginOptionsView(
+ [&args](const InternalOpenTelemetryPluginOption& plugin_option) {
+ return plugin_option.IsActiveOnServer(args);
+ });
+ }
+
+ void ForEach(
+ absl::FunctionRef<void(const InternalOpenTelemetryPluginOption&, size_t)>
+ func) const {
+ for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size();
+ ++i) {
+ const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i];
+ if (active_mask_[i]) func(*plugin_option, i);
+ }
+ }
+
+ private:
+ explicit ActivePluginOptionsView(
+ absl::FunctionRef<bool(const InternalOpenTelemetryPluginOption&)> func) {
+ for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size();
+ ++i) {
+ const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i];
+ if (plugin_option != nullptr && func(*plugin_option)) {
+ active_mask_.set(i);
+ }
+ }
+ }
+
+ std::bitset<64> active_mask_;
};
} // namespace internal
diff --git a/src/cpp/ext/otel/otel_server_call_tracer.cc b/src/cpp/ext/otel/otel_server_call_tracer.cc
index ab7f3a9..7fa40c6 100644
--- a/src/cpp/ext/otel/otel_server_call_tracer.cc
+++ b/src/cpp/ext/otel/otel_server_call_tracer.cc
@@ -56,7 +56,12 @@
class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
public:
- OpenTelemetryServerCallTracer() : start_time_(absl::Now()) {}
+ explicit OpenTelemetryServerCallTracer(const grpc_core::ChannelArgs& args)
+ : start_time_(absl::Now()),
+ active_plugin_options_view_(
+ ActivePluginOptionsView::MakeForServer(args)),
+ injected_labels_from_plugin_options_(
+ OpenTelemetryPluginState().plugin_options.size()) {}
std::string TraceId() override {
// Not implemented
@@ -81,6 +86,16 @@
OpenTelemetryPluginState().labels_injector->AddLabels(
send_initial_metadata, injected_labels_.get());
}
+ active_plugin_options_view_.ForEach(
+ [&](const InternalOpenTelemetryPluginOption& plugin_option,
+ size_t index) {
+ auto* labels_injector = plugin_option.labels_injector();
+ if (labels_injector != nullptr) {
+ labels_injector->AddLabels(
+ send_initial_metadata,
+ injected_labels_from_plugin_options_[index].get());
+ }
+ });
}
void RecordSendTrailingMetadata(
@@ -148,6 +163,11 @@
grpc_core::Slice path_;
std::unique_ptr<LabelsIterable> injected_labels_;
bool registered_method_;
+ ActivePluginOptionsView active_plugin_options_view_;
+ // TODO(yashykt): It's wasteful to do this per call. When we re-haul the stats
+ // infrastructure, this should move to be done per server.
+ std::vector<std::unique_ptr<LabelsIterable>>
+ injected_labels_from_plugin_options_;
};
void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
@@ -158,6 +178,15 @@
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
+ active_plugin_options_view_.ForEach(
+ [&](const InternalOpenTelemetryPluginOption& plugin_option,
+ size_t index) {
+ auto* labels_injector = plugin_option.labels_injector();
+ if (labels_injector != nullptr) {
+ injected_labels_from_plugin_options_[index] =
+ labels_injector->GetLabels(recv_initial_metadata);
+ }
+ });
registered_method_ =
recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
.value_or(nullptr) != nullptr;
@@ -167,7 +196,7 @@
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OpenTelemetryPluginState().server.call.started->Add(
- 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
+ 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
additional_labels));
}
}
@@ -186,7 +215,9 @@
{{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(final_info->final_status)}}};
- KeyValueIterable labels(injected_labels_.get(), additional_labels);
+ KeyValueIterable labels(injected_labels_.get(),
+ injected_labels_from_plugin_options_,
+ additional_labels);
if (OpenTelemetryPluginState().server.call.duration != nullptr) {
OpenTelemetryPluginState().server.call.duration->Record(
absl::ToDoubleSeconds(elapsed_time_), labels,
@@ -216,8 +247,8 @@
grpc_core::ServerCallTracer*
OpenTelemetryServerCallTracerFactory::CreateNewServerCallTracer(
- grpc_core::Arena* arena) {
- return arena->ManagedNew<OpenTelemetryServerCallTracer>();
+ grpc_core::Arena* arena, const grpc_core::ChannelArgs& args) {
+ return arena->ManagedNew<OpenTelemetryServerCallTracer>(args);
}
bool OpenTelemetryServerCallTracerFactory::IsServerTraced(
diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h
index 2ce422e..c8cea43 100644
--- a/src/cpp/ext/otel/otel_server_call_tracer.h
+++ b/src/cpp/ext/otel/otel_server_call_tracer.h
@@ -32,7 +32,8 @@
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
- grpc_core::Arena* arena) override;
+ grpc_core::Arena* arena,
+ const grpc_core::ChannelArgs& channel_args) override;
bool IsServerTraced(const grpc_core::ChannelArgs& args) override;
};
diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
index 722211a..6c1e68b 100644
--- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
+++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
@@ -261,10 +261,11 @@
grpc_core::ServerCallTracer*
PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
- grpc_core::Arena* arena) {
+ grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
// We don't use arena here to to ensure that memory is allocated and freed in
// the same DLL in Windows.
(void)arena;
+ (void)channel_args;
return new PythonOpenCensusServerCallTracer();
}
diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
index a4354b6..3792ae0 100644
--- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
+++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
@@ -30,7 +30,8 @@
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
- grpc_core::Arena* arena) override;
+ grpc_core::Arena* arena,
+ const grpc_core::ChannelArgs& channel_args) override;
};
inline absl::string_view GetMethod(const grpc_core::Slice& path) {
diff --git a/test/core/channel/server_call_tracer_factory_test.cc b/test/core/channel/server_call_tracer_factory_test.cc
index 09e7985..7786215 100644
--- a/test/core/channel/server_call_tracer_factory_test.cc
+++ b/test/core/channel/server_call_tracer_factory_test.cc
@@ -26,7 +26,8 @@
class TestServerCallTracerFactory : public ServerCallTracerFactory {
public:
- ServerCallTracer* CreateNewServerCallTracer(Arena* /*arena*/) override {
+ ServerCallTracer* CreateNewServerCallTracer(
+ Arena* /*arena*/, const ChannelArgs& /*args*/) override {
Crash("Not implemented");
}
};
diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc
index 5986039..14f8dbe 100644
--- a/test/core/end2end/tests/http2_stats.cc
+++ b/test/core/end2end/tests/http2_stats.cc
@@ -200,7 +200,8 @@
class FakeServerCallTracerFactory : public ServerCallTracerFactory {
public:
- ServerCallTracer* CreateNewServerCallTracer(Arena* arena) override {
+ ServerCallTracer* CreateNewServerCallTracer(
+ Arena* arena, const ChannelArgs& /*args*/) override {
return arena->ManagedNew<FakeServerCallTracer>();
}
};
diff --git a/test/cpp/ext/csm/BUILD b/test/cpp/ext/csm/BUILD
index ed51117..3d505e0 100644
--- a/test/cpp/ext/csm/BUILD
+++ b/test/cpp/ext/csm/BUILD
@@ -36,6 +36,8 @@
],
deps = [
"//:grpc++",
+ "//:grpcpp_csm_observability",
+ "//:grpcpp_otel_plugin",
"//src/cpp/ext/csm:csm_observability",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/util:grpc_test_util",
diff --git a/test/cpp/ext/csm/csm_observability_test.cc b/test/cpp/ext/csm/csm_observability_test.cc
index 54e3e5c..c6825c8 100644
--- a/test/cpp/ext/csm/csm_observability_test.cc
+++ b/test/cpp/ext/csm/csm_observability_test.cc
@@ -22,6 +22,7 @@
#include "gtest/gtest.h"
#include <grpcpp/ext/csm_observability.h>
+#include <grpcpp/ext/otel_plugin.h>
#include "src/core/lib/gprpp/env.h"
#include "test/core/util/test_config.h"
@@ -62,6 +63,12 @@
"xds://traffic-director-global.xds.googleapis.com/foo"));
}
+TEST(CsmPluginOptionTest, Basic) {
+ experimental::OpenTelemetryPluginBuilder()
+ .AddPluginOption(experimental::MakeCsmOpenTelemetryPluginOption())
+ .BuildAndRegisterGlobal();
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc
index e763cd7..994ed6c 100644
--- a/test/cpp/ext/otel/otel_plugin_test.cc
+++ b/test/cpp/ext/otel/otel_plugin_test.cc
@@ -653,6 +653,282 @@
EXPECT_EQ(*status_value, "UNIMPLEMENTED");
}
+using OpenTelemetryPluginOptionEnd2EndTest = OpenTelemetryPluginEnd2EndTest;
+
+class SimpleLabelIterable : public grpc::internal::LabelsIterable {
+ public:
+ explicit SimpleLabelIterable(
+ std::pair<absl::string_view, absl::string_view> label)
+ : label_(label) {}
+
+ absl::optional<std::pair<absl::string_view, absl::string_view>> Next()
+ override {
+ if (iterated_) {
+ return absl::nullopt;
+ }
+ iterated_ = true;
+ return label_;
+ }
+
+ size_t Size() const override { return 1; }
+
+ void ResetIteratorPosition() override { iterated_ = false; }
+
+ private:
+ bool iterated_ = false;
+ std::pair<absl::string_view, absl::string_view> label_;
+};
+
+class CustomLabelInjector : public grpc::internal::LabelsInjector {
+ public:
+ explicit CustomLabelInjector(std::pair<std::string, std::string> label)
+ : label_(std::move(label)) {}
+ ~CustomLabelInjector() override {}
+
+ std::unique_ptr<grpc::internal::LabelsIterable> GetLabels(
+ grpc_metadata_batch* /*incoming_initial_metadata*/) const override {
+ return std::make_unique<SimpleLabelIterable>(label_);
+ }
+
+ void AddLabels(
+ grpc_metadata_batch* /*outgoing_initial_metadata*/,
+ grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/)
+ const override {}
+
+ private:
+ std::pair<std::string, std::string> label_;
+};
+
+class CustomPluginOption
+ : public grpc::internal::InternalOpenTelemetryPluginOption {
+ public:
+ CustomPluginOption(bool enabled_on_client, bool enabled_on_server,
+ std::pair<std::string, std::string> label)
+ : enabled_on_client_(enabled_on_client),
+ enabled_on_server_(enabled_on_server),
+ label_injector_(
+ std::make_unique<CustomLabelInjector>(std::move(label))) {}
+
+ ~CustomPluginOption() override {}
+
+ bool IsActiveOnClientChannel(absl::string_view /*target*/) const override {
+ return enabled_on_client_;
+ }
+
+ bool IsActiveOnServer(const grpc_core::ChannelArgs& /*args*/) const override {
+ return enabled_on_server_;
+ }
+
+ const grpc::internal::LabelsInjector* labels_injector() const override {
+ return label_injector_.get();
+ }
+
+ private:
+ bool enabled_on_client_;
+ bool enabled_on_server_;
+ std::unique_ptr<CustomLabelInjector> label_injector_;
+};
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, Basic) {
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_option_list;
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ true, /*enabled_on_server*/ true,
+ std::make_pair("key", "value")));
+ Init({grpc::experimental::OpenTelemetryPluginBuilder::
+ kClientAttemptDurationInstrumentName,
+ grpc::experimental::OpenTelemetryPluginBuilder::
+ kServerCallDurationInstrumentName},
+ /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+ /*labels_injector=*/nullptr,
+ /*test_no_meter_provider=*/false,
+ /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*target_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*generic_method_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+ /*plugin_options=*/std::move(plugin_option_list));
+ SendRPC();
+ auto data = ReadCurrentMetricsData(
+ [&](const absl::flat_hash_map<
+ std::string,
+ std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+ data) {
+ return !data.contains("grpc.client.attempt.duration") ||
+ !data.contains("grpc.server.call.duration");
+ });
+ // Verify client side metric
+ ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+ const auto& client_attributes =
+ data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(client_attributes.size(), 4);
+ EXPECT_EQ(absl::get<std::string>(client_attributes.at("key")), "value");
+ // Verify server side metric
+ ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+ const auto& server_attributes =
+ data["grpc.server.call.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(server_attributes.size(), 3);
+ EXPECT_EQ(absl::get<std::string>(server_attributes.at("key")), "value");
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ClientOnlyPluginOption) {
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_option_list;
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+ std::make_pair("key", "value")));
+ Init({grpc::experimental::OpenTelemetryPluginBuilder::
+ kClientAttemptDurationInstrumentName,
+ grpc::experimental::OpenTelemetryPluginBuilder::
+ kServerCallDurationInstrumentName},
+ /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+ /*labels_injector=*/nullptr,
+ /*test_no_meter_provider=*/false,
+ /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*target_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*generic_method_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+ /*plugin_options=*/std::move(plugin_option_list));
+ SendRPC();
+ auto data = ReadCurrentMetricsData(
+ [&](const absl::flat_hash_map<
+ std::string,
+ std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+ data) {
+ return !data.contains("grpc.client.attempt.duration") ||
+ !data.contains("grpc.server.call.duration");
+ });
+ // Verify client side metric
+ ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+ const auto& client_attributes =
+ data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(client_attributes.size(), 4);
+ EXPECT_EQ(absl::get<std::string>(client_attributes.at("key")), "value");
+ // Verify server side metric
+ ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+ const auto& server_attributes =
+ data["grpc.server.call.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(server_attributes.size(), 2);
+ EXPECT_THAT(server_attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key"))));
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ServerOnlyPluginOption) {
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_option_list;
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+ std::make_pair("key", "value")));
+ Init({grpc::experimental::OpenTelemetryPluginBuilder::
+ kClientAttemptDurationInstrumentName,
+ grpc::experimental::OpenTelemetryPluginBuilder::
+ kServerCallDurationInstrumentName},
+ /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+ /*labels_injector=*/nullptr,
+ /*test_no_meter_provider=*/false,
+ /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*target_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*generic_method_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+ /*plugin_options=*/std::move(plugin_option_list));
+ SendRPC();
+ auto data = ReadCurrentMetricsData(
+ [&](const absl::flat_hash_map<
+ std::string,
+ std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+ data) {
+ return !data.contains("grpc.client.attempt.duration") ||
+ !data.contains("grpc.server.call.duration");
+ });
+ // Verify client side metric
+ ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+ const auto& attributes =
+ data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(attributes.size(), 3);
+ EXPECT_THAT(attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key"))));
+ // Verify server side metric
+ ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+ const auto& server_attributes =
+ data["grpc.server.call.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(server_attributes.size(), 3);
+ EXPECT_EQ(absl::get<std::string>(server_attributes.at("key")), "value");
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest,
+ MultipleEnabledAndDisabledPluginOptions) {
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_option_list;
+ plugin_option_list.reserve(5);
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ true, /*enabled_on_server*/ true,
+ std::make_pair("key1", "value1")));
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+ std::make_pair("key2", "value2")));
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+ std::make_pair("key3", "value3")));
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+ std::make_pair("key4", "value4")));
+ plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+ /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+ std::make_pair("key5", "value5")));
+ Init({grpc::experimental::OpenTelemetryPluginBuilder::
+ kClientAttemptDurationInstrumentName,
+ grpc::experimental::OpenTelemetryPluginBuilder::
+ kServerCallDurationInstrumentName},
+ /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+ /*labels_injector=*/nullptr,
+ /*test_no_meter_provider=*/false,
+ /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*target_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view) const>(),
+ /*generic_method_attribute_filter=*/
+ absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+ /*plugin_options=*/std::move(plugin_option_list));
+ SendRPC();
+ auto data = ReadCurrentMetricsData(
+ [&](const absl::flat_hash_map<
+ std::string,
+ std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+ data) {
+ return !data.contains("grpc.client.attempt.duration") ||
+ !data.contains("grpc.server.call.duration");
+ });
+ // Verify client side metric
+ ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+ const auto& client_attributes =
+ data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(client_attributes.size(), 6);
+ EXPECT_EQ(absl::get<std::string>(client_attributes.at("key1")), "value1");
+ EXPECT_EQ(absl::get<std::string>(client_attributes.at("key2")), "value2");
+ EXPECT_EQ(absl::get<std::string>(client_attributes.at("key3")), "value3");
+ EXPECT_THAT(client_attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key4"))));
+ EXPECT_THAT(client_attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key5"))));
+ // Verify server side metric
+ ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+ const auto& server_attributes =
+ data["grpc.server.call.duration"][0].attributes.GetAttributes();
+ EXPECT_EQ(server_attributes.size(), 5);
+ EXPECT_EQ(absl::get<std::string>(server_attributes.at("key1")), "value1");
+ EXPECT_THAT(server_attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key2"))));
+ EXPECT_THAT(server_attributes,
+ ::testing::Not(::testing::Contains(::testing::Key("key3"))));
+ EXPECT_EQ(absl::get<std::string>(server_attributes.at("key4")), "value4");
+ EXPECT_EQ(absl::get<std::string>(server_attributes.at("key5")), "value5");
+}
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/ext/otel/otel_test_library.cc b/test/cpp/ext/otel/otel_test_library.cc
index bf53001..53aa350 100644
--- a/test/cpp/ext/otel/otel_test_library.cc
+++ b/test/cpp/ext/otel/otel_test_library.cc
@@ -48,7 +48,10 @@
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter,
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
- generic_method_attribute_filter) {
+ generic_method_attribute_filter,
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_options) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
@@ -76,6 +79,9 @@
ot_builder.SetTargetAttributeFilter(std::move(target_attribute_filter));
ot_builder.SetGenericMethodAttributeFilter(
std::move(generic_method_attribute_filter));
+ for (auto& option : plugin_options) {
+ ot_builder.AddPluginOption(std::move(option));
+ }
ot_builder.BuildAndRegisterGlobal();
grpc_init();
grpc::ServerBuilder builder;
diff --git a/test/cpp/ext/otel/otel_test_library.h b/test/cpp/ext/otel/otel_test_library.h
index 1076d29..629d8c1 100644
--- a/test/cpp/ext/otel/otel_test_library.h
+++ b/test/cpp/ext/otel/otel_test_library.h
@@ -70,7 +70,10 @@
absl::AnyInvocable<bool(absl::string_view) const>(),
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter = absl::AnyInvocable<
- bool(absl::string_view /*generic_method*/) const>());
+ bool(absl::string_view /*generic_method*/) const>(),
+ std::vector<
+ std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+ plugin_options = {});
void TearDown() override;
diff --git a/test/cpp/interop/observability_client.cc b/test/cpp/interop/observability_client.cc
index a815f09..259b47d 100644
--- a/test/cpp/interop/observability_client.cc
+++ b/test/cpp/interop/observability_client.cc
@@ -30,10 +30,10 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/ext/gcp_observability.h>
+#include <grpcpp/ext/otel_plugin.h>
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/crash.h"
-#include "src/cpp/ext/otel/otel_plugin.h"
#include "test/core/util/test_config.h"
#include "test/cpp/interop/client_helper.h"
#include "test/cpp/interop/interop_client.h"
@@ -236,7 +236,7 @@
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
meter_provider->AddMetricReader(std::move(prometheus_exporter));
- grpc::internal::OpenTelemetryPluginBuilderImpl otel_builder;
+ grpc::experimental::OpenTelemetryPluginBuilder otel_builder;
otel_builder.SetMeterProvider(std::move(meter_provider));
otel_builder.BuildAndRegisterGlobal();
}