feat(spanner): add OpenTelemetry metrics for dynamic channel pool (#14613)
Split of https://github.com/googleapis/google-cloud-go/pull/14604
Internal reference: go/go-dcp-design
## Add OpenTelemetry metrics for the dynamic channel pool
### What
Adds OpenTelemetry metric support for the dynamic channel pool (DCP),
giving visibility into pool size, scaling activity, and RPC load.
### Metrics (prefix `spanner/dynamic_channel_pool/`)
| Metric | Type | Description |
| --- | --- | --- |
| `num_channels` | gauge | Active channels in the pool |
| `draining_channel_count` | gauge | Channels currently draining |
| `max_allowed_channels` | gauge | Max channels allowed |
| `active_rpc_count` | gauge | Active RPCs on the pool |
| `max_active_rpc_per_channel` | gauge | Maximum number of RPCs currently active on any channel in the dynamic channel pool |
| `channel_pool_scaling` | counter | Channels added/removed, tagged `direction=up\|down` |
All metrics carry the common attributes `client_id`, `database`,
`instance_id`, and `library_version`.
### Enablement
Consistent with existing client metrics: DCP metrics are emitted only
after `EnableOpenTelemetryMetrics()` has been called. Instruments are
created from `ClientConfig.OpenTelemetryMeterProvider`, falling back to
the global OpenTelemetry MeterProvider when it is unset. The observable
callback is unregistered on `dynamicChannelPool.Close()`.
diff --git a/spanner/client.go b/spanner/client.go
index b9e0833..cc2ec01 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -498,7 +498,7 @@
dial := func(dialCtx context.Context) (gtransport.ConnPool, error) {
return gtransport.DialPool(dialCtx, allClientOpts(1, config.Compression, config.EnableDirectAccess, dcpOpts...)...)
}
- dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, dial)
+ dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, config.OpenTelemetryMeterProvider, dial)
if err != nil {
return nil, nil, err
}
diff --git a/spanner/dcp_metrics.go b/spanner/dcp_metrics.go
new file mode 100644
index 0000000..8cbc3bd
--- /dev/null
+++ b/spanner/dcp_metrics.go
@@ -0,0 +1,159 @@
+// Copyright 2026 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spanner
+
+import (
+ "context"
+ "log"
+
+ "cloud.google.com/go/spanner/internal"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/metric"
+)
+
+const dcpMetricsPrefix = metricsPrefix + "dynamic_channel_pool/"
+
+var attributeKeyDCPDirection = attribute.Key("direction")
+
+type dcpMetrics struct {
+ attrs []attribute.KeyValue
+
+ numChannels metric.Int64ObservableGauge
+ drainingChannelCount metric.Int64ObservableGauge
+ maxAllowedChannels metric.Int64ObservableGauge
+ activeRPCCount metric.Int64ObservableGauge
+ maxActiveRPCPerChannel metric.Int64ObservableGauge
+ channelPoolScaling metric.Int64Counter
+ registration metric.Registration
+}
+
+func newDCPMetrics(p *dynamicChannelPool, mp metric.MeterProvider) *dcpMetrics {
+ if !IsOpenTelemetryMetricsEnabled() {
+ return nil
+ }
+ if mp == nil {
+ mp = otel.GetMeterProvider()
+ }
+ _, instance, database, err := parseDatabaseName(p.sc.database)
+ if err != nil {
+ logf(p.sc.logger, "spanner_dcp: failed to parse database name for OpenTelemetry metrics: %v", err)
+ return nil
+ }
+ m := &dcpMetrics{
+ attrs: []attribute.KeyValue{
+ attributeKeyClientID.String(p.sc.id),
+ attributeKeyDatabase.String(database),
+ attributeKeyInstance.String(instance),
+ attributeKeyLibVersion.String(internal.Version),
+ },
+ }
+ meter := mp.Meter(OtInstrumentationScope, metric.WithInstrumentationVersion(internal.Version))
+ if m.numChannels, err = dcpInt64ObservableGauge(meter, p.sc.logger, "num_channels", "Number of active channels currently in the dynamic channel pool.", "{channel}"); err != nil {
+ return nil
+ }
+ if m.drainingChannelCount, err = dcpInt64ObservableGauge(meter, p.sc.logger, "draining_channel_count", "Number of channels currently draining in the dynamic channel pool.", "{channel}"); err != nil {
+ return nil
+ }
+ if m.maxAllowedChannels, err = dcpInt64ObservableGauge(meter, p.sc.logger, "max_allowed_channels", "Maximum number of channels allowed in the dynamic channel pool.", "{channel}"); err != nil {
+ return nil
+ }
+ if m.activeRPCCount, err = dcpInt64ObservableGauge(meter, p.sc.logger, "active_rpc_count", "Number of RPCs currently active on the dynamic channel pool.", "{rpc}"); err != nil {
+ return nil
+ }
+ if m.maxActiveRPCPerChannel, err = dcpInt64ObservableGauge(meter, p.sc.logger, "max_active_rpc_per_channel", "Maximum number of RPCs currently active on any channel in the dynamic channel pool.", "{rpc}"); err != nil {
+ return nil
+ }
+
+ m.channelPoolScaling, err = meter.Int64Counter(
+ dcpMetricsPrefix+"channel_pool_scaling",
+ metric.WithDescription("Number of channels added or removed by dynamic channel pool scaling."),
+ metric.WithUnit("{channel}"),
+ )
+ if err != nil {
+ logf(p.sc.logger, "Error during registering instrument for metric %s, error: %v", dcpMetricsPrefix+"channel_pool_scaling", err)
+ return nil
+ }
+
+ reg, err := meter.RegisterCallback(
+ func(ctx context.Context, o metric.Observer) error {
+ attrs := metric.WithAttributes(m.attrs...)
+ o.ObserveInt64(m.numChannels, int64(p.Num()), attrs)
+ o.ObserveInt64(m.drainingChannelCount, p.drainingCount.Load(), attrs)
+ o.ObserveInt64(m.maxAllowedChannels, int64(p.cfg.DCPMaxChannels), attrs)
+ o.ObserveInt64(m.activeRPCCount, int64(p.totalRPCLoad.Load()), attrs)
+ o.ObserveInt64(m.maxActiveRPCPerChannel, p.maxActiveRPCPerChannel(), attrs)
+ return nil
+ },
+ m.numChannels,
+ m.drainingChannelCount,
+ m.maxAllowedChannels,
+ m.activeRPCCount,
+ m.maxActiveRPCPerChannel,
+ )
+ if err != nil {
+ logf(p.sc.logger, "spanner_dcp: failed to register OpenTelemetry metric callback: %v", err)
+ return nil
+ }
+ m.registration = reg
+ return m
+}
+
+func (p *dynamicChannelPool) maxActiveRPCPerChannel() int64 {
+ var max int32
+ for _, e := range p.getEntries() {
+ if load := e.rpcLoad(); load > max {
+ max = load
+ }
+ }
+ return int64(max)
+}
+
+func dcpInt64ObservableGauge(meter metric.Meter, logger *log.Logger, name, desc, unit string) (metric.Int64ObservableGauge, error) {
+ fullName := dcpMetricsPrefix + name
+ instrument, err := meter.Int64ObservableGauge(fullName, metric.WithDescription(desc), metric.WithUnit(unit))
+ if err != nil {
+ logf(logger, "Error during registering instrument for metric %s, error: %v", fullName, err)
+ }
+ return instrument, err
+}
+
+func (m *dcpMetrics) recordScaleUp(ctx context.Context, channels int64) {
+ m.recordScaling(ctx, channels, "up")
+}
+
+func (m *dcpMetrics) recordScaleDown(ctx context.Context, channels int64) {
+ m.recordScaling(ctx, channels, "down")
+}
+
+func (m *dcpMetrics) recordScaling(ctx context.Context, channels int64, direction string) {
+ if m == nil || m.channelPoolScaling == nil || channels <= 0 {
+ return
+ }
+ attrs := make([]attribute.KeyValue, 0, len(m.attrs)+1)
+ attrs = append(attrs, m.attrs...)
+ attrs = append(attrs, attributeKeyDCPDirection.String(direction))
+ m.channelPoolScaling.Add(ctx, channels, metric.WithAttributes(attrs...))
+}
+
+func (m *dcpMetrics) close(logger *log.Logger) {
+ if m == nil || m.registration == nil {
+ return
+ }
+ if err := m.registration.Unregister(); err != nil {
+ logf(logger, "Failed to unregister callback from the OpenTelemetry meter, error : %v", err)
+ }
+ m.registration = nil
+}
diff --git a/spanner/dynamic_channel_pool.go b/spanner/dynamic_channel_pool.go
index 6b1e23b..27cc897 100644
--- a/spanner/dynamic_channel_pool.go
+++ b/spanner/dynamic_channel_pool.go
@@ -28,6 +28,7 @@
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/googleapis/gax-go/v2"
+ "go.opentelemetry.io/otel/metric"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -197,6 +198,7 @@
lowLoadRuns int
monitorMu sync.Mutex
primeSession atomic.Value // string
+ metrics *dcpMetrics
drainingCount atomic.Int64
}
@@ -216,7 +218,7 @@
}
// newDynamicChannelPool creates the initial channel set and starts scale workers.
-func newDynamicChannelPool(ctx context.Context, sc *sessionClient, cfg DynamicChannelPoolConfig, dial func(context.Context) (gtransport.ConnPool, error)) (*dynamicChannelPool, error) {
+func newDynamicChannelPool(ctx context.Context, sc *sessionClient, cfg DynamicChannelPoolConfig, mp metric.MeterProvider, dial func(context.Context) (gtransport.ConnPool, error)) (*dynamicChannelPool, error) {
cfg, err := normalizeDCPConfig(cfg)
if err != nil {
return nil, err
@@ -245,6 +247,7 @@
entries = append(entries, e)
}
p.entries.Store(&entries)
+ p.metrics = newDCPMetrics(p, mp)
go p.scaleUpWorker()
go p.scaleDownMonitor()
return p, nil
@@ -296,7 +299,11 @@
}
func (p *dynamicChannelPool) Close() error {
- p.stopOnce.Do(func() { p.cancel(); close(p.done) })
+ p.stopOnce.Do(func() {
+ p.metrics.close(p.sc.logger)
+ p.cancel()
+ close(p.done)
+ })
p.dialMu.Lock()
defer p.dialMu.Unlock()
entries := p.getEntries()
@@ -655,6 +662,7 @@
combined = append(combined, entries...)
combined = append(combined, newEntries...)
p.entries.Store(&combined)
+ p.metrics.recordScaleUp(p.ctx, int64(len(newEntries)))
}
func closeDCPEntries(entries []*dcpEntry) {
@@ -785,7 +793,9 @@
}
p.entries.Store(&keep)
p.dialMu.Unlock()
- p.drainingCount.Add(int64(len(toDrain)))
+ removed := int64(len(toDrain))
+ p.drainingCount.Add(removed)
+ p.metrics.recordScaleDown(p.ctx, removed)
for e := range toDrain {
go p.waitForDrainAndClose(e)
}
diff --git a/spanner/dynamic_channel_pool_test.go b/spanner/dynamic_channel_pool_test.go
index 80b0716..46f86a2 100644
--- a/spanner/dynamic_channel_pool_test.go
+++ b/spanner/dynamic_channel_pool_test.go
@@ -16,11 +16,22 @@
import (
"context"
+ "errors"
"fmt"
"sync"
+ "sync/atomic"
"testing"
"time"
+ "cloud.google.com/go/spanner/internal"
+ . "cloud.google.com/go/spanner/internal/testutil"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/metric/noop"
+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
@@ -28,8 +39,6 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
-
- . "cloud.google.com/go/spanner/internal/testutil"
)
func testDCPConfig(initial, min, max int) DynamicChannelPoolConfig {
@@ -53,9 +62,15 @@
func setupDCPMockedTestServer(t *testing.T, dcp DynamicChannelPoolConfig) (*MockedSpannerInMemTestServer, *Client, func()) {
t.Helper()
+ return setupDCPMockedTestServerWithMeterProvider(t, dcp, nil)
+}
+
+func setupDCPMockedTestServerWithMeterProvider(t *testing.T, dcp DynamicChannelPoolConfig, mp metric.MeterProvider) (*MockedSpannerInMemTestServer, *Client, func()) {
+ t.Helper()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
- DisableNativeMetrics: true,
- DynamicChannelPoolConfig: dcp,
+ DisableNativeMetrics: true,
+ DynamicChannelPoolConfig: dcp,
+ OpenTelemetryMeterProvider: mp,
})
addSelect1Result(server)
if client.sc.dynamicPool == nil {
@@ -65,6 +80,73 @@
return server, client, teardown
}
+func newDCPManualReader() (*sdkmetric.ManualReader, *sdkmetric.MeterProvider) {
+ reader := sdkmetric.NewManualReader()
+ mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+ return reader, mp
+}
+
+func enableOpenTelemetryMetricsForTest(t *testing.T) {
+ t.Helper()
+ setOpenTelemetryMetricsFlag(false)
+ t.Cleanup(func() { setOpenTelemetryMetricsFlag(false) })
+ EnableOpenTelemetryMetrics()
+}
+
+func collectDCPMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics {
+ t.Helper()
+ rm := metricdata.ResourceMetrics{}
+ if err := reader.Collect(context.Background(), &rm); err != nil {
+ t.Fatalf("Collect() failed: %v", err)
+ }
+ return rm
+}
+
+func findDCPMetric(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) {
+ for _, sm := range rm.ScopeMetrics {
+ for _, m := range sm.Metrics {
+ if m.Name == name {
+ return m, true
+ }
+ }
+ }
+ return metricdata.Metrics{}, false
+}
+
+func requireDCPMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics {
+ t.Helper()
+ m, ok := findDCPMetric(rm, name)
+ if !ok {
+ t.Fatalf("metric %q not found in %+v", name, rm.ScopeMetrics)
+ }
+ return m
+}
+
+func requireDCPGaugeValue(t *testing.T, rm metricdata.ResourceMetrics, name string, want int64, attrs []attribute.KeyValue) {
+ t.Helper()
+ m := requireDCPMetric(t, rm, name)
+ gauge, ok := m.Data.(metricdata.Gauge[int64])
+ if !ok {
+ t.Fatalf("metric %q data type = %T, want metricdata.Gauge[int64]", name, m.Data)
+ }
+ if got, want := len(gauge.DataPoints), 1; got != want {
+ t.Fatalf("metric %q datapoints = %d, want %d", name, got, want)
+ }
+ if got := gauge.DataPoints[0].Value; got != want {
+ t.Fatalf("metric %q value = %d, want %d", name, got, want)
+ }
+ metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, gauge.DataPoints[0], attrs...)
+}
+
+func dcpCommonAttrs(clientID string) []attribute.KeyValue {
+ return []attribute.KeyValue{
+ attributeKeyClientID.String(clientID),
+ attributeKeyDatabase.String("[DATABASE]"),
+ attributeKeyInstance.String("[INSTANCE]"),
+ attributeKeyLibVersion.String(internal.Version),
+ }
+}
+
func drainDCPQuery(ctx context.Context, client *Client) error {
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
@@ -253,6 +335,232 @@
}
}
+func TestDynamicChannelPoolOTMetricsRequireOpenTelemetryMetricsEnabled(t *testing.T) {
+ setOpenTelemetryMetricsFlag(false)
+ t.Cleanup(func() { setOpenTelemetryMetricsFlag(false) })
+ reader, mp := newDCPManualReader()
+ _, _, teardown := setupDCPMockedTestServerWithMeterProvider(t, testDCPConfig(1, 1, 2), mp)
+ defer teardown()
+
+ rm := collectDCPMetrics(t, reader)
+ if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/num_channels"); ok {
+ t.Fatal("DCP metric exported without EnableOpenTelemetryMetrics")
+ }
+}
+
+func TestDynamicChannelPoolOTMetricsFallbackToGlobalMeterProvider(t *testing.T) {
+ enableOpenTelemetryMetricsForTest(t)
+ reader, mp := newDCPManualReader()
+ oldMP := otel.GetMeterProvider()
+ otel.SetMeterProvider(mp)
+ t.Cleanup(func() { otel.SetMeterProvider(oldMP) })
+ _, client, teardown := setupDCPMockedTestServer(t, testDCPConfig(1, 1, 2))
+ defer teardown()
+
+ rm := collectDCPMetrics(t, reader)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 1, dcpCommonAttrs(client.ClientID()))
+}
+
+func TestDynamicChannelPoolOTMetricsObserveGaugesWithCommonAttributes(t *testing.T) {
+ enableOpenTelemetryMetricsForTest(t)
+ reader, mp := newDCPManualReader()
+ cfg := testDCPConfig(2, 1, 4)
+ cfg.DCPScaleDownCheckInterval = time.Hour
+ _, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+ defer teardown()
+ entries := client.sc.dynamicPool.getEntries()
+ entries[0].unaryLoad.Store(3)
+ entries[1].streamLoad.Store(4)
+ client.sc.dynamicPool.totalRPCLoad.Store(7)
+
+ rm := collectDCPMetrics(t, reader)
+ attrs := dcpCommonAttrs(client.ClientID())
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 2, attrs)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/draining_channel_count", 0, attrs)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/max_allowed_channels", 4, attrs)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/active_rpc_count", 7, attrs)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/max_active_rpc_per_channel", 4, attrs)
+ if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/max_rpc_per_channel"); ok {
+ t.Fatal("exported stale max_rpc_per_channel metric, want max_active_rpc_per_channel")
+ }
+}
+
+func TestDynamicChannelPoolOTMetricsScalingCounterUsesChannelDeltaAndDirection(t *testing.T) {
+ enableOpenTelemetryMetricsForTest(t)
+ reader, mp := newDCPManualReader()
+ cfg := testDCPConfig(1, 1, 4)
+ cfg.DCPScaleDownCheckInterval = time.Hour
+ _, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+ defer teardown()
+ p := client.sc.dynamicPool
+ p.setPrimeSession(client.sm.multiplexedSession.id)
+ p.getEntries()[0].unaryLoad.Store(3)
+ p.totalRPCLoad.Store(3)
+ p.scaleUp()
+ p.getEntries()[0].unaryLoad.Store(0)
+ p.totalRPCLoad.Store(0)
+ p.removeEntries(1)
+
+ rm := collectDCPMetrics(t, reader)
+ m := requireDCPMetric(t, rm, "spanner/dynamic_channel_pool/channel_pool_scaling")
+ sum, ok := m.Data.(metricdata.Sum[int64])
+ if !ok {
+ t.Fatalf("channel_pool_scaling data type = %T, want metricdata.Sum[int64]", m.Data)
+ }
+ attrs := dcpCommonAttrs(client.ClientID())
+ want := map[string]int64{"up": 2, "down": 1}
+ if got, want := len(sum.DataPoints), 2; got != want {
+ t.Fatalf("channel_pool_scaling datapoints = %d, want %d: %+v", got, want, sum.DataPoints)
+ }
+ for _, dp := range sum.DataPoints {
+ metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, dp, attrs...)
+ direction, ok := dp.Attributes.Value(attribute.Key("direction"))
+ if !ok {
+ t.Fatalf("channel_pool_scaling datapoint missing direction attr: %+v", dp)
+ }
+ directionValue := direction.AsString()
+ if got, ok := want[directionValue]; !ok || dp.Value != got {
+ t.Fatalf("channel_pool_scaling{%s} = %d, want map %v", directionValue, dp.Value, want)
+ }
+ delete(want, directionValue)
+ }
+ if len(want) != 0 {
+ t.Fatalf("missing channel_pool_scaling directions: %v", want)
+ }
+}
+
+func TestDynamicChannelPoolOTMetricsCloseUnregistersCallback(t *testing.T) {
+ enableOpenTelemetryMetricsForTest(t)
+ reader, mp := newDCPManualReader()
+ cfg := testDCPConfig(1, 1, 2)
+ cfg.DCPScaleDownCheckInterval = time.Hour
+ _, client, teardown := setupDCPMockedTestServerWithMeterProvider(t, cfg, mp)
+ defer teardown()
+
+ rm := collectDCPMetrics(t, reader)
+ requireDCPGaugeValue(t, rm, "spanner/dynamic_channel_pool/num_channels", 1, dcpCommonAttrs(client.ClientID()))
+ client.sc.dynamicPool.Close()
+
+ rm = collectDCPMetrics(t, reader)
+ if _, ok := findDCPMetric(rm, "spanner/dynamic_channel_pool/num_channels"); ok {
+ t.Fatal("DCP metric still exported after dynamicChannelPool.Close")
+ }
+}
+
+func TestDynamicChannelPoolOTMetricsInstrumentErrorsDisableMetrics(t *testing.T) {
+ enableOpenTelemetryMetricsForTest(t)
+ _, client, teardown := setupDCPMockedTestServer(t, testDCPConfig(1, 1, 2))
+ defer teardown()
+ p := client.sc.dynamicPool
+
+ gaugeFailure := &failingDCPMeterProvider{meter: &failingDCPMeter{failGaugeName: dcpMetricsPrefix + "num_channels"}}
+ if got := newDCPMetrics(p, gaugeFailure); got != nil {
+ t.Fatalf("newDCPMetrics with gauge registration failure = %+v, want nil", got)
+ }
+
+ counterFailure := &failingDCPMeterProvider{meter: &failingDCPMeter{failCounterName: dcpMetricsPrefix + "channel_pool_scaling"}}
+ if got := newDCPMetrics(p, counterFailure); got != nil {
+ t.Fatalf("newDCPMetrics with counter registration failure = %+v, want nil", got)
+ }
+}
+
+func TestDynamicChannelPoolOTMetricsRecordScalingNoopsWithoutCounter(t *testing.T) {
+ defer func() {
+ if r := recover(); r != nil {
+ t.Fatalf("recordScaling with nil counter panicked: %v", r)
+ }
+ }()
+ (&dcpMetrics{}).recordScaling(context.Background(), 1, "up")
+}
+
+func TestDynamicChannelPoolCloseUnregistersMetricsOnce(t *testing.T) {
+ cfg := testDCPConfig(1, 1, 2)
+ cfg.DCPScaleDownCheckInterval = time.Hour
+ _, client, teardown := setupDCPMockedTestServer(t, cfg)
+ defer teardown()
+ reg := newBlockingMetricRegistration()
+ client.sc.dynamicPool.metrics = &dcpMetrics{registration: reg}
+
+ var wg sync.WaitGroup
+ for i := 0; i < 2; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _ = client.sc.dynamicPool.Close()
+ }()
+ }
+
+ select {
+ case <-reg.entered:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for metric unregistration")
+ }
+ calledTwice := false
+ select {
+ case <-reg.entered:
+ calledTwice = true
+ case <-time.After(20 * time.Millisecond):
+ }
+ close(reg.release)
+ wg.Wait()
+ if calledTwice {
+ t.Fatal("metric unregistration called more than once")
+ }
+ if got := reg.count.Load(); got != 1 {
+ t.Fatalf("metric unregistration count = %d, want 1", got)
+ }
+}
+
+type failingDCPMeterProvider struct {
+ noop.MeterProvider
+ meter metric.Meter
+}
+
+func (p *failingDCPMeterProvider) Meter(string, ...metric.MeterOption) metric.Meter {
+ return p.meter
+}
+
+type failingDCPMeter struct {
+ noop.Meter
+ failGaugeName string
+ failCounterName string
+}
+
+func (m *failingDCPMeter) Int64ObservableGauge(name string, opts ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
+ if name == m.failGaugeName {
+ return nil, errors.New("test gauge registration failure")
+ }
+ return m.Meter.Int64ObservableGauge(name, opts...)
+}
+
+func (m *failingDCPMeter) Int64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) {
+ if name == m.failCounterName {
+ return nil, errors.New("test counter registration failure")
+ }
+ return m.Meter.Int64Counter(name, opts...)
+}
+
+type blockingMetricRegistration struct {
+ noop.Registration
+ entered chan struct{}
+ release chan struct{}
+ count atomic.Int64
+}
+
+func newBlockingMetricRegistration() *blockingMetricRegistration {
+ return &blockingMetricRegistration{
+ entered: make(chan struct{}, 2),
+ release: make(chan struct{}),
+ }
+}
+
+func (r *blockingMetricRegistration) Unregister() error {
+ r.count.Add(1)
+ r.entered <- struct{}{}
+ <-r.release
+ return nil
+}
+
type fakeDCPConnPool struct {
invokeErr error
invokeCount int