blob: 8214f12236915367a68887092309ecee858699cf [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package service
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)
func init() {
channelz.TurnOn()
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const defaultTestTimeout = 10 * time.Second
func channelProtoToStruct(c *channelzpb.Channel) (*channelz.ChannelMetrics, error) {
cm := &channelz.ChannelMetrics{}
pdata := c.GetData()
var s connectivity.State
switch pdata.GetState().GetState() {
case channelzpb.ChannelConnectivityState_UNKNOWN:
// TODO: what should we set here?
case channelzpb.ChannelConnectivityState_IDLE:
s = connectivity.Idle
case channelzpb.ChannelConnectivityState_CONNECTING:
s = connectivity.Connecting
case channelzpb.ChannelConnectivityState_READY:
s = connectivity.Ready
case channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE:
s = connectivity.TransientFailure
case channelzpb.ChannelConnectivityState_SHUTDOWN:
s = connectivity.Shutdown
}
cm.State.Store(&s)
tgt := pdata.GetTarget()
cm.Target.Store(&tgt)
cm.CallsStarted.Store(pdata.CallsStarted)
cm.CallsSucceeded.Store(pdata.CallsSucceeded)
cm.CallsFailed.Store(pdata.CallsFailed)
if err := pdata.GetLastCallStartedTimestamp().CheckValid(); err != nil {
return nil, err
}
cm.LastCallStartedTimestamp.Store(int64(pdata.GetLastCallStartedTimestamp().AsTime().UnixNano()))
return cm, nil
}
func convertSocketRefSliceToMap(sktRefs []*channelzpb.SocketRef) map[int64]string {
m := make(map[int64]string)
for _, sr := range sktRefs {
m[sr.SocketId] = sr.Name
}
return m
}
type OtherSecurityValue struct {
LocalCertificate []byte `protobuf:"bytes,1,opt,name=local_certificate,json=localCertificate,proto3" json:"local_certificate,omitempty"`
RemoteCertificate []byte `protobuf:"bytes,2,opt,name=remote_certificate,json=remoteCertificate,proto3" json:"remote_certificate,omitempty"`
}
func (m *OtherSecurityValue) Reset() { *m = OtherSecurityValue{} }
func (m *OtherSecurityValue) String() string { return proto.CompactTextString(m) }
func (*OtherSecurityValue) ProtoMessage() {}
func init() {
// Ad-hoc registering the proto type here to facilitate UnmarshalAny of OtherSecurityValue.
proto.RegisterType((*OtherSecurityValue)(nil), "grpc.credentials.OtherChannelzSecurityValue")
}
func (s) TestGetTopChannels(t *testing.T) {
tcs := []*channelz.ChannelMetrics{
channelz.NewChannelMetricForTesting(
connectivity.Connecting,
"test.channelz:1234",
6,
2,
3,
time.Now().UTC().UnixNano(),
),
channelz.NewChannelMetricForTesting(
connectivity.Connecting,
"test.channelz:1234",
1,
2,
3,
time.Now().UTC().UnixNano(),
),
channelz.NewChannelMetricForTesting(
connectivity.Shutdown,
"test.channelz:8888",
0,
0,
0,
0,
),
}
for _, c := range tcs {
cz := channelz.RegisterChannel(nil, "test channel")
cz.ChannelMetrics.CopyFrom(c)
defer channelz.RemoveEntry(cz.ID)
}
s := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
for i, c := range resp.GetChannel() {
channel, err := channelProtoToStruct(c)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tcs[i], channel, protocmp.Transform()); diff != "" {
t.Fatalf("unexpected channel, diff (-want +got):\n%s", diff)
}
}
for i := 0; i < 50; i++ {
cz := channelz.RegisterChannel(nil, "")
defer channelz.RemoveEntry(cz.ID)
}
resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
func (s) TestGetServers(t *testing.T) {
ss := []*channelz.ServerMetrics{
channelz.NewServerMetricsForTesting(
6,
2,
3,
time.Now().UnixNano(),
),
channelz.NewServerMetricsForTesting(
1,
2,
3,
time.Now().UnixNano(),
),
channelz.NewServerMetricsForTesting(
1,
0,
0,
time.Now().UnixNano(),
),
}
firstID := int64(0)
for i, s := range ss {
svr := channelz.RegisterServer("")
if i == 0 {
firstID = svr.ID
}
svr.ServerMetrics.CopyFrom(s)
defer channelz.RemoveEntry(svr.ID)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
serversWant := []*channelzpb.Server{
{
Ref: &channelzpb.ServerRef{ServerId: firstID, Name: ""},
Data: &channelzpb.ServerData{
CallsStarted: 6,
CallsSucceeded: 2,
CallsFailed: 3,
LastCallStartedTimestamp: timestamppb.New(time.Unix(0, ss[0].LastCallStartedTimestamp.Load())),
},
},
{
Ref: &channelzpb.ServerRef{ServerId: firstID + 1, Name: ""},
Data: &channelzpb.ServerData{
CallsStarted: 1,
CallsSucceeded: 2,
CallsFailed: 3,
LastCallStartedTimestamp: timestamppb.New(time.Unix(0, ss[1].LastCallStartedTimestamp.Load())),
},
},
{
Ref: &channelzpb.ServerRef{ServerId: firstID + 2, Name: ""},
Data: &channelzpb.ServerData{
CallsStarted: 1,
CallsSucceeded: 0,
CallsFailed: 0,
LastCallStartedTimestamp: timestamppb.New(time.Unix(0, ss[2].LastCallStartedTimestamp.Load())),
},
},
}
if diff := cmp.Diff(serversWant, resp.GetServer(), protocmp.Transform()); diff != "" {
t.Fatalf("unexpected server, diff (-want +got):\n%s", diff)
}
for i := 0; i < 50; i++ {
id := channelz.RegisterServer("").ID
defer channelz.RemoveEntry(id)
}
resp, _ = svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
func (s) TestGetServerSockets(t *testing.T) {
svrID := channelz.RegisterServer("")
defer channelz.RemoveEntry(svrID.ID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeListen, Parent: svrID, RefName: refNames[0]}).ID
ids[1] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: svrID, RefName: refNames[1]}).ID
ids[2] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: svrID, RefName: refNames[2]}).ID
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.ID, StartSocketId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
}
// GetServerSockets only return normal sockets.
want := map[int64]string{
ids[1]: refNames[1],
ids[2]: refNames[2],
}
if got := convertSocketRefSliceToMap(resp.GetSocketRef()); !cmp.Equal(got, want) {
t.Fatalf("GetServerSockets want: %#v, got: %#v (resp=%v)", want, got, proto.MarshalTextString(resp))
}
for i := 0; i < 50; i++ {
id := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: svrID})
defer channelz.RemoveEntry(id.ID)
}
resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.ID, StartSocketId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
}
// This test makes a GetServerSockets with a non-zero start ID, and expect only
// sockets with ID >= the given start ID.
func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
svrID := channelz.RegisterServer("test server")
defer channelz.RemoveEntry(svrID.ID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeListen, Parent: svrID, RefName: refNames[0]}).ID
ids[1] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: svrID, RefName: refNames[1]}).ID
ids[2] = channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: svrID, RefName: refNames[2]}).ID
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be
// included in the response.
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.ID, StartSocketId: ids[1] + 1})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
}
// GetServerSockets only return normal socket-2, socket-1 should be
// filtered by start ID.
want := map[int64]string{
ids[2]: refNames[2],
}
if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
}
}
func (s) TestGetChannel(t *testing.T) {
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
cids := make([]*channelz.Channel, 3)
cids[0] = channelz.RegisterChannel(nil, refNames[0])
channelz.AddTraceEvent(logger, cids[0], 0, &channelz.TraceEvent{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
cids[1] = channelz.RegisterChannel(cids[0], refNames[1])
channelz.AddTraceEvent(logger, cids[1], 0, &channelz.TraceEvent{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cids[1].ID),
Severity: channelz.CtInfo,
},
})
subChan := channelz.RegisterSubChannel(cids[0], refNames[2])
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("SubChannel(id:%d) created", subChan.ID),
Severity: channelz.CtInfo,
},
})
defer channelz.RemoveEntry(subChan.ID)
cids[2] = channelz.RegisterChannel(cids[1], refNames[3])
channelz.AddTraceEvent(logger, cids[2], 0, &channelz.TraceEvent{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cids[2].ID),
Severity: channelz.CtInfo,
},
})
channelz.AddTraceEvent(logger, cids[0], 0, &channelz.TraceEvent{
Desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready),
Severity: channelz.CtInfo,
})
channelz.AddTraceEvent(logger, cids[0], 0, &channelz.TraceEvent{
Desc: "Resolver returns an empty address list",
Severity: channelz.CtWarning,
})
for _, id := range cids {
defer channelz.RemoveEntry(id.ID)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: cids[0].ID})
metrics := resp.GetChannel()
subChans := metrics.GetSubchannelRef()
if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != subChan.ID {
t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: subChan.ID, Name: refNames[2]}}, subChans)
}
nestedChans := metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != cids[1].ID {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: cids[1].ID, Name: refNames[1]}}, nestedChans)
}
trace := metrics.GetData().GetTrace()
want := []struct {
desc string
severity channelzpb.ChannelTraceEvent_Severity
childID int64
childRef string
}{
{desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: fmt.Sprintf("Nested Channel(id:%d) created", cids[1].ID), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: cids[1].ID, childRef: refNames[1]},
{desc: fmt.Sprintf("SubChannel(id:%d) created", subChan.ID), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: subChan.ID, childRef: refNames[2]},
{desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING},
}
for i, e := range trace.Events {
if !strings.Contains(e.GetDescription(), want[i].desc) {
t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription())
}
if e.GetSeverity() != want[i].severity {
t.Fatalf("trace: GetSeverity want %#v, got %#v", want[i].severity, e.GetSeverity())
}
if want[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
}
if e.GetChannelRef().GetChannelId() != want[i].childID || e.GetChannelRef().GetName() != want[i].childRef {
if e.GetSubchannelRef().GetSubchannelId() != want[i].childID || e.GetSubchannelRef().GetName() != want[i].childRef {
t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", want[i].childID, want[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
}
}
}
resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: cids[1].ID})
metrics = resp.GetChannel()
nestedChans = metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != cids[2].ID {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: cids[2].ID, Name: refNames[3]}}, nestedChans)
}
}
func (s) TestGetSubChannel(t *testing.T) {
var (
subchanCreated = "SubChannel Created"
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
)
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
chann := channelz.RegisterChannel(nil, refNames[0])
defer channelz.RemoveEntry(chann.ID)
channelz.AddTraceEvent(logger, chann, 0, &channelz.TraceEvent{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
subChan := channelz.RegisterSubChannel(chann, refNames[1])
defer channelz.RemoveEntry(subChan.ID)
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subchanCreated,
Severity: channelz.CtInfo,
Parent: &channelz.TraceEvent{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", chann.ID),
Severity: channelz.CtInfo,
},
})
skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan, RefName: refNames[2]})
defer channelz.RemoveEntry(skt1.ID)
skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan, RefName: refNames[3]})
defer channelz.RemoveEntry(skt2.ID)
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subchanConnectivityChange,
Severity: channelz.CtInfo,
})
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subChanPickNewAddress,
Severity: channelz.CtInfo,
})
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: subChan.ID})
metrics := resp.GetSubchannel()
want := map[int64]string{
skt1.ID: refNames[2],
skt2.ID: refNames[3],
}
if !cmp.Equal(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
}
trace := metrics.GetData().GetTrace()
wantTrace := []struct {
desc string
severity channelzpb.ChannelTraceEvent_Severity
childID int64
childRef string
}{
{desc: subchanCreated, severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: subchanConnectivityChange, severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: subChanPickNewAddress, severity: channelzpb.ChannelTraceEvent_CT_INFO},
}
for i, e := range trace.Events {
if e.GetDescription() != wantTrace[i].desc {
t.Fatalf("trace: GetDescription want %#v, got %#v", wantTrace[i].desc, e.GetDescription())
}
if e.GetSeverity() != wantTrace[i].severity {
t.Fatalf("trace: GetSeverity want %#v, got %#v", wantTrace[i].severity, e.GetSeverity())
}
if wantTrace[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
}
if e.GetChannelRef().GetChannelId() != wantTrace[i].childID || e.GetChannelRef().GetName() != wantTrace[i].childRef {
if e.GetSubchannelRef().GetSubchannelId() != wantTrace[i].childID || e.GetSubchannelRef().GetName() != wantTrace[i].childRef {
t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", wantTrace[i].childID, wantTrace[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
}
}
}
}
type czSocket struct {
streamsStarted int64
streamsSucceeded int64
streamsFailed int64
messagesSent int64
messagesReceived int64
keepAlivesSent int64
lastLocalStreamCreatedTimestamp time.Time
lastRemoteStreamCreatedTimestamp time.Time
lastMessageSentTimestamp time.Time
lastMessageReceivedTimestamp time.Time
localFlowControlWindow int64
remoteFlowControlWindow int64
localAddr net.Addr
remoteAddr net.Addr
remoteName string
socketOptions *channelz.SocketOptionData
security credentials.ChannelzSecurityValue
}
func newSocket(cs czSocket) *channelz.Socket {
if cs.lastLocalStreamCreatedTimestamp.IsZero() {
cs.lastLocalStreamCreatedTimestamp = time.Unix(0, 0)
}
if cs.lastRemoteStreamCreatedTimestamp.IsZero() {
cs.lastRemoteStreamCreatedTimestamp = time.Unix(0, 0)
}
if cs.lastMessageSentTimestamp.IsZero() {
cs.lastMessageSentTimestamp = time.Unix(0, 0)
}
if cs.lastMessageReceivedTimestamp.IsZero() {
cs.lastMessageReceivedTimestamp = time.Unix(0, 0)
}
s := &channelz.Socket{
LocalAddr: cs.localAddr,
RemoteAddr: cs.remoteAddr,
RemoteName: cs.remoteName,
SocketOptions: cs.socketOptions,
Security: cs.security,
}
s.SocketMetrics.StreamsStarted.Store(cs.streamsStarted)
s.SocketMetrics.StreamsSucceeded.Store(cs.streamsSucceeded)
s.SocketMetrics.StreamsFailed.Store(cs.streamsFailed)
s.SocketMetrics.MessagesSent.Store(cs.messagesSent)
s.SocketMetrics.MessagesReceived.Store(cs.messagesReceived)
s.SocketMetrics.KeepAlivesSent.Store(cs.keepAlivesSent)
s.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(cs.lastLocalStreamCreatedTimestamp.UnixNano())
s.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(cs.lastRemoteStreamCreatedTimestamp.UnixNano())
s.SocketMetrics.LastMessageSentTimestamp.Store(cs.lastMessageSentTimestamp.UnixNano())
s.SocketMetrics.LastMessageReceivedTimestamp.Store(cs.lastMessageReceivedTimestamp.UnixNano())
s.EphemeralMetrics = func() *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: cs.localFlowControlWindow,
RemoteFlowControlWindow: cs.remoteFlowControlWindow,
}
}
return s
}
func (s) TestGetSocket(t *testing.T) {
ss := []*channelz.Socket{newSocket(czSocket{
streamsStarted: 10,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastLocalStreamCreatedTimestamp: time.Unix(0, 0),
lastRemoteStreamCreatedTimestamp: time.Unix(1, 0),
lastMessageSentTimestamp: time.Unix(2, 0),
lastMessageReceivedTimestamp: time.Unix(3, 0),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 1024,
localAddr: &net.TCPAddr{IP: net.ParseIP("1.0.0.1"), Port: 10001},
remoteAddr: &net.TCPAddr{IP: net.ParseIP("12.0.0.1"), Port: 10002},
remoteName: "remote.remote",
}), newSocket(czSocket{
streamsStarted: 10,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastLocalStreamCreatedTimestamp: time.Unix(0, 0),
lastRemoteStreamCreatedTimestamp: time.Unix(5, 0),
lastMessageSentTimestamp: time.Unix(6, 0),
lastMessageReceivedTimestamp: time.Unix(7, 0),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 1024,
localAddr: &net.UnixAddr{Name: "file.path", Net: "unix"},
remoteAddr: &net.UnixAddr{Name: "another.path", Net: "unix"},
remoteName: "remote.remote",
}), newSocket(czSocket{
streamsStarted: 5,
streamsSucceeded: 2,
streamsFailed: 3,
messagesSent: 20,
messagesReceived: 10,
keepAlivesSent: 2,
lastLocalStreamCreatedTimestamp: time.Unix(10, 10),
lastRemoteStreamCreatedTimestamp: time.Unix(0, 0),
lastMessageSentTimestamp: time.Unix(0, 0),
lastMessageReceivedTimestamp: time.Unix(0, 0),
localFlowControlWindow: 65536,
remoteFlowControlWindow: 10240,
localAddr: &net.IPAddr{IP: net.ParseIP("1.0.0.1")},
remoteAddr: &net.IPAddr{IP: net.ParseIP("9.0.0.1")},
remoteName: "",
}), newSocket(czSocket{
localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 10001},
}), newSocket(czSocket{
security: &credentials.TLSChannelzSecurityValue{
StandardName: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
RemoteCertificate: []byte{48, 130, 2, 156, 48, 130, 2, 5, 160},
},
}), newSocket(czSocket{
security: &credentials.OtherChannelzSecurityValue{
Name: "XXXX",
},
}), newSocket(czSocket{
security: &credentials.OtherChannelzSecurityValue{
Name: "YYYY",
Value: &OtherSecurityValue{LocalCertificate: []byte{1, 2, 3}, RemoteCertificate: []byte{4, 5, 6}},
},
}),
}
otherSecVal, err := ptypes.MarshalAny(ss[6].Security.(*credentials.OtherChannelzSecurityValue).Value)
if err != nil {
t.Fatal("Error marshalling proto:", err)
}
svr := newCZServer()
skts := make([]*channelz.Socket, len(ss))
svrID := channelz.RegisterServer("")
defer channelz.RemoveEntry(svrID.ID)
for i, s := range ss {
s.Parent = svrID
s.RefName = strconv.Itoa(i)
skts[i] = channelz.RegisterSocket(s)
defer channelz.RemoveEntry(skts[i].ID)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
emptyData := `data: {
last_local_stream_created_timestamp: {seconds: 0 nanos: 0}
last_remote_stream_created_timestamp: {seconds: 0 nanos: 0}
last_message_sent_timestamp: {seconds: 0 nanos: 0}
last_message_received_timestamp: {seconds: 0 nanos: 0}
local_flow_control_window: { value: 0 }
remote_flow_control_window: { value: 0 }
}`
want := []string{`
ref: {socket_id: ` + fmt.Sprint(skts[0].ID) + ` name: "0" }
data: {
streams_started: 10
streams_succeeded: 2
streams_failed: 3
messages_sent: 20
messages_received: 10
keep_alives_sent: 2
last_local_stream_created_timestamp: {seconds: 0 nanos: 0}
last_remote_stream_created_timestamp: {seconds: 1 nanos: 0}
last_message_sent_timestamp: {seconds: 2 nanos: 0}
last_message_received_timestamp: {seconds: 3 nanos: 0}
local_flow_control_window: { value: 65536 }
remote_flow_control_window: { value: 1024 }
}
local: { tcpip_address: { ip_address: "` + addr(skts[0].LocalAddr) + `" port: 10001 } }
remote: { tcpip_address: { ip_address: "` + addr(skts[0].RemoteAddr) + `" port: 10002 } }
remote_name: "remote.remote"`,
`
ref: {socket_id: ` + fmt.Sprint(skts[1].ID) + ` name: "1" }
data: {
streams_started: 10
streams_succeeded: 2
streams_failed: 3
messages_sent: 20
messages_received: 10
keep_alives_sent: 2
last_local_stream_created_timestamp: {seconds: 0 nanos: 0}
last_remote_stream_created_timestamp: {seconds: 5 nanos: 0}
last_message_sent_timestamp: {seconds: 6 nanos: 0}
last_message_received_timestamp: {seconds: 7 nanos: 0}
local_flow_control_window: { value: 65536 }
remote_flow_control_window: { value: 1024 }
}
local: { uds_address { filename: "file.path" } }
remote: { uds_address { filename: "another.path" } }
remote_name: "remote.remote"`,
`
ref: {socket_id: ` + fmt.Sprint(skts[2].ID) + ` name: "2" }
data: {
streams_started: 5
streams_succeeded: 2
streams_failed: 3
messages_sent: 20
messages_received: 10
keep_alives_sent: 2
last_local_stream_created_timestamp: {seconds: 10 nanos: 10}
last_remote_stream_created_timestamp: {seconds: 0 nanos: 0}
last_message_sent_timestamp: {seconds: 0 nanos: 0}
last_message_received_timestamp: {seconds: 0 nanos: 0}
local_flow_control_window: { value: 65536 }
remote_flow_control_window: { value: 10240 }
}
local: { tcpip_address: { ip_address: "` + addr(skts[2].LocalAddr) + `" } }
remote: { tcpip_address: { ip_address: "` + addr(skts[2].RemoteAddr) + `" } }
remote_name: ""`,
`
ref: {socket_id: ` + fmt.Sprint(skts[3].ID) + ` name: "3" }
local: { tcpip_address: { ip_address: "` + addr(skts[3].LocalAddr) + `" port: 10001 } }
` + emptyData,
`
ref: {socket_id: ` + fmt.Sprint(skts[4].ID) + ` name: "4" }
security: { tls: {
standard_name: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
remote_certificate: "\x30\x82\x02\x9c\x30\x82\x02\x05\xa0"
} }
` + emptyData,
`
ref: {socket_id: ` + fmt.Sprint(skts[5].ID) + ` name: "5" }
security: { other: { name: "XXXX" } }
` + emptyData,
`
ref: {socket_id: ` + fmt.Sprint(skts[6].ID) + ` name: "6" }
security: { other: {
name: "YYYY"
value: {
type_url: "type.googleapis.com/grpc.credentials.OtherChannelzSecurityValue"
value: "` + escape(otherSecVal.Value) + `"
}
} }
` + emptyData,
}
for i := range ss {
resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: skts[i].ID})
w := &channelzpb.Socket{}
if err := proto.UnmarshalText(want[i], w); err != nil {
t.Fatalf("Error unmarshalling %q: %v", want[i], err)
}
if diff := cmp.Diff(resp.GetSocket(), w, protocmp.Transform()); diff != "" {
t.Fatalf("Socket %v did not match expected. -got +want: %v", i, diff)
}
}
}
func escape(bs []byte) string {
ret := ""
for _, b := range bs {
ret += fmt.Sprintf("\\x%02x", b)
}
return ret
}
func addr(a net.Addr) string {
switch a := a.(type) {
case *net.TCPAddr:
return string(a.IP)
case *net.IPAddr:
return string(a.IP)
}
return ""
}