[export] Automated rollback of changelist 596804297.

FUTURE_COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35434 from yashykt:OTelPluginOption 1db870bed4ba55cad5515c3586200fa17f5ce226

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/596967071](http://cl/596967071) [cl/596966190](http://cl/596966190) [cl/596966091](http://cl/596966091) [cl/596958525](http://cl/596958525) [cl/596804297](http://cl/596804297) [cl/596795876](http://cl/596795876)

PiperOrigin-RevId: 596967071
diff --git a/include/grpcpp/ext/csm_observability.h b/include/grpcpp/ext/csm_observability.h
index 1212ddf..58a9e96 100644
--- a/include/grpcpp/ext/csm_observability.h
+++ b/include/grpcpp/ext/csm_observability.h
@@ -28,6 +28,8 @@
 #include "absl/strings/string_view.h"
 #include "opentelemetry/sdk/metrics/meter_provider.h"
 
+#include <grpcpp/ext/otel_plugin.h>
+
 namespace grpc {
 
 namespace internal {
@@ -88,6 +90,18 @@
   std::unique_ptr<grpc::internal::OpenTelemetryPluginBuilderImpl> builder_;
 };
 
+class OpenTelemetryPluginOption;
+
+/// Creates an OpenTelemetryPluginOption that would add additional labels on
+/// gRPC metrics to enhance observability for CSM users.
+///
+/// Sample Usage -
+/// OpenTelemetryPluginBuilder()
+///     .SetMeterProvider(provider)
+///     .AddPluginOption(MakeCsmOpenTelemetryPluginOption())
+///     .BuildAndRegisterGlobal();
+std::unique_ptr<OpenTelemetryPluginOption> MakeCsmOpenTelemetryPluginOption();
+
 }  // namespace experimental
 }  // namespace grpc
 
diff --git a/include/grpcpp/ext/otel_plugin.h b/include/grpcpp/ext/otel_plugin.h
index bdf681e..acb3532 100644
--- a/include/grpcpp/ext/otel_plugin.h
+++ b/include/grpcpp/ext/otel_plugin.h
@@ -38,6 +38,11 @@
 
 namespace experimental {
 
+class OpenTelemetryPluginOption {
+ public:
+  virtual ~OpenTelemetryPluginOption() = default;
+};
+
 /// The most common way to use this API is -
 ///
 /// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
@@ -76,6 +81,7 @@
           "grpc.server.call.rcvd_total_compressed_message_size";
 
   OpenTelemetryPluginBuilder();
+  ~OpenTelemetryPluginBuilder();
   /// If `SetMeterProvider()` is not called, no metrics are collected.
   OpenTelemetryPluginBuilder& SetMeterProvider(
       std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
@@ -95,6 +101,12 @@
   OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
           generic_method_attribute_filter);
+  /// Add a plugin option to add to the opentelemetry plugin being built. At
+  /// present, this type is an opaque type. Ownership of \a option is
+  /// transferred when `AddPluginOption` is invoked. A maximum of 64 plugin
+  /// options can be added.
+  OpenTelemetryPluginBuilder& AddPluginOption(
+      std::unique_ptr<OpenTelemetryPluginOption> option);
   /// Registers a global plugin that acts on all channels and servers running on
   /// the process.
   void BuildAndRegisterGlobal();
diff --git a/src/core/BUILD b/src/core/BUILD
index 8ee2f30..776a390 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -6237,6 +6237,7 @@
     external_deps = [
         "absl/status",
         "absl/status:statusor",
+        "absl/strings",
     ],
     deps = [
         "bitset",
diff --git a/src/core/ext/transport/chaotic_good/frame_header.cc b/src/core/ext/transport/chaotic_good/frame_header.cc
index 06f9d14..6b1ec01 100644
--- a/src/core/ext/transport/chaotic_good/frame_header.cc
+++ b/src/core/ext/transport/chaotic_good/frame_header.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 
 #include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
 
 #include <grpc/support/log.h>
 
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index e5a6282..dea2844 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -617,8 +617,17 @@
   RefCountedPtr<HandshakingState> handshaking_state_ref;
   listener_ = std::move(listener);
   {
-    MutexLock lock(&mu_);
-    if (shutdown_) return;
+    ReleasableMutexLock lock(&mu_);
+    if (shutdown_) {
+      lock.Release();
+      // If the Connection is already shutdown at this point, it implies the
+      // owning Chttp2ServerListener and all associated ActiveConnections have
+      // been orphaned. The generated endpoints need to be shutdown here to
+      // ensure the tcp connections are closed appropriately.
+      grpc_endpoint_shutdown(endpoint, absl::OkStatus());
+      grpc_endpoint_destroy(endpoint);
+      return;
+    }
     // Hold a ref to HandshakingState to allow starting the handshake outside
     // the critical region.
     handshaking_state_ref = handshaking_state_->Ref();
diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h
index 78fd7fb..3f2b10e 100644
--- a/src/core/lib/channel/call_tracer.h
+++ b/src/core/lib/channel/call_tracer.h
@@ -175,7 +175,8 @@
 
   virtual ~ServerCallTracerFactory() {}
 
-  virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0;
+  virtual ServerCallTracer* CreateNewServerCallTracer(
+      Arena* arena, const ChannelArgs& channel_args) = 0;
 
   // Returns true if a server is to be traced, false otherwise.
   virtual bool IsServerTraced(const ChannelArgs& /*args*/) { return true; }
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 2702bed..d6d6a85 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -859,7 +859,7 @@
         args->server->server_call_tracer_factory() != nullptr) {
       auto* server_call_tracer =
           args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
-              arena);
+              arena, args->server->channel_args());
       if (server_call_tracer != nullptr) {
         // Note that we are setting both
         // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
@@ -3426,7 +3426,7 @@
   if (args->server->server_call_tracer_factory() != nullptr) {
     auto* server_call_tracer =
         args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
-            arena);
+            arena, args->server->channel_args());
     if (server_call_tracer != nullptr) {
       // Note that we are setting both
       // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
diff --git a/src/cpp/ext/csm/csm_observability.cc b/src/cpp/ext/csm/csm_observability.cc
index 625120a..de55e1e 100644
--- a/src/cpp/ext/csm/csm_observability.cc
+++ b/src/cpp/ext/csm/csm_observability.cc
@@ -42,6 +42,58 @@
 #include "src/cpp/ext/otel/otel_plugin.h"
 
 namespace grpc {
+
+namespace internal {
+
+bool CsmServerSelector(const grpc_core::ChannelArgs& args) {
+  return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
+}
+
+bool CsmChannelTargetSelector(absl::string_view target) {
+  auto uri = grpc_core::URI::Parse(target);
+  if (!uri.ok()) {
+    gpr_log(GPR_ERROR, "Failed to parse URI: %s", std::string(target).c_str());
+    return false;
+  }
+  // CSM channels should have an "xds" scheme
+  if (uri->scheme() != "xds") {
+    return false;
+  }
+  // If set, the authority should be TD
+  if (!uri->authority().empty() &&
+      uri->authority() != "traffic-director-global.xds.googleapis.com") {
+    return false;
+  }
+  return true;
+}
+
+class CsmOpenTelemetryPluginOption
+    : public grpc::internal::InternalOpenTelemetryPluginOption {
+ public:
+  CsmOpenTelemetryPluginOption()
+      : labels_injector_(std::make_unique<internal::ServiceMeshLabelsInjector>(
+            google::cloud::otel::MakeResourceDetector()
+                ->Detect()
+                .GetAttributes())) {}
+
+  bool IsActiveOnClientChannel(absl::string_view target) const override {
+    return CsmChannelTargetSelector(target);
+  }
+
+  bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const override {
+    return CsmServerSelector(args);
+  }
+
+  const grpc::internal::LabelsInjector* labels_injector() const override {
+    return labels_injector_.get();
+  }
+
+ private:
+  std::unique_ptr<internal::ServiceMeshLabelsInjector> labels_injector_;
+};
+
+}  // namespace internal
+
 namespace experimental {
 
 //
@@ -78,9 +130,7 @@
 }
 
 absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
-  builder_->SetServerSelector([](const grpc_core::ChannelArgs& args) {
-    return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
-  });
+  builder_->SetServerSelector(internal::CsmServerSelector);
   builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
   builder_->SetLabelsInjector(
       std::make_unique<internal::ServiceMeshLabelsInjector>(
@@ -91,27 +141,10 @@
   return CsmObservability();
 }
 
-}  // namespace experimental
-
-namespace internal {
-
-bool CsmChannelTargetSelector(absl::string_view target) {
-  auto uri = grpc_core::URI::Parse(target);
-  if (!uri.ok()) {
-    gpr_log(GPR_ERROR, "Failed to parse URI: %s", std::string(target).c_str());
-    return false;
-  }
-  // CSM channels should have an "xds" scheme
-  if (uri->scheme() != "xds") {
-    return false;
-  }
-  // If set, the authority should be TD
-  if (!uri->authority().empty() &&
-      uri->authority() != "traffic-director-global.xds.googleapis.com") {
-    return false;
-  }
-  return true;
+std::unique_ptr<OpenTelemetryPluginOption> MakeCsmOpenTelemetryPluginOption() {
+  return std::make_unique<grpc::internal::CsmOpenTelemetryPluginOption>();
 }
 
-}  // namespace internal
+}  // namespace experimental
+
 }  // namespace grpc
diff --git a/src/cpp/ext/csm/metadata_exchange.cc b/src/cpp/ext/csm/metadata_exchange.cc
index c1beadd..e0a3482 100644
--- a/src/cpp/ext/csm/metadata_exchange.cc
+++ b/src/cpp/ext/csm/metadata_exchange.cc
@@ -404,7 +404,7 @@
 }
 
 std::unique_ptr<LabelsIterable> ServiceMeshLabelsInjector::GetLabels(
-    grpc_metadata_batch* incoming_initial_metadata) {
+    grpc_metadata_batch* incoming_initial_metadata) const {
   auto peer_metadata =
       incoming_initial_metadata->Take(grpc_core::XEnvoyPeerMetadata());
   return std::make_unique<MeshLabelsIterable>(
@@ -414,7 +414,7 @@
 
 void ServiceMeshLabelsInjector::AddLabels(
     grpc_metadata_batch* outgoing_initial_metadata,
-    LabelsIterable* labels_from_incoming_metadata) {
+    LabelsIterable* labels_from_incoming_metadata) const {
   // On the server, if the labels from incoming metadata did not have a
   // non-empty base64 encoded "x-envoy-peer-metadata", do not perform metadata
   // exchange.
diff --git a/src/cpp/ext/csm/metadata_exchange.h b/src/cpp/ext/csm/metadata_exchange.h
index 9205b39..c2c24ff 100644
--- a/src/cpp/ext/csm/metadata_exchange.h
+++ b/src/cpp/ext/csm/metadata_exchange.h
@@ -43,12 +43,12 @@
   // Read the incoming initial metadata to get the set of labels to be added to
   // metrics.
   std::unique_ptr<LabelsIterable> GetLabels(
-      grpc_metadata_batch* incoming_initial_metadata) override;
+      grpc_metadata_batch* incoming_initial_metadata) const override;
 
   // Modify the outgoing initial metadata with metadata information to be sent
   // to the peer.
   void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
-                 LabelsIterable* labels_from_incoming_metadata) override;
+                 LabelsIterable* labels_from_incoming_metadata) const override;
 
  private:
   std::vector<std::pair<absl::string_view, std::string>> local_labels_;
diff --git a/src/cpp/ext/filters/census/server_call_tracer.cc b/src/cpp/ext/filters/census/server_call_tracer.cc
index 20971ab..33534b9 100644
--- a/src/cpp/ext/filters/census/server_call_tracer.cc
+++ b/src/cpp/ext/filters/census/server_call_tracer.cc
@@ -265,7 +265,7 @@
 
 grpc_core::ServerCallTracer*
 OpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
-    grpc_core::Arena* arena) {
+    grpc_core::Arena* arena, const grpc_core::ChannelArgs& /*args*/) {
   return arena->ManagedNew<OpenCensusServerCallTracer>();
 }
 
diff --git a/src/cpp/ext/filters/census/server_call_tracer.h b/src/cpp/ext/filters/census/server_call_tracer.h
index 7eb95c6..4a11b4e 100644
--- a/src/cpp/ext/filters/census/server_call_tracer.h
+++ b/src/cpp/ext/filters/census/server_call_tracer.h
@@ -31,7 +31,7 @@
     : public grpc_core::ServerCallTracerFactory {
  public:
   grpc_core::ServerCallTracer* CreateNewServerCallTracer(
-      grpc_core::Arena* arena) override;
+      grpc_core::Arena* arena, const grpc_core::ChannelArgs& /*args*/) override;
 };
 
 }  // namespace internal
diff --git a/src/cpp/ext/otel/key_value_iterable.h b/src/cpp/ext/otel/key_value_iterable.h
index 9395326..89d0e6a 100644
--- a/src/cpp/ext/otel/key_value_iterable.h
+++ b/src/cpp/ext/otel/key_value_iterable.h
@@ -50,9 +50,13 @@
  public:
   explicit KeyValueIterable(
       LabelsIterable* injected_labels_iterable,
+      const std::vector<std::unique_ptr<LabelsIterable>>&
+          injected_labels_from_plugin_options,
       absl::Span<const std::pair<absl::string_view, absl::string_view>>
           additional_labels)
       : injected_labels_iterable_(injected_labels_iterable),
+        injected_labels_from_plugin_options_(
+            injected_labels_from_plugin_options),
         additional_labels_(additional_labels) {}
 
   bool ForEachKeyValue(opentelemetry::nostd::function_ref<
@@ -68,6 +72,18 @@
         }
       }
     }
+    for (const auto& plugin_option_injected_iterable :
+         injected_labels_from_plugin_options_) {
+      if (plugin_option_injected_iterable != nullptr) {
+        plugin_option_injected_iterable->ResetIteratorPosition();
+        while (const auto& pair = plugin_option_injected_iterable->Next()) {
+          if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
+                        AbslStrViewToOpenTelemetryStrView(pair->second))) {
+            return false;
+          }
+        }
+      }
+    }
     for (const auto& pair : additional_labels_) {
       if (!callback(AbslStrViewToOpenTelemetryStrView(pair.first),
                     AbslStrViewToOpenTelemetryStrView(pair.second))) {
@@ -78,14 +94,23 @@
   }
 
   size_t size() const noexcept override {
-    return (injected_labels_iterable_ != nullptr
-                ? injected_labels_iterable_->Size()
-                : 0) +
-           additional_labels_.size();
+    size_t size = injected_labels_iterable_ != nullptr
+                      ? injected_labels_iterable_->Size()
+                      : 0;
+    for (const auto& plugin_option_injected_iterable :
+         injected_labels_from_plugin_options_) {
+      if (plugin_option_injected_iterable != nullptr) {
+        size += plugin_option_injected_iterable->Size();
+      }
+    }
+    size += additional_labels_.size();
+    return size;
   }
 
  private:
   LabelsIterable* injected_labels_iterable_;
+  const std::vector<std::unique_ptr<LabelsIterable>>&
+      injected_labels_from_plugin_options_;
   absl::Span<const std::pair<absl::string_view, absl::string_view>>
       additional_labels_;
 };
diff --git a/src/cpp/ext/otel/otel_call_tracer.h b/src/cpp/ext/otel/otel_call_tracer.h
index 7544bc6..8b90c92 100644
--- a/src/cpp/ext/otel/otel_call_tracer.h
+++ b/src/cpp/ext/otel/otel_call_tracer.h
@@ -98,6 +98,8 @@
     // Start time (for measuring latency).
     absl::Time start_time_;
     std::unique_ptr<LabelsIterable> injected_labels_;
+    std::vector<std::unique_ptr<LabelsIterable>>
+        injected_labels_from_plugin_options_;
   };
 
   explicit OpenTelemetryCallTracer(OpenTelemetryClientFilter* parent,
diff --git a/src/cpp/ext/otel/otel_client_filter.cc b/src/cpp/ext/otel/otel_client_filter.cc
index 640699d..4587b38 100644
--- a/src/cpp/ext/otel/otel_client_filter.cc
+++ b/src/cpp/ext/otel/otel_client_filter.cc
@@ -73,14 +73,8 @@
 
 absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
     const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
-  std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
-  // Use the original target string only if a filter on the attribute is not
-  // registered or if the filter returns true, otherwise use "other".
-  if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
-      OpenTelemetryPluginState().target_attribute_filter(target)) {
-    return OpenTelemetryClientFilter(std::move(target));
-  }
-  return OpenTelemetryClientFilter("other");
+  return OpenTelemetryClientFilter(
+      args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or(""));
 }
 
 grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
@@ -106,6 +100,19 @@
   return next_promise_factory(std::move(call_args));
 }
 
+OpenTelemetryClientFilter::OpenTelemetryClientFilter(std::string target)
+    : active_plugin_options_view_(
+          ActivePluginOptionsView::MakeForClient(target)) {
+  // Use the original target string only if a filter on the attribute is not
+  // registered or if the filter returns true, otherwise use "other".
+  if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
+      OpenTelemetryPluginState().target_attribute_filter(target)) {
+    filtered_target_ = std::move(target);
+  } else {
+    filtered_target_ = "other";
+  }
+}
+
 //
 // OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer
 //
@@ -120,11 +127,11 @@
     std::array<std::pair<absl::string_view, absl::string_view>, 2>
         additional_labels = {
             {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
-             {OpenTelemetryTargetKey(), parent_->parent_->target()}}};
+             {OpenTelemetryTargetKey(), parent_->parent_->filtered_target()}}};
     // We might not have all the injected labels that we want at this point, so
     // avoid recording a subset of injected labels here.
     OpenTelemetryPluginState().client.attempt.started->Add(
-        1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
+        1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
                             additional_labels));
   }
 }
@@ -135,6 +142,15 @@
     injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
         recv_initial_metadata);
   }
+  parent_->parent_->active_plugin_options_view().ForEach(
+      [&](const InternalOpenTelemetryPluginOption& plugin_option,
+          size_t /*index*/) {
+        auto* labels_injector = plugin_option.labels_injector();
+        if (labels_injector != nullptr) {
+          injected_labels_from_plugin_options_.push_back(
+              labels_injector->GetLabels(recv_initial_metadata));
+        }
+      });
 }
 
 void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
@@ -143,6 +159,14 @@
     OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
                                                           nullptr);
   }
+  parent_->parent_->active_plugin_options_view().ForEach(
+      [&](const InternalOpenTelemetryPluginOption& plugin_option,
+          size_t /*index*/) {
+        auto* labels_injector = plugin_option.labels_injector();
+        if (labels_injector != nullptr) {
+          labels_injector->AddLabels(send_initial_metadata, nullptr);
+        }
+      });
 }
 
 void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordSendMessage(
@@ -178,11 +202,13 @@
   std::array<std::pair<absl::string_view, absl::string_view>, 3>
       additional_labels = {
           {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
-           {OpenTelemetryTargetKey(), parent_->parent_->target()},
+           {OpenTelemetryTargetKey(), parent_->parent_->filtered_target()},
            {OpenTelemetryStatusKey(),
             grpc_status_code_to_string(
                 static_cast<grpc_status_code>(status.code()))}}};
-  KeyValueIterable labels(injected_labels_.get(), additional_labels);
+  KeyValueIterable labels(injected_labels_.get(),
+                          injected_labels_from_plugin_options_,
+                          additional_labels);
   if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
     OpenTelemetryPluginState().client.attempt.duration->Record(
         absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
diff --git a/src/cpp/ext/otel/otel_client_filter.h b/src/cpp/ext/otel/otel_client_filter.h
index 9205da4..7c0ddcc 100644
--- a/src/cpp/ext/otel/otel_client_filter.h
+++ b/src/cpp/ext/otel/otel_client_filter.h
@@ -32,6 +32,7 @@
 #include "src/core/lib/channel/promise_based_filter.h"
 #include "src/core/lib/promise/arena_promise.h"
 #include "src/core/lib/transport/transport.h"
+#include "src/cpp/ext/otel/otel_plugin.h"
 
 namespace grpc {
 namespace internal {
@@ -48,13 +49,17 @@
       grpc_core::CallArgs call_args,
       grpc_core::NextPromiseFactory next_promise_factory) override;
 
-  absl::string_view target() const { return target_; }
+  absl::string_view filtered_target() const { return filtered_target_; }
+
+  const ActivePluginOptionsView& active_plugin_options_view() const {
+    return active_plugin_options_view_;
+  }
 
  private:
-  explicit OpenTelemetryClientFilter(std::string target)
-      : target_(std::move(target)) {}
+  explicit OpenTelemetryClientFilter(std::string target);
 
-  std::string target_;
+  std::string filtered_target_;
+  ActivePluginOptionsView active_plugin_options_view_;
 };
 
 }  // namespace internal
diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc
index 3a7c8bd..51b5061 100644
--- a/src/cpp/ext/otel/otel_plugin.cc
+++ b/src/cpp/ext/otel/otel_plugin.cc
@@ -89,6 +89,8 @@
 OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
     : metrics_(BaseMetrics()) {}
 
+OpenTelemetryPluginBuilderImpl::~OpenTelemetryPluginBuilderImpl() = default;
+
 OpenTelemetryPluginBuilderImpl&
 OpenTelemetryPluginBuilderImpl::SetMeterProvider(
     std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
@@ -153,6 +155,14 @@
   return *this;
 }
 
+OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
+    std::unique_ptr<InternalOpenTelemetryPluginOption> option) {
+  // We allow a limit of 64 plugin options to be registered at this time.
+  GPR_ASSERT(plugin_options_.size() < 64);
+  plugin_options_.push_back(std::move(option));
+  return *this;
+}
+
 void OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
   opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
       meter_provider = meter_provider_;
@@ -239,6 +249,7 @@
   g_otel_plugin_state_->generic_method_attribute_filter =
       std::move(generic_method_attribute_filter_);
   g_otel_plugin_state_->meter_provider = std::move(meter_provider);
+  g_otel_plugin_state_->plugin_options = std::move(plugin_options_);
   grpc_core::ServerCallTracerFactory::RegisterGlobal(
       new grpc::internal::OpenTelemetryServerCallTracerFactory());
   grpc_core::CoreConfiguration::RegisterBuilder(
@@ -287,6 +298,8 @@
 OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
     : impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {}
 
+OpenTelemetryPluginBuilder::~OpenTelemetryPluginBuilder() = default;
+
 OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
     std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
   impl_->SetMeterProvider(std::move(meter_provider));
@@ -310,6 +323,15 @@
   return *this;
 }
 
+OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::AddPluginOption(
+    std::unique_ptr<OpenTelemetryPluginOption> option) {
+  impl_->AddPluginOption(
+      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>(
+          static_cast<grpc::internal::InternalOpenTelemetryPluginOption*>(
+              option.release())));
+  return *this;
+}
+
 void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
   impl_->BuildAndRegisterGlobal();
 }
diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h
index 3476850..0aa3177 100644
--- a/src/cpp/ext/otel/otel_plugin.h
+++ b/src/cpp/ext/otel/otel_plugin.h
@@ -24,6 +24,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <bitset>
 #include <memory>
 #include <string>
 #include <utility>
@@ -36,6 +37,8 @@
 #include "opentelemetry/metrics/sync_instruments.h"
 #include "opentelemetry/nostd/shared_ptr.h"
 
+#include <grpcpp/ext/otel_plugin.h>
+
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/transport/metadata_batch.h"
 
@@ -67,14 +70,27 @@
   // Read the incoming initial metadata to get the set of labels to be added to
   // metrics.
   virtual std::unique_ptr<LabelsIterable> GetLabels(
-      grpc_metadata_batch* incoming_initial_metadata) = 0;
+      grpc_metadata_batch* incoming_initial_metadata) const = 0;
 
   // Modify the outgoing initial metadata with metadata information to be sent
   // to the peer. On the server side, \a labels_from_incoming_metadata returned
   // from `GetLabels` should be provided as input here. On the client side, this
   // should be nullptr.
-  virtual void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
-                         LabelsIterable* labels_from_incoming_metadata) = 0;
+  virtual void AddLabels(
+      grpc_metadata_batch* outgoing_initial_metadata,
+      LabelsIterable* labels_from_incoming_metadata) const = 0;
+};
+
+class InternalOpenTelemetryPluginOption
+    : public grpc::experimental::OpenTelemetryPluginOption {
+ public:
+  ~InternalOpenTelemetryPluginOption() override = default;
+  // Determines whether a plugin option is active on a given channel target
+  virtual bool IsActiveOnClientChannel(absl::string_view target) const = 0;
+  // Determines whether a plugin option is active on a given server
+  virtual bool IsActiveOnServer(const grpc_core::ChannelArgs& args) const = 0;
+  // Returns the LabelsInjector used by this plugin option, nullptr if none.
+  virtual const grpc::internal::LabelsInjector* labels_injector() const = 0;
 };
 
 struct OpenTelemetryPluginState {
@@ -107,6 +123,8 @@
       generic_method_attribute_filter;
   absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
       server_selector;
+  std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
+      plugin_options;
 };
 
 const struct OpenTelemetryPluginState& OpenTelemetryPluginState();
@@ -119,6 +137,7 @@
 class OpenTelemetryPluginBuilderImpl {
  public:
   OpenTelemetryPluginBuilderImpl();
+  ~OpenTelemetryPluginBuilderImpl();
   // If `SetMeterProvider()` is not called, no metrics are collected.
   OpenTelemetryPluginBuilderImpl& SetMeterProvider(
       std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
@@ -166,6 +185,8 @@
   OpenTelemetryPluginBuilderImpl& SetGenericMethodAttributeFilter(
       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
           generic_method_attribute_filter);
+  OpenTelemetryPluginBuilderImpl& AddPluginOption(
+      std::unique_ptr<InternalOpenTelemetryPluginOption> option);
   void BuildAndRegisterGlobal();
 
  private:
@@ -179,6 +200,52 @@
       generic_method_attribute_filter_;
   absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
       server_selector_;
+  std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
+      plugin_options_;
+};
+
+// Creates a convenience wrapper to help iterate over only those plugin options
+// that are active over a given channel/server.
+class ActivePluginOptionsView {
+ public:
+  static ActivePluginOptionsView MakeForClient(absl::string_view target) {
+    return ActivePluginOptionsView(
+        [target](const InternalOpenTelemetryPluginOption& plugin_option) {
+          return plugin_option.IsActiveOnClientChannel(target);
+        });
+  }
+
+  static ActivePluginOptionsView MakeForServer(
+      const grpc_core::ChannelArgs& args) {
+    return ActivePluginOptionsView(
+        [&args](const InternalOpenTelemetryPluginOption& plugin_option) {
+          return plugin_option.IsActiveOnServer(args);
+        });
+  }
+
+  void ForEach(
+      absl::FunctionRef<void(const InternalOpenTelemetryPluginOption&, size_t)>
+          func) const {
+    for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size();
+         ++i) {
+      const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i];
+      if (active_mask_[i]) func(*plugin_option, i);
+    }
+  }
+
+ private:
+  explicit ActivePluginOptionsView(
+      absl::FunctionRef<bool(const InternalOpenTelemetryPluginOption&)> func) {
+    for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size();
+         ++i) {
+      const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i];
+      if (plugin_option != nullptr && func(*plugin_option)) {
+        active_mask_.set(i);
+      }
+    }
+  }
+
+  std::bitset<64> active_mask_;
 };
 
 }  // namespace internal
diff --git a/src/cpp/ext/otel/otel_server_call_tracer.cc b/src/cpp/ext/otel/otel_server_call_tracer.cc
index ab7f3a9..7fa40c6 100644
--- a/src/cpp/ext/otel/otel_server_call_tracer.cc
+++ b/src/cpp/ext/otel/otel_server_call_tracer.cc
@@ -56,7 +56,12 @@
 
 class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
  public:
-  OpenTelemetryServerCallTracer() : start_time_(absl::Now()) {}
+  explicit OpenTelemetryServerCallTracer(const grpc_core::ChannelArgs& args)
+      : start_time_(absl::Now()),
+        active_plugin_options_view_(
+            ActivePluginOptionsView::MakeForServer(args)),
+        injected_labels_from_plugin_options_(
+            OpenTelemetryPluginState().plugin_options.size()) {}
 
   std::string TraceId() override {
     // Not implemented
@@ -81,6 +86,16 @@
       OpenTelemetryPluginState().labels_injector->AddLabels(
           send_initial_metadata, injected_labels_.get());
     }
+    active_plugin_options_view_.ForEach(
+        [&](const InternalOpenTelemetryPluginOption& plugin_option,
+            size_t index) {
+          auto* labels_injector = plugin_option.labels_injector();
+          if (labels_injector != nullptr) {
+            labels_injector->AddLabels(
+                send_initial_metadata,
+                injected_labels_from_plugin_options_[index].get());
+          }
+        });
   }
 
   void RecordSendTrailingMetadata(
@@ -148,6 +163,11 @@
   grpc_core::Slice path_;
   std::unique_ptr<LabelsIterable> injected_labels_;
   bool registered_method_;
+  ActivePluginOptionsView active_plugin_options_view_;
+  // TODO(yashykt): It's wasteful to do this per call. When we re-haul the stats
+  // infrastructure, this should move to be done per server.
+  std::vector<std::unique_ptr<LabelsIterable>>
+      injected_labels_from_plugin_options_;
 };
 
 void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
@@ -158,6 +178,15 @@
     injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
         recv_initial_metadata);
   }
+  active_plugin_options_view_.ForEach(
+      [&](const InternalOpenTelemetryPluginOption& plugin_option,
+          size_t index) {
+        auto* labels_injector = plugin_option.labels_injector();
+        if (labels_injector != nullptr) {
+          injected_labels_from_plugin_options_[index] =
+              labels_injector->GetLabels(recv_initial_metadata);
+        }
+      });
   registered_method_ =
       recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
           .value_or(nullptr) != nullptr;
@@ -167,7 +196,7 @@
     // We might not have all the injected labels that we want at this point, so
     // avoid recording a subset of injected labels here.
     OpenTelemetryPluginState().server.call.started->Add(
-        1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
+        1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
                             additional_labels));
   }
 }
@@ -186,7 +215,9 @@
           {{OpenTelemetryMethodKey(), MethodForStats()},
            {OpenTelemetryStatusKey(),
             grpc_status_code_to_string(final_info->final_status)}}};
-  KeyValueIterable labels(injected_labels_.get(), additional_labels);
+  KeyValueIterable labels(injected_labels_.get(),
+                          injected_labels_from_plugin_options_,
+                          additional_labels);
   if (OpenTelemetryPluginState().server.call.duration != nullptr) {
     OpenTelemetryPluginState().server.call.duration->Record(
         absl::ToDoubleSeconds(elapsed_time_), labels,
@@ -216,8 +247,8 @@
 
 grpc_core::ServerCallTracer*
 OpenTelemetryServerCallTracerFactory::CreateNewServerCallTracer(
-    grpc_core::Arena* arena) {
-  return arena->ManagedNew<OpenTelemetryServerCallTracer>();
+    grpc_core::Arena* arena, const grpc_core::ChannelArgs& args) {
+  return arena->ManagedNew<OpenTelemetryServerCallTracer>(args);
 }
 
 bool OpenTelemetryServerCallTracerFactory::IsServerTraced(
diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h
index 2ce422e..c8cea43 100644
--- a/src/cpp/ext/otel/otel_server_call_tracer.h
+++ b/src/cpp/ext/otel/otel_server_call_tracer.h
@@ -32,7 +32,8 @@
     : public grpc_core::ServerCallTracerFactory {
  public:
   grpc_core::ServerCallTracer* CreateNewServerCallTracer(
-      grpc_core::Arena* arena) override;
+      grpc_core::Arena* arena,
+      const grpc_core::ChannelArgs& channel_args) override;
 
   bool IsServerTraced(const grpc_core::ChannelArgs& args) override;
 };
diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
index 722211a..6c1e68b 100644
--- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
+++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
@@ -261,10 +261,11 @@
 
 grpc_core::ServerCallTracer*
 PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
-    grpc_core::Arena* arena) {
+    grpc_core::Arena* arena, const grpc_core::ChannelArgs& channel_args) {
   // We don't use arena here to to ensure that memory is allocated and freed in
   // the same DLL in Windows.
   (void)arena;
+  (void)channel_args;
   return new PythonOpenCensusServerCallTracer();
 }
 
diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
index a4354b6..3792ae0 100644
--- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
+++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.h
@@ -30,7 +30,8 @@
     : public grpc_core::ServerCallTracerFactory {
  public:
   grpc_core::ServerCallTracer* CreateNewServerCallTracer(
-      grpc_core::Arena* arena) override;
+      grpc_core::Arena* arena,
+      const grpc_core::ChannelArgs& channel_args) override;
 };
 
 inline absl::string_view GetMethod(const grpc_core::Slice& path) {
diff --git a/test/core/channel/server_call_tracer_factory_test.cc b/test/core/channel/server_call_tracer_factory_test.cc
index 09e7985..7786215 100644
--- a/test/core/channel/server_call_tracer_factory_test.cc
+++ b/test/core/channel/server_call_tracer_factory_test.cc
@@ -26,7 +26,8 @@
 
 class TestServerCallTracerFactory : public ServerCallTracerFactory {
  public:
-  ServerCallTracer* CreateNewServerCallTracer(Arena* /*arena*/) override {
+  ServerCallTracer* CreateNewServerCallTracer(
+      Arena* /*arena*/, const ChannelArgs& /*args*/) override {
     Crash("Not implemented");
   }
 };
diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc
index 5986039..14f8dbe 100644
--- a/test/core/end2end/tests/http2_stats.cc
+++ b/test/core/end2end/tests/http2_stats.cc
@@ -200,7 +200,8 @@
 
 class FakeServerCallTracerFactory : public ServerCallTracerFactory {
  public:
-  ServerCallTracer* CreateNewServerCallTracer(Arena* arena) override {
+  ServerCallTracer* CreateNewServerCallTracer(
+      Arena* arena, const ChannelArgs& /*args*/) override {
     return arena->ManagedNew<FakeServerCallTracer>();
   }
 };
diff --git a/test/cpp/ext/csm/BUILD b/test/cpp/ext/csm/BUILD
index ed51117..3d505e0 100644
--- a/test/cpp/ext/csm/BUILD
+++ b/test/cpp/ext/csm/BUILD
@@ -36,6 +36,8 @@
     ],
     deps = [
         "//:grpc++",
+        "//:grpcpp_csm_observability",
+        "//:grpcpp_otel_plugin",
         "//src/cpp/ext/csm:csm_observability",
         "//src/cpp/ext/otel:otel_plugin",
         "//test/core/util:grpc_test_util",
diff --git a/test/cpp/ext/csm/csm_observability_test.cc b/test/cpp/ext/csm/csm_observability_test.cc
index 54e3e5c..c6825c8 100644
--- a/test/cpp/ext/csm/csm_observability_test.cc
+++ b/test/cpp/ext/csm/csm_observability_test.cc
@@ -22,6 +22,7 @@
 #include "gtest/gtest.h"
 
 #include <grpcpp/ext/csm_observability.h>
+#include <grpcpp/ext/otel_plugin.h>
 
 #include "src/core/lib/gprpp/env.h"
 #include "test/core/util/test_config.h"
@@ -62,6 +63,12 @@
       "xds://traffic-director-global.xds.googleapis.com/foo"));
 }
 
+TEST(CsmPluginOptionTest, Basic) {
+  experimental::OpenTelemetryPluginBuilder()
+      .AddPluginOption(experimental::MakeCsmOpenTelemetryPluginOption())
+      .BuildAndRegisterGlobal();
+}
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc
index e763cd7..994ed6c 100644
--- a/test/cpp/ext/otel/otel_plugin_test.cc
+++ b/test/cpp/ext/otel/otel_plugin_test.cc
@@ -653,6 +653,282 @@
   EXPECT_EQ(*status_value, "UNIMPLEMENTED");
 }
 
+using OpenTelemetryPluginOptionEnd2EndTest = OpenTelemetryPluginEnd2EndTest;
+
+class SimpleLabelIterable : public grpc::internal::LabelsIterable {
+ public:
+  explicit SimpleLabelIterable(
+      std::pair<absl::string_view, absl::string_view> label)
+      : label_(label) {}
+
+  absl::optional<std::pair<absl::string_view, absl::string_view>> Next()
+      override {
+    if (iterated_) {
+      return absl::nullopt;
+    }
+    iterated_ = true;
+    return label_;
+  }
+
+  size_t Size() const override { return 1; }
+
+  void ResetIteratorPosition() override { iterated_ = false; }
+
+ private:
+  bool iterated_ = false;
+  std::pair<absl::string_view, absl::string_view> label_;
+};
+
+class CustomLabelInjector : public grpc::internal::LabelsInjector {
+ public:
+  explicit CustomLabelInjector(std::pair<std::string, std::string> label)
+      : label_(std::move(label)) {}
+  ~CustomLabelInjector() override {}
+
+  std::unique_ptr<grpc::internal::LabelsIterable> GetLabels(
+      grpc_metadata_batch* /*incoming_initial_metadata*/) const override {
+    return std::make_unique<SimpleLabelIterable>(label_);
+  }
+
+  void AddLabels(
+      grpc_metadata_batch* /*outgoing_initial_metadata*/,
+      grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/)
+      const override {}
+
+ private:
+  std::pair<std::string, std::string> label_;
+};
+
+class CustomPluginOption
+    : public grpc::internal::InternalOpenTelemetryPluginOption {
+ public:
+  CustomPluginOption(bool enabled_on_client, bool enabled_on_server,
+                     std::pair<std::string, std::string> label)
+      : enabled_on_client_(enabled_on_client),
+        enabled_on_server_(enabled_on_server),
+        label_injector_(
+            std::make_unique<CustomLabelInjector>(std::move(label))) {}
+
+  ~CustomPluginOption() override {}
+
+  bool IsActiveOnClientChannel(absl::string_view /*target*/) const override {
+    return enabled_on_client_;
+  }
+
+  bool IsActiveOnServer(const grpc_core::ChannelArgs& /*args*/) const override {
+    return enabled_on_server_;
+  }
+
+  const grpc::internal::LabelsInjector* labels_injector() const override {
+    return label_injector_.get();
+  }
+
+ private:
+  bool enabled_on_client_;
+  bool enabled_on_server_;
+  std::unique_ptr<CustomLabelInjector> label_injector_;
+};
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, Basic) {
+  std::vector<
+      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+      plugin_option_list;
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ true, /*enabled_on_server*/ true,
+      std::make_pair("key", "value")));
+  Init({grpc::experimental::OpenTelemetryPluginBuilder::
+            kClientAttemptDurationInstrumentName,
+        grpc::experimental::OpenTelemetryPluginBuilder::
+            kServerCallDurationInstrumentName},
+       /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+       /*labels_injector=*/nullptr,
+       /*test_no_meter_provider=*/false,
+       /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*target_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*generic_method_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+       /*plugin_options=*/std::move(plugin_option_list));
+  SendRPC();
+  auto data = ReadCurrentMetricsData(
+      [&](const absl::flat_hash_map<
+          std::string,
+          std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+              data) {
+        return !data.contains("grpc.client.attempt.duration") ||
+               !data.contains("grpc.server.call.duration");
+      });
+  // Verify client side metric
+  ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+  const auto& client_attributes =
+      data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(client_attributes.size(), 4);
+  EXPECT_EQ(absl::get<std::string>(client_attributes.at("key")), "value");
+  // Verify server side metric
+  ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+  const auto& server_attributes =
+      data["grpc.server.call.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(server_attributes.size(), 3);
+  EXPECT_EQ(absl::get<std::string>(server_attributes.at("key")), "value");
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ClientOnlyPluginOption) {
+  std::vector<
+      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+      plugin_option_list;
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+      std::make_pair("key", "value")));
+  Init({grpc::experimental::OpenTelemetryPluginBuilder::
+            kClientAttemptDurationInstrumentName,
+        grpc::experimental::OpenTelemetryPluginBuilder::
+            kServerCallDurationInstrumentName},
+       /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+       /*labels_injector=*/nullptr,
+       /*test_no_meter_provider=*/false,
+       /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*target_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*generic_method_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+       /*plugin_options=*/std::move(plugin_option_list));
+  SendRPC();
+  auto data = ReadCurrentMetricsData(
+      [&](const absl::flat_hash_map<
+          std::string,
+          std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+              data) {
+        return !data.contains("grpc.client.attempt.duration") ||
+               !data.contains("grpc.server.call.duration");
+      });
+  // Verify client side metric
+  ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+  const auto& client_attributes =
+      data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(client_attributes.size(), 4);
+  EXPECT_EQ(absl::get<std::string>(client_attributes.at("key")), "value");
+  // Verify server side metric
+  ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+  const auto& server_attributes =
+      data["grpc.server.call.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(server_attributes.size(), 2);
+  EXPECT_THAT(server_attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key"))));
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ServerOnlyPluginOption) {
+  std::vector<
+      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+      plugin_option_list;
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+      std::make_pair("key", "value")));
+  Init({grpc::experimental::OpenTelemetryPluginBuilder::
+            kClientAttemptDurationInstrumentName,
+        grpc::experimental::OpenTelemetryPluginBuilder::
+            kServerCallDurationInstrumentName},
+       /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+       /*labels_injector=*/nullptr,
+       /*test_no_meter_provider=*/false,
+       /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*target_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*generic_method_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+       /*plugin_options=*/std::move(plugin_option_list));
+  SendRPC();
+  auto data = ReadCurrentMetricsData(
+      [&](const absl::flat_hash_map<
+          std::string,
+          std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+              data) {
+        return !data.contains("grpc.client.attempt.duration") ||
+               !data.contains("grpc.server.call.duration");
+      });
+  // Verify client side metric
+  ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+  const auto& attributes =
+      data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(attributes.size(), 3);
+  EXPECT_THAT(attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key"))));
+  // Verify server side metric
+  ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+  const auto& server_attributes =
+      data["grpc.server.call.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(server_attributes.size(), 3);
+  EXPECT_EQ(absl::get<std::string>(server_attributes.at("key")), "value");
+}
+
+TEST_F(OpenTelemetryPluginOptionEnd2EndTest,
+       MultipleEnabledAndDisabledPluginOptions) {
+  std::vector<
+      std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+      plugin_option_list;
+  plugin_option_list.reserve(5);
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ true, /*enabled_on_server*/ true,
+      std::make_pair("key1", "value1")));
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+      std::make_pair("key2", "value2")));
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ true, /*enabled_on_server*/ false,
+      std::make_pair("key3", "value3")));
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+      std::make_pair("key4", "value4")));
+  plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
+      /*enabled_on_client*/ false, /*enabled_on_server*/ true,
+      std::make_pair("key5", "value5")));
+  Init({grpc::experimental::OpenTelemetryPluginBuilder::
+            kClientAttemptDurationInstrumentName,
+        grpc::experimental::OpenTelemetryPluginBuilder::
+            kServerCallDurationInstrumentName},
+       /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
+       /*labels_injector=*/nullptr,
+       /*test_no_meter_provider=*/false,
+       /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*target_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view) const>(),
+       /*generic_method_attribute_filter=*/
+       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>(),
+       /*plugin_options=*/std::move(plugin_option_list));
+  SendRPC();
+  auto data = ReadCurrentMetricsData(
+      [&](const absl::flat_hash_map<
+          std::string,
+          std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
+              data) {
+        return !data.contains("grpc.client.attempt.duration") ||
+               !data.contains("grpc.server.call.duration");
+      });
+  // Verify client side metric
+  ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
+  const auto& client_attributes =
+      data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(client_attributes.size(), 6);
+  EXPECT_EQ(absl::get<std::string>(client_attributes.at("key1")), "value1");
+  EXPECT_EQ(absl::get<std::string>(client_attributes.at("key2")), "value2");
+  EXPECT_EQ(absl::get<std::string>(client_attributes.at("key3")), "value3");
+  EXPECT_THAT(client_attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key4"))));
+  EXPECT_THAT(client_attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key5"))));
+  // Verify server side metric
+  ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
+  const auto& server_attributes =
+      data["grpc.server.call.duration"][0].attributes.GetAttributes();
+  EXPECT_EQ(server_attributes.size(), 5);
+  EXPECT_EQ(absl::get<std::string>(server_attributes.at("key1")), "value1");
+  EXPECT_THAT(server_attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key2"))));
+  EXPECT_THAT(server_attributes,
+              ::testing::Not(::testing::Contains(::testing::Key("key3"))));
+  EXPECT_EQ(absl::get<std::string>(server_attributes.at("key4")), "value4");
+  EXPECT_EQ(absl::get<std::string>(server_attributes.at("key5")), "value5");
+}
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/ext/otel/otel_test_library.cc b/test/cpp/ext/otel/otel_test_library.cc
index bf53001..53aa350 100644
--- a/test/cpp/ext/otel/otel_test_library.cc
+++ b/test/cpp/ext/otel/otel_test_library.cc
@@ -48,7 +48,10 @@
     absl::AnyInvocable<bool(absl::string_view /*target*/) const>
         target_attribute_filter,
     absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
-        generic_method_attribute_filter) {
+        generic_method_attribute_filter,
+    std::vector<
+        std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+        plugin_options) {
   // We are resetting the MeterProvider and OpenTelemetry plugin at the start
   // of each test to avoid test results from one test carrying over to another
   // test. (Some measurements can get arbitrarily delayed.)
@@ -76,6 +79,9 @@
   ot_builder.SetTargetAttributeFilter(std::move(target_attribute_filter));
   ot_builder.SetGenericMethodAttributeFilter(
       std::move(generic_method_attribute_filter));
+  for (auto& option : plugin_options) {
+    ot_builder.AddPluginOption(std::move(option));
+  }
   ot_builder.BuildAndRegisterGlobal();
   grpc_init();
   grpc::ServerBuilder builder;
diff --git a/test/cpp/ext/otel/otel_test_library.h b/test/cpp/ext/otel/otel_test_library.h
index 1076d29..629d8c1 100644
--- a/test/cpp/ext/otel/otel_test_library.h
+++ b/test/cpp/ext/otel/otel_test_library.h
@@ -70,7 +70,10 @@
               absl::AnyInvocable<bool(absl::string_view) const>(),
       absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
           generic_method_attribute_filter = absl::AnyInvocable<
-              bool(absl::string_view /*generic_method*/) const>());
+              bool(absl::string_view /*generic_method*/) const>(),
+      std::vector<
+          std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
+          plugin_options = {});
 
   void TearDown() override;
 
diff --git a/test/cpp/interop/observability_client.cc b/test/cpp/interop/observability_client.cc
index a815f09..259b47d 100644
--- a/test/cpp/interop/observability_client.cc
+++ b/test/cpp/interop/observability_client.cc
@@ -30,10 +30,10 @@
 #include <grpcpp/channel.h>
 #include <grpcpp/client_context.h>
 #include <grpcpp/ext/gcp_observability.h>
+#include <grpcpp/ext/otel_plugin.h>
 
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/crash.h"
-#include "src/cpp/ext/otel/otel_plugin.h"
 #include "test/core/util/test_config.h"
 #include "test/cpp/interop/client_helper.h"
 #include "test/cpp/interop/interop_client.h"
@@ -236,7 +236,7 @@
     auto meter_provider =
         std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
     meter_provider->AddMetricReader(std::move(prometheus_exporter));
-    grpc::internal::OpenTelemetryPluginBuilderImpl otel_builder;
+    grpc::experimental::OpenTelemetryPluginBuilder otel_builder;
     otel_builder.SetMeterProvider(std::move(meter_provider));
     otel_builder.BuildAndRegisterGlobal();
   }