feat(spanner): add OpenTelemetry metrics for dynamic channel pool (#14613)

Split of https://github.com/googleapis/google-cloud-go/pull/14604

Internal reference: go/go-dcp-design

 ## Add OpenTelemetry metrics for the dynamic channel pool

  ### What
Adds OpenTelemetry metric support for the dynamic channel pool (DCP),
giving visibility into pool size, scaling activity, and RPC load.

  ### Metrics (prefix `spanner/dynamic_channel_pool/`)
  | Metric | Type | Description |
  | --- | --- | --- |
  | `num_channels` | gauge | Active channels in the pool |
  | `draining_channel_count` | gauge | Channels currently draining |
  | `max_allowed_channels` | gauge | Max channels allowed |
  | `active_rpc_count` | gauge | Active RPCs on the pool |
| `max_active_rpc_per_channel ` | gauge | Maximum number of RPCs
currently active on any channel in the dynamic channel pool.|
| `channel_pool_scaling` | counter | Channels added/removed, tagged
`direction=up\|down` |

All metrics carry the common attributes `client_id`, `database`,
`instance_id`, `library_version`.

  ### Enablement
Consistent with existing client metrics: emitted only when
`EnableOpenTelemetryMetrics()` is set. Uses
`ClientConfig.OpenTelemetryMeterProvider`, falling back to the global
OpenTelemetry MeterProvider when unset. The observable callback is
unregistered on `dynamicChannelPool.Close()`.
diff --git a/spanner/client.go b/spanner/client.go
index b9e0833..cc2ec01 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -498,7 +498,7 @@
 	dial := func(dialCtx context.Context) (gtransport.ConnPool, error) {
 		return gtransport.DialPool(dialCtx, allClientOpts(1, config.Compression, config.EnableDirectAccess, dcpOpts...)...)
 	}
-	dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, dial)
+	dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, config.OpenTelemetryMeterProvider, dial)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/spanner/dcp_metrics.go b/spanner/dcp_metrics.go
new file mode 100644
index 0000000..8cbc3bd
--- /dev/null
+++ b/spanner/dcp_metrics.go
@@ -0,0 +1,159 @@
+// Copyright 2026 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spanner
+
+import (
+	"context"
+	"log"
+
+	"cloud.google.com/go/spanner/internal"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+const dcpMetricsPrefix = metricsPrefix + "dynamic_channel_pool/"
+
+var attributeKeyDCPDirection = attribute.Key("direction")
+
+type dcpMetrics struct {
+	attrs []attribute.KeyValue
+
+	numChannels            metric.Int64ObservableGauge
+	drainingChannelCount   metric.Int64ObservableGauge
+	maxAllowedChannels     metric.Int64ObservableGauge
+	activeRPCCount         metric.Int64ObservableGauge
+	maxActiveRPCPerChannel metric.Int64ObservableGauge
+	channelPoolScaling     metric.Int64Counter
+	registration           metric.Registration
+}
+
+func newDCPMetrics(p *dynamicChannelPool, mp metric.MeterProvider) *dcpMetrics {
+	if !IsOpenTelemetryMetricsEnabled() {
+		return nil
+	}
+	if mp == nil {
+		mp = otel.GetMeterProvider()
+	}
+	_, instance, database, err := parseDatabaseName(p.sc.database)
+	if err != nil {
+		logf(p.sc.logger, "spanner_dcp: failed to parse database name for OpenTelemetry metrics: %v", err)
+		return nil
+	}
+	m := &dcpMetrics{
+		attrs: []attribute.KeyValue{
+			attributeKeyClientID.String(p.sc.id),
+			attributeKeyDatabase.String(database),
+			attributeKeyInstance.String(instance),
+			attributeKeyLibVersion.String(internal.Version),
+		},
+	}
+	meter := mp.Meter(OtInstrumentationScope, metric.WithInstrumentationVersion(internal.Version))
+	if m.numChannels, err = dcpInt64ObservableGauge(meter, p.sc.logger, "num_channels", "Number of active channels currently in the dynamic channel pool.", "{channel}"); err != nil {
+		return nil
+	}
+	if m.drainingChannelCount, err = dcpInt64ObservableGauge(meter, p.sc.logger, "draining_channel_count", "Number of channels currently draining in the dynamic channel pool.", "{channel}"); err != nil {
+		return nil
+	}
+	if m.maxAllowedChannels, err = dcpInt64ObservableGauge(meter, p.sc.logger, "max_allowed_channels", "Maximum number of channels allowed in the dynamic channel pool.", "{channel}"); err != nil {
+		return nil
+	}
+	if m.activeRPCCount, err = dcpInt64ObservableGauge(meter, p.sc.logger, "active_rpc_count", "Number of RPCs currently active on the dynamic channel pool.", "{rpc}"); err != nil {
+		return nil
+	}
+	if m.maxActiveRPCPerChannel, err = dcpInt64ObservableGauge(meter, p.sc.logger, "max_active_rpc_per_channel", "Maximum number of RPCs currently active on any channel in the dynamic channel pool.", "{rpc}"); err != nil {
+		return nil
+	}
+
+	m.channelPoolScaling, err = meter.Int64Counter(
+		dcpMetricsPrefix+"channel_pool_scaling",
+		metric.WithDescription("Number of channels added or removed by dynamic channel pool scaling."),
+		metric.WithUnit("{channel}"),
+	)
+	if err != nil {
+		logf(p.sc.logger, "Error during registering instrument for metric %s, error: %v", dcpMetricsPrefix+"channel_pool_scaling", err)
+		return nil
+	}
+
+	reg, err := meter.RegisterCallback(
+		func(ctx context.Context, o metric.Observer) error {
+			attrs := metric.WithAttributes(m.attrs...)
+			o.ObserveInt64(m.numChannels, int64(p.Num()), attrs)
+			o.ObserveInt64(m.drainingChannelCount, p.drainingCount.Load(), attrs)
+			o.ObserveInt64(m.maxAllowedChannels, int64(p.cfg.DCPMaxChannels), attrs)
+			o.ObserveInt64(m.activeRPCCount, int64(p.totalRPCLoad.Load()), attrs)
+			o.ObserveInt64(m.maxActiveRPCPerChannel, p.maxActiveRPCPerChannel(), attrs)
+			return nil
+		},
+		m.numChannels,
+		m.drainingChannelCount,
+		m.maxAllowedChannels,
+		m.activeRPCCount,
+		m.maxActiveRPCPerChannel,
+	)
+	if err != nil {
+		logf(p.sc.logger, "spanner_dcp: failed to register OpenTelemetry metric callback: %v", err)
+		return nil
+	}
+	m.registration = reg
+	return m
+}
+
+func (p *dynamicChannelPool) maxActiveRPCPerChannel() int64 {
+	var max int32
+	for _, e := range p.getEntries() {
+		if load := e.rpcLoad(); load > max {
+			max = load
+		}
+	}
+	return int64(max)
+}
+
+func dcpInt64ObservableGauge(meter metric.Meter, logger *log.Logger, name, desc, unit string) (metric.Int64ObservableGauge, error) {
+	fullName := dcpMetricsPrefix + name
+	instrument, err := meter.Int64ObservableGauge(fullName, metric.WithDescription(desc), metric.WithUnit(unit))
+	if err != nil {
+		logf(logger, "Error during registering instrument for metric %s, error: %v", fullName, err)
+	}
+	return instrument, err
+}
+
+func (m *dcpMetrics) recordScaleUp(ctx context.Context, channels int64) {
+	m.recordScaling(ctx, channels, "up")
+}
+
+func (m *dcpMetrics) recordScaleDown(ctx context.Context, channels int64) {
+	m.recordScaling(ctx, channels, "down")
+}
+
+func (m *dcpMetrics) recordScaling(ctx context.Context, channels int64, direction string) {
+	if m == nil || m.channelPoolScaling == nil || channels <= 0 {
+		return
+	}
+	attrs := make([]attribute.KeyValue, 0, len(m.attrs)+1)
+	attrs = append(attrs, m.attrs...)
+	attrs = append(attrs, attributeKeyDCPDirection.String(direction))
+	m.channelPoolScaling.Add(ctx, channels, metric.WithAttributes(attrs...))
+}
+
+func (m *dcpMetrics) close(logger *log.Logger) {
+	if m == nil || m.registration == nil {
+		return
+	}
+	if err := m.registration.Unregister(); err != nil {
+		logf(logger, "Failed to unregister callback from the OpenTelemetry meter, error : %v", err)
+	}
+	m.registration = nil
+}
diff --git a/spanner/dynamic_channel_pool.go b/spanner/dynamic_channel_pool.go
index 6b1e23b..27cc897 100644
--- a/spanner/dynamic_channel_pool.go
+++ b/spanner/dynamic_channel_pool.go
@@ -28,6 +28,7 @@
 	vkit "cloud.google.com/go/spanner/apiv1"
 	"cloud.google.com/go/spanner/apiv1/spannerpb"
 	"github.com/googleapis/gax-go/v2"
+	"go.opentelemetry.io/otel/metric"
 	gtransport "google.golang.org/api/transport/grpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -197,6 +198,7 @@
 	lowLoadRuns   int
 	monitorMu     sync.Mutex
 	primeSession  atomic.Value // string
+	metrics       *dcpMetrics
 
 	drainingCount atomic.Int64
 }
@@ -216,7 +218,7 @@
 }
 
 // newDynamicChannelPool creates the initial channel set and starts scale workers.
-func newDynamicChannelPool(ctx context.Context, sc *sessionClient, cfg DynamicChannelPoolConfig, dial func(context.Context) (gtransport.ConnPool, error)) (*dynamicChannelPool, error) {
+func newDynamicChannelPool(ctx context.Context, sc *sessionClient, cfg DynamicChannelPoolConfig, mp metric.MeterProvider, dial func(context.Context) (gtransport.ConnPool, error)) (*dynamicChannelPool, error) {
 	cfg, err := normalizeDCPConfig(cfg)
 	if err != nil {
 		return nil, err
@@ -245,6 +247,7 @@
 		entries = append(entries, e)
 	}
 	p.entries.Store(&entries)
+	p.metrics = newDCPMetrics(p, mp)
 	go p.scaleUpWorker()
 	go p.scaleDownMonitor()
 	return p, nil
@@ -296,7 +299,11 @@
 }
 
 func (p *dynamicChannelPool) Close() error {
-	p.stopOnce.Do(func() { p.cancel(); close(p.done) })
+	p.stopOnce.Do(func() {
+		p.metrics.close(p.sc.logger)
+		p.cancel()
+		close(p.done)
+	})
 	p.dialMu.Lock()
 	defer p.dialMu.Unlock()
 	entries := p.getEntries()
@@ -655,6 +662,7 @@
 	combined = append(combined, entries...)
 	combined = append(combined, newEntries...)
 	p.entries.Store(&combined)
+	p.metrics.recordScaleUp(p.ctx, int64(len(newEntries)))
 }
 
 func closeDCPEntries(entries []*dcpEntry) {
@@ -785,7 +793,9 @@
 	}
 	p.entries.Store(&keep)
 	p.dialMu.Unlock()
-	p.drainingCount.Add(int64(len(toDrain)))
+	removed := int64(len(toDrain))
+	p.drainingCount.Add(removed)
+	p.metrics.recordScaleDown(p.ctx, removed)
 	for e := range toDrain {
 		go p.waitForDrainAndClose(e)
 	}
diff --git a/spanner/dynamic_channel_pool_test.go b/spanner/dynamic_channel_pool_test.go
index 80b0716..46f86a2 100644
--- a/spanner/dynamic_channel_pool_test.go
+++ b/spanner/dynamic_channel_pool_test.go
@@ -16,11 +16,22 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
+	"cloud.google.com/go/spanner/internal"
+	. "cloud.google.com/go/spanner/internal/testutil"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/metric/noop"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
@@ -28,8 +39,6 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-
-	. "cloud.google.com/go/spanner/internal/testutil"
 )
 
 func testDCPConfig(initial, min, max int) DynamicChannelPoolConfig {
@@ -53,9 +62,15 @@
 
 func setupDCPMockedTestServer(t *testing.T, dcp DynamicChannelPoolConfig) (*MockedSpannerInMemTestServer, *Client, func()) {
 	t.Helper()
+	return setupDCPMockedTestServerWithMeterProvider(t, dcp, nil)
+}
+
+func setupDCPMockedTestServerWithMeterProvider(t *testing.T, dcp DynamicChannelPoolConfig, mp metric.MeterProvider) (*MockedSpannerInMemTestServer, *Client, func()) {
+	t.Helper()
 	server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
-		DisableNativeMetrics:     true,
-		DynamicChannelPoolConfig: dcp,
+		DisableNativeMetrics:       true,
+		DynamicChannelPoolConfig:   dcp,
+		OpenTelemetryMeterProvider: mp,
 	})
 	addSelect1Result(server)
 	if client.sc.dynamicPool == nil {
@@ -65,6 +80,73 @@
 	return server, client, teardown
 }
 
+func newDCPManualReader() (*sdkmetric.ManualReader, *sdkmetric.MeterProvider) {
+	reader := sdkmetric.NewManualReader()
+	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+	return reader, mp
+}
+
+func enableOpenTelemetryMetricsForTest(t *testing.T) {
+	t.Helper()
+	setOpenTelemetryMetricsFlag(false)
+	t.Cleanup(func() { setOpenTelemetryMetricsFlag(false) })
+	EnableOpenTelemetryMetrics()
+}
+
+func collectDCPMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics {
+	t.Helper()
+	rm := metricdata.ResourceMetrics{}
+	if err := reader.Collect(context.Background(), &rm); err != nil {
+		t.Fatalf("Collect() failed: %v", err)
+	}
+	return rm
+}
+
+func findDCPMetric(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) {
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			if m.Name == name {
+				return m, true
+			}
+		}
+	}
+	return metricdata.Metrics{}, false
+}
+
+func requireDCPMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics {
+	t.Helper()
+	m, ok := findDCPMetric(rm, name)
+	if !ok {
+		t.Fatalf("metric %q not found in %+v", name, rm.ScopeMetrics)
+	}
+	return m
+}
+
+func requireDCPGaugeValue(t *testing.T, rm metricdata.ResourceMetrics, name string, want int64, attrs []attribute.KeyValue) {
+	t.Helper()
+	m := requireDCPMetric(t, rm, name)
+	gauge, ok := m.Data.(metricdata.Gauge[int64])
+	if !ok {
+		t.Fatalf("metric %q data type = %T, want metricdata.Gauge[int64]", name, m.Data)
+	}
+	if got, want := len(gauge.DataPoints), 1; got != want {
+		t.Fatalf("metric %q datapoints = %d, want %d", name, got, want)
+	}
+	if got := gauge.DataPoints[0].Value; got != want {
+		t.Fatalf("metric %q value = %d, want %d", name, got, want)
+	}
+	metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, gauge.DataPoints[0], attrs...)
+}
+
+func dcpCommonAttrs(clientID string) []attribute.KeyValue {
+	return []attribute.KeyValue{
+		attributeKeyClientID.String(clientID),
+		attributeKeyDatabase.String("[DATABASE]"),
+		attributeKeyInstance.String("[INSTANCE]"),
+		attributeKeyLibVersion.String(internal.Version),
+	}
+}
+
 func drainDCPQuery(ctx context.Context, client *Client) error {
 	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
 	defer iter.Stop()
@@ -253,6 +335,232 @@
 	}
 }
 
+func TestDynamicChannelPoolOTMetricsRequireOpenTelemetryMetricsEnabled(t *testing.T) {
+	setOpenTelemetryMetricsFlag(false)
+	t.Cleanup(func() { setOpenTelemetryMetricsFlag(false) })
+	reader, mp := newDCPManualReader()
+	_, _, teardown := setupDCPMockedTestServerWithMeterProvider(t, testDCPConfig(1, 1, 2), mp)
+	defer teardown()
+
+	rm := collectDCPMetrics(t, reader)
+	if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/num_channels"); ok {
+		t.Fatal("DCP metric exported without EnableOpenTelemetryMetrics")
+	}
+}
+
+func TestDynamicChannelPoolOTMetricsFallbackToGlobalMeterProvider(t *testing.T) {
+	enableOpenTelemetryMetricsForTest(t)
+	reader, mp := newDCPManualReader()
+	oldMP := otel.GetMeterProvider()
+	otel.SetMeterProvider(mp)
+	t.Cleanup(func() { otel.SetMeterProvider(oldMP) })
+	_, client, teardown := setupDCPMockedTestServer(t, testDCPConfig(1, 1, 2))
+	defer teardown()
+
+	rm := collectDCPMetrics(t, reader)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 1, dcpCommonAttrs(client.ClientID()))
+}
+
+func TestDynamicChannelPoolOTMetricsObserveGaugesWithCommonAttributes(t *testing.T) {
+	enableOpenTelemetryMetricsForTest(t)
+	reader, mp := newDCPManualReader()
+	cfg := testDCPConfig(2, 1, 4)
+	cfg.DCPScaleDownCheckInterval = time.Hour
+	_, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+	defer teardown()
+	entries := client.sc.dynamicPool.getEntries()
+	entries[0].unaryLoad.Store(3)
+	entries[1].streamLoad.Store(4)
+	client.sc.dynamicPool.totalRPCLoad.Store(7)
+
+	rm := collectDCPMetrics(t, reader)
+	attrs := dcpCommonAttrs(client.ClientID())
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 2, attrs)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/draining_channel_count", 0, attrs)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/max_allowed_channels", 4, attrs)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/active_rpc_count", 7, attrs)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/max_active_rpc_per_channel", 4, attrs)
+	if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/max_rpc_per_channel"); ok {
+		t.Fatal("exported stale max_rpc_per_channel metric, want max_active_rpc_per_channel")
+	}
+}
+
+func TestDynamicChannelPoolOTMetricsScalingCounterUsesChannelDeltaAndDirection(t *testing.T) {
+	enableOpenTelemetryMetricsForTest(t)
+	reader, mp := newDCPManualReader()
+	cfg := testDCPConfig(1, 1, 4)
+	cfg.DCPScaleDownCheckInterval = time.Hour
+	_, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+	defer teardown()
+	p := client.sc.dynamicPool
+	p.setPrimeSession(client.sm.multiplexedSession.id)
+	p.getEntries()[0].unaryLoad.Store(3)
+	p.totalRPCLoad.Store(3)
+	p.scaleUp()
+	p.getEntries()[0].unaryLoad.Store(0)
+	p.totalRPCLoad.Store(0)
+	p.removeEntries(1)
+
+	rm := collectDCPMetrics(t, reader)
+	m := requireDCPMetric(t, rm, "spanner/dynamic_channel_pool/channel_pool_scaling")
+	sum, ok := m.Data.(metricdata.Sum[int64])
+	if !ok {
+		t.Fatalf("channel_pool_scaling data type = %T, want metricdata.Sum[int64]", m.Data)
+	}
+	attrs := dcpCommonAttrs(client.ClientID())
+	want := map[string]int64{"up": 2, "down": 1}
+	if got, want := len(sum.DataPoints), 2; got != want {
+		t.Fatalf("channel_pool_scaling datapoints = %d, want %d: %+v", got, want, sum.DataPoints)
+	}
+	for _, dp := range sum.DataPoints {
+		metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, dp, attrs...)
+		direction, ok := dp.Attributes.Value(attribute.Key("direction"))
+		if !ok {
+			t.Fatalf("channel_pool_scaling datapoint missing direction attr: %+v", dp)
+		}
+		directionValue := direction.AsString()
+		if got, ok := want[directionValue]; !ok || dp.Value != got {
+			t.Fatalf("channel_pool_scaling{%s} = %d, want map %v", directionValue, dp.Value, want)
+		}
+		delete(want, directionValue)
+	}
+	if len(want) != 0 {
+		t.Fatalf("missing channel_pool_scaling directions: %v", want)
+	}
+}
+
+func TestDynamicChannelPoolOTMetricsCloseUnregistersCallback(t *testing.T) {
+	enableOpenTelemetryMetricsForTest(t)
+	reader, mp := newDCPManualReader()
+	cfg := testDCPConfig(1, 1, 2)
+	cfg.DCPScaleDownCheckInterval = time.Hour
+	_, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+	defer teardown()
+
+	rm := collectDCPMetrics(t, reader)
+	requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 1, dcpCommonAttrs(client.ClientID()))
+	client.sc.dynamicPool.Close()
+
+	rm = collectDCPMetrics(t, reader)
+	if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/num_channels"); ok {
+		t.Fatal("DCP metric still exported after dynamicChannelPool.Close")
+	}
+}
+
+func TestDynamicChannelPoolOTMetricsInstrumentErrorsDisableMetrics(t *testing.T) {
+	enableOpenTelemetryMetricsForTest(t)
+	_, client, teardown := setupDCPMockedTestServer(t, testDCPConfig(1, 1, 2))
+	defer teardown()
+	p := client.sc.dynamicPool
+
+	gaugeFailure := &failingDCPMeterProvider{meter: &failingDCPMeter{failGaugeName: dcpMetricsPrefix + "num_channels"}}
+	if got := newDCPMetrics(p, gaugeFailure); got != nil {
+		t.Fatalf("newDCPMetrics with gauge registration failure = %+v, want nil", got)
+	}
+
+	counterFailure := &failingDCPMeterProvider{meter: &failingDCPMeter{failCounterName: dcpMetricsPrefix + "channel_pool_scaling"}}
+	if got := newDCPMetrics(p, counterFailure); got != nil {
+		t.Fatalf("newDCPMetrics with counter registration failure = %+v, want nil", got)
+	}
+}
+
+func TestDynamicChannelPoolOTMetricsRecordScalingNoopsWithoutCounter(t *testing.T) {
+	defer func() {
+		if r := recover(); r != nil {
+			t.Fatalf("recordScaling with nil counter panicked: %v", r)
+		}
+	}()
+	(&dcpMetrics{}).recordScaling(context.Background(), 1, "up")
+}
+
+func TestDynamicChannelPoolCloseUnregistersMetricsOnce(t *testing.T) {
+	cfg := testDCPConfig(1, 1, 2)
+	cfg.DCPScaleDownCheckInterval = time.Hour
+	_, client, teardown := setupDCPMockedTestServer(t, cfg)
+	defer teardown()
+	reg := newBlockingMetricRegistration()
+	client.sc.dynamicPool.metrics = &dcpMetrics{registration: reg}
+
+	var wg sync.WaitGroup
+	for i := 0; i < 2; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_ = client.sc.dynamicPool.Close()
+		}()
+	}
+
+	select {
+	case <-reg.entered:
+	case <-time.After(time.Second):
+		t.Fatal("timed out waiting for metric unregistration")
+	}
+	calledTwice := false
+	select {
+	case <-reg.entered:
+		calledTwice = true
+	case <-time.After(20 * time.Millisecond):
+	}
+	close(reg.release)
+	wg.Wait()
+	if calledTwice {
+		t.Fatal("metric unregistration called more than once")
+	}
+	if got := reg.count.Load(); got != 1 {
+		t.Fatalf("metric unregistration count = %d, want 1", got)
+	}
+}
+
+type failingDCPMeterProvider struct {
+	noop.MeterProvider
+	meter metric.Meter
+}
+
+func (p *failingDCPMeterProvider) Meter(string, ...metric.MeterOption) metric.Meter {
+	return p.meter
+}
+
+type failingDCPMeter struct {
+	noop.Meter
+	failGaugeName   string
+	failCounterName string
+}
+
+func (m *failingDCPMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
+	if name == m.failGaugeName {
+		return nil, errors.New("test gauge registration failure")
+	}
+	return m.Meter.Int64ObservableGauge(name, opts...)
+}
+
+func (m *failingDCPMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) {
+	if name == m.failCounterName {
+		return nil, errors.New("test counter registration failure")
+	}
+	return m.Meter.Int64Counter(name, opts...)
+}
+
+type blockingMetricRegistration struct {
+	noop.Registration
+	entered chan struct{}
+	release chan struct{}
+	count   atomic.Int64
+}
+
+func newBlockingMetricRegistration() *blockingMetricRegistration {
+	return &blockingMetricRegistration{
+		entered: make(chan struct{}, 2),
+		release: make(chan struct{}),
+	}
+}
+
+func (r *blockingMetricRegistration) Unregister() error {
+	r.count.Add(1)
+	r.entered <- struct{}{}
+	<-r.release
+	return nil
+}
+
 type fakeDCPConnPool struct {
 	invokeErr   error
 	invokeCount int