blob: 16f69eb89f1eb9fdd1097a420a5001ca23f93305 [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//go:generate protoc -I ../grpc_channelz_v1 --go_out=plugins=grpc,paths=source_relative:../grpc_channelz_v1 ../grpc_channelz_v1/channelz.proto
// Package service provides an implementation for channelz service server.
package service
import (
"net"
"github.com/golang/protobuf/ptypes"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz"
pb "google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/connectivity"
)
// RegisterChannelzServiceToServer registers the channelz service to the given server.
func RegisterChannelzServiceToServer(s *grpc.Server) {
pb.RegisterChannelzServer(s, &serverImpl{})
}
func newCZServer() pb.ChannelzServer {
return &serverImpl{}
}
type serverImpl struct{}
func connectivityStateToProto(s connectivity.State) *pb.ChannelConnectivityState {
switch s {
case connectivity.Idle:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_IDLE}
case connectivity.Connecting:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_CONNECTING}
case connectivity.Ready:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_READY}
case connectivity.TransientFailure:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_TRANSIENT_FAILURE}
case connectivity.Shutdown:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_SHUTDOWN}
default:
return &pb.ChannelConnectivityState{State: pb.ChannelConnectivityState_UNKNOWN}
}
}
func channelMetricToProto(cm *channelz.ChannelMetric) *pb.Channel {
c := &pb.Channel{}
c.Ref = &pb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
c.Data = &pb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
c.Data.LastCallStartedTimestamp = ts
}
nestedChans := make([]*pb.ChannelRef, 0, len(cm.NestedChans))
for id, ref := range cm.NestedChans {
nestedChans = append(nestedChans, &pb.ChannelRef{ChannelId: id, Name: ref})
}
c.ChannelRef = nestedChans
subChans := make([]*pb.SubchannelRef, 0, len(cm.SubChans))
for id, ref := range cm.SubChans {
subChans = append(subChans, &pb.SubchannelRef{SubchannelId: id, Name: ref})
}
c.SubchannelRef = subChans
sockets := make([]*pb.SocketRef, 0, len(cm.Sockets))
for id, ref := range cm.Sockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
c.SocketRef = sockets
return c
}
func subChannelMetricToProto(cm *channelz.SubChannelMetric) *pb.Subchannel {
sc := &pb.Subchannel{}
sc.Ref = &pb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
sc.Data = &pb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
sc.Data.LastCallStartedTimestamp = ts
}
nestedChans := make([]*pb.ChannelRef, 0, len(cm.NestedChans))
for id, ref := range cm.NestedChans {
nestedChans = append(nestedChans, &pb.ChannelRef{ChannelId: id, Name: ref})
}
sc.ChannelRef = nestedChans
subChans := make([]*pb.SubchannelRef, 0, len(cm.SubChans))
for id, ref := range cm.SubChans {
subChans = append(subChans, &pb.SubchannelRef{SubchannelId: id, Name: ref})
}
sc.SubchannelRef = subChans
sockets := make([]*pb.SocketRef, 0, len(cm.Sockets))
for id, ref := range cm.Sockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
sc.SocketRef = sockets
return sc
}
func addrToProto(a net.Addr) *pb.Address {
switch a.Network() {
case "udp":
// TODO: Address_OtherAddress{}. Need proto def for Value.
case "ip":
// Note zone info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
case "ip+net":
// Note mask info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
case "tcp":
// Note zone info is discarded through the conversion.
return &pb.Address{Address: &pb.Address_TcpipAddress{TcpipAddress: &pb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
case "unix", "unixgram", "unixpacket":
return &pb.Address{Address: &pb.Address_UdsAddress_{UdsAddress: &pb.Address_UdsAddress{Filename: a.String()}}}
default:
}
return &pb.Address{}
}
func socketMetricToProto(sm *channelz.SocketMetric) *pb.Socket {
s := &pb.Socket{}
s.Ref = &pb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
s.Data = &pb.SocketData{
StreamsStarted: sm.SocketData.StreamsStarted,
StreamsSucceeded: sm.SocketData.StreamsSucceeded,
StreamsFailed: sm.SocketData.StreamsFailed,
MessagesSent: sm.SocketData.MessagesSent,
MessagesReceived: sm.SocketData.MessagesReceived,
KeepAlivesSent: sm.SocketData.KeepAlivesSent,
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
s.Data.LastLocalStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
s.Data.LastRemoteStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
s.Data.LastMessageSentTimestamp = ts
}
if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
s.Data.LastMessageReceivedTimestamp = ts
}
s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
if sm.SocketData.LocalAddr != nil {
s.Local = addrToProto(sm.SocketData.LocalAddr)
}
if sm.SocketData.RemoteAddr != nil {
s.Remote = addrToProto(sm.SocketData.RemoteAddr)
}
s.RemoteName = sm.SocketData.RemoteName
return s
}
func (s *serverImpl) GetTopChannels(ctx context.Context, req *pb.GetTopChannelsRequest) (*pb.GetTopChannelsResponse, error) {
metrics, end := channelz.GetTopChannels(req.GetStartChannelId())
resp := &pb.GetTopChannelsResponse{}
for _, m := range metrics {
resp.Channel = append(resp.Channel, channelMetricToProto(m))
}
resp.End = end
return resp, nil
}
func serverMetricToProto(sm *channelz.ServerMetric) *pb.Server {
s := &pb.Server{}
s.Ref = &pb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
s.Data = &pb.ServerData{
CallsStarted: sm.ServerData.CallsStarted,
CallsSucceeded: sm.ServerData.CallsSucceeded,
CallsFailed: sm.ServerData.CallsFailed,
}
if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
s.Data.LastCallStartedTimestamp = ts
}
sockets := make([]*pb.SocketRef, 0, len(sm.ListenSockets))
for id, ref := range sm.ListenSockets {
sockets = append(sockets, &pb.SocketRef{SocketId: id, Name: ref})
}
s.ListenSocket = sockets
return s
}
func (s *serverImpl) GetServers(ctx context.Context, req *pb.GetServersRequest) (*pb.GetServersResponse, error) {
metrics, end := channelz.GetServers(req.GetStartServerId())
resp := &pb.GetServersResponse{}
for _, m := range metrics {
resp.Server = append(resp.Server, serverMetricToProto(m))
}
resp.End = end
return resp, nil
}
func (s *serverImpl) GetServerSockets(ctx context.Context, req *pb.GetServerSocketsRequest) (*pb.GetServerSocketsResponse, error) {
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId())
resp := &pb.GetServerSocketsResponse{}
for _, m := range metrics {
resp.SocketRef = append(resp.SocketRef, &pb.SocketRef{SocketId: m.ID, Name: m.RefName})
}
resp.End = end
return resp, nil
}
func (s *serverImpl) GetChannel(ctx context.Context, req *pb.GetChannelRequest) (*pb.GetChannelResponse, error) {
var metric *channelz.ChannelMetric
if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
return &pb.GetChannelResponse{}, nil
}
resp := &pb.GetChannelResponse{Channel: channelMetricToProto(metric)}
return resp, nil
}
func (s *serverImpl) GetSubchannel(ctx context.Context, req *pb.GetSubchannelRequest) (*pb.GetSubchannelResponse, error) {
var metric *channelz.SubChannelMetric
if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
return &pb.GetSubchannelResponse{}, nil
}
resp := &pb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
return resp, nil
}
func (s *serverImpl) GetSocket(ctx context.Context, req *pb.GetSocketRequest) (*pb.GetSocketResponse, error) {
var metric *channelz.SocketMetric
if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
return &pb.GetSocketResponse{}, nil
}
resp := &pb.GetSocketResponse{Socket: socketMetricToProto(metric)}
return resp, nil
}