/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"os"
"sort"
"strings"
ppb "google.golang.org/grpc/profiling/proto"
)
type jsonNode struct {
Name string `json:"name"`
Cat string `json:"cat"`
ID string `json:"id"`
Cname string `json:"cname"`
Phase string `json:"ph"`
Timestamp float64 `json:"ts"`
PID string `json:"pid"`
TID string `json:"tid"`
}
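// Each jsonNode marshals to a single trace-viewer event. A minimal sketch of
// what one such event might look like on the wire (all values here are
// illustrative, not taken from a real trace):
//
// {"name": "/stream/1/2/transport/enqueue", "cat": "stream", "id": "/stream/1/2",
// "cname": "good", "ph": "B", "ts": 5000, "pid": "/stream/1/2", "tid": "42"}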
// Catapult does not allow colours to be specified manually; instead, one of
// about twenty predefined labels must be used (these don't make much sense
// outside the context of Chromium). See the following for more details:
//
// https://github.com/catapult-project/catapult/blob/bef344f7017fc9e04f7049d0f58af6d9ce9f4ab6/tracing/tracing/base/color_scheme.html#L29
func hashCname(tag string) string {
if strings.Contains(tag, "encoding") {
return "rail_response"
}
if strings.Contains(tag, "compression") {
return "cq_build_passed"
}
if strings.Contains(tag, "transport") {
if strings.Contains(tag, "blocking") {
return "rail_animation"
}
return "good"
}
if strings.Contains(tag, "header") {
return "cq_build_attempt_failed"
}
if tag == "/" {
return "heap_dump_stack_frame"
}
if strings.Contains(tag, "flow") || strings.Contains(tag, "tmp") {
return "heap_dump_stack_frame"
}
return ""
}
// filterCounter identifies the counter-th instance of a timer of the type
// `filter` within a Stat. This, in conjunction with the counter data structure
// defined below, is used to draw flows in trace-viewer between linked loopy
// writer/reader events and application goroutine events. This is possible
// because enqueues and dequeues are ordered -- that is, the first dequeue must
// be dequeueing the first enqueue operation.
func filterCounter(stat *ppb.Stat, filter string, counter int) int {
localCounter := 0
for i := 0; i < len(stat.Timers); i++ {
if stat.Timers[i].Tags == filter {
if localCounter == counter {
return i
}
localCounter++
}
}
return -1
}
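// For example, for a Stat whose Timers carry the tags ["/transport/enqueue",
// "/foo", "/transport/enqueue"], filterCounter(stat, "/transport/enqueue", 1)
// returns 2, the index of the second matching timer; a counter value with no
// matching instance yields -1.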
// counter is a state object used to store and retrieve the number of timers
// of a particular type that have been seen so far.
type counter struct {
c map[string]int
}
func newCounter() *counter {
return &counter{c: make(map[string]int)}
}
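// GetAndInc returns the number of times the string s has been seen so far and
// increments the count; successive calls with the same key return 0, 1, 2, ...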
func (c *counter) GetAndInc(s string) int {
ret := c.c[s]
c.c[s]++
return ret
}
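// catapultNs flattens a (seconds, nanoseconds) pair into a single nanosecond
// count, expressed as a float64, which is the type used by the jsonNode
// Timestamp field.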
func catapultNs(sec int64, nsec int32) float64 {
return float64((sec * 1000000000) + int64(nsec))
}
// streamStatsCatapultJSONSingle processes a single proto Stat object and
// returns a slice of jsonNodes in trace-viewer's format.
func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32) []jsonNode {
if len(stat.Timers) == 0 {
return nil
}
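// A stream Stat's metadata encodes a big-endian connection counter in its
// first eight bytes and a big-endian stream ID in the next four.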
connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8])
streamID := binary.BigEndian.Uint32(stat.Metadata[8:12])
opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID)
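// Find the goroutine IDs of the loopy reader and loopy writer servicing this
// stream; their timers are grouped under separate trace-viewer "processes"
// below.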
var loopyReaderGoID, loopyWriterGoID int64
for i := 0; i < len(stat.Timers) && (loopyReaderGoID == 0 || loopyWriterGoID == 0); i++ {
if strings.Contains(stat.Timers[i].Tags, "/loopyReader") {
loopyReaderGoID = stat.Timers[i].GoId
} else if strings.Contains(stat.Timers[i].Tags, "/loopyWriter") {
loopyWriterGoID = stat.Timers[i].GoId
}
}
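// lrc and lwc count how many timers of each tag have been seen on the loopy
// reader and loopy writer side respectively, so that the n-th event of one
// tag can be paired (via filterCounter) with the n-th event of its
// counterpart.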
lrc, lwc := newCounter(), newCounter()
var result []jsonNode
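// Emit a placeholder instant event (phase "i") at timestamp zero for each
// loopy goroutine; this appears to exist so that trace-viewer creates a row
// for each goroutine before its first real timer is drawn.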
result = append(result,
jsonNode{
Name: "loopyReaderTmp",
ID: opid,
Cname: hashCname("tmp"),
Phase: "i",
Timestamp: 0,
PID: fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter),
TID: fmt.Sprintf("%d", loopyReaderGoID),
},
jsonNode{
Name: "loopyWriterTmp",
ID: opid,
Cname: hashCname("tmp"),
Phase: "i",
Timestamp: 0,
PID: fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter),
TID: fmt.Sprintf("%d", loopyWriterGoID),
},
)
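// Each timer contributes a begin/end ("B"/"E") event pair. Timers on the
// loopy reader or loopy writer goroutines may additionally contribute a flow
// ("s"/"f") event pair that links them to the application goroutine event
// they are ordered against.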
for i := 0; i < len(stat.Timers); i++ {
categories := stat.Tags
pid, tid := opid, fmt.Sprintf("%d", stat.Timers[i].GoId)
if stat.Timers[i].GoId == loopyReaderGoID {
pid, tid = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)
var flowEndID int
var flowEndPID, flowEndTID string
switch stat.Timers[i].Tags {
case "/http2/recv/header":
flowEndID = filterCounter(stat, "/grpc/stream/recv/header", lrc.GetAndInc("/http2/recv/header"))
if flowEndID != -1 {
flowEndPID = opid
flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
} else {
logger.Infof("cannot find %s/grpc/stream/recv/header for %s/http2/recv/header", opid, opid)
}
case "/http2/recv/dataFrame/loopyReader":
flowEndID = filterCounter(stat, "/recvAndDecompress", lrc.GetAndInc("/http2/recv/dataFrame/loopyReader"))
if flowEndID != -1 {
flowEndPID = opid
flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId)
} else {
logger.Infof("cannot find %s/recvAndDecompress for %s/http2/recv/dataFrame/loopyReader", opid, opid)
}
default:
flowEndID = -1
}
if flowEndID != -1 {
flowID := fmt.Sprintf("lrc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[i].Tags, connectionCounter, stat.Timers[flowEndID].Tags, i, pid, tid, flowEndID, flowEndPID, flowEndTID)
result = append(result,
jsonNode{
Name: fmt.Sprintf("%s/flow", opid),
Cat: categories + ",flow",
ID: flowID,
Cname: hashCname("flow"),
Phase: "s",
Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
PID: pid,
TID: tid,
},
jsonNode{
Name: fmt.Sprintf("%s/flow", opid),
Cat: categories + ",flow",
ID: flowID,
Cname: hashCname("flow"),
Phase: "f",
Timestamp: catapultNs(stat.Timers[flowEndID].BeginSec-baseSec, stat.Timers[flowEndID].BeginNsec-baseNsec),
PID: flowEndPID,
TID: flowEndTID,
},
)
}
} else if stat.Timers[i].GoId == loopyWriterGoID {
pid, tid = fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId)
var flowBeginID int
var flowBeginPID, flowBeginTID string
switch stat.Timers[i].Tags {
case "/http2/recv/header/loopyWriter/registerOutStream":
flowBeginID = filterCounter(stat, "/http2/recv/header", lwc.GetAndInc("/http2/recv/header/loopyWriter/registerOutStream"))
flowBeginPID = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter)
flowBeginTID = fmt.Sprintf("%d", loopyReaderGoID)
case "/http2/send/dataFrame/loopyWriter/preprocess":
flowBeginID = filterCounter(stat, "/transport/enqueue", lwc.GetAndInc("/http2/send/dataFrame/loopyWriter/preprocess"))
if flowBeginID != -1 {
flowBeginPID = opid
flowBeginTID = fmt.Sprintf("%d", stat.Timers[flowBeginID].GoId)
} else {
logger.Infof("cannot find /%d/transport/enqueue for /%d/http2/send/dataFrame/loopyWriter/preprocess", connectionCounter, connectionCounter)
}
default:
flowBeginID = -1
}
if flowBeginID != -1 {
flowID := fmt.Sprintf("lwc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[flowBeginID].Tags, connectionCounter, stat.Timers[i].Tags, flowBeginID, flowBeginPID, flowBeginTID, i, pid, tid)
result = append(result,
jsonNode{
Name: fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
Cat: categories + ",flow",
ID: flowID,
Cname: hashCname("flow"),
Phase: "s",
Timestamp: catapultNs(stat.Timers[flowBeginID].EndSec-baseSec, stat.Timers[flowBeginID].EndNsec-baseNsec),
PID: flowBeginPID,
TID: flowBeginTID,
},
jsonNode{
Name: fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID),
Cat: categories + ",flow",
ID: flowID,
Cname: hashCname("flow"),
Phase: "f",
Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
PID: pid,
TID: tid,
},
)
}
}
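// Unconditionally emit the begin/end event pair for this timer.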
result = append(result,
jsonNode{
Name: fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
Cat: categories,
ID: opid,
Cname: hashCname(stat.Timers[i].Tags),
Phase: "B",
Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec),
PID: pid,
TID: tid,
},
jsonNode{
Name: fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags),
Cat: categories,
ID: opid,
Cname: hashCname(stat.Timers[i].Tags),
Phase: "E",
Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec),
PID: pid,
TID: tid,
},
)
}
return result
}
// timerBeginIsBefore compares two proto Timer objects to determine if the
// first comes before the second chronologically.
func timerBeginIsBefore(ti *ppb.Timer, tj *ppb.Timer) bool {
if ti.BeginSec == tj.BeginSec {
return ti.BeginNsec < tj.BeginNsec
}
return ti.BeginSec < tj.BeginSec
}
// streamStatsCatapultJSON receives a *snapshot and the name of a JSON file to
// write to. The grpc-go profiling snapshot is processed and converted to a
// JSON format that trace-viewer can understand.
func streamStatsCatapultJSON(s *snapshot, streamStatsCatapultJSONFileName string) (err error) {
logger.Infof("calculating stream stats filters")
filterArray := strings.Split(*flagStreamStatsFilter, ",")
filter := make(map[string]bool)
for _, f := range filterArray {
filter[f] = true
}
logger.Infof("filter stream stats for %s", *flagStreamStatsFilter)
var streamStats []*ppb.Stat
for _, stat := range s.StreamStats {
if _, ok := filter[stat.Tags]; ok {
streamStats = append(streamStats, stat)
}
}
logger.Infof("sorting timers within all stats")
for id := range streamStats {
sort.Slice(streamStats[id].Timers, func(i, j int) bool {
return timerBeginIsBefore(streamStats[id].Timers[i], streamStats[id].Timers[j])
})
}
logger.Infof("sorting stream stats")
sort.Slice(streamStats, func(i, j int) bool {
if len(streamStats[j].Timers) == 0 {
return true
} else if len(streamStats[i].Timers) == 0 {
return false
}
pi := binary.BigEndian.Uint64(streamStats[i].Metadata[0:8])
pj := binary.BigEndian.Uint64(streamStats[j].Metadata[0:8])
if pi == pj {
return timerBeginIsBefore(streamStats[i].Timers[0], streamStats[j].Timers[0])
}
return pi < pj
})
// Clip the last stat as it's from the /Profiling/GetStreamStats call that we
// made to retrieve the stats themselves. This likely happened millions of
// nanoseconds after the last stream we want to profile, so it'd just make
// the catapult graph less readable.
if len(streamStats) > 0 {
streamStats = streamStats[:len(streamStats)-1]
}
// All timestamps use the earliest timestamp available as the reference.
logger.Infof("calculating the earliest timestamp across all timers")
var base *ppb.Timer
for _, stat := range streamStats {
for _, timer := range stat.Timers {
if base == nil || timerBeginIsBefore(base, timer) {
base = timer
}
}
}
logger.Infof("converting %d stats to catapult JSON format", len(streamStats))
var jsonNodes []jsonNode
for _, stat := range streamStats {
jsonNodes = append(jsonNodes, streamStatsCatapultJSONSingle(stat, base.BeginSec, base.BeginNsec)...)
}
logger.Infof("marshalling catapult JSON")
b, err := json.Marshal(jsonNodes)
if err != nil {
logger.Errorf("cannot marshal JSON: %v", err)
return err
}
logger.Infof("creating catapult JSON file")
streamStatsCatapultJSONFile, err := os.Create(streamStatsCatapultJSONFileName)
if err != nil {
logger.Errorf("cannot create file %s: %v", streamStatsCatapultJSONFileName, err)
return err
}
defer streamStatsCatapultJSONFile.Close()
logger.Infof("writing catapult JSON to disk")
_, err = streamStatsCatapultJSONFile.Write(b)
if err != nil {
logger.Errorf("cannot write marshalled JSON: %v", err)
return err
}
logger.Infof("successfully wrote catapult JSON file %s", streamStatsCatapultJSONFileName)
return nil
}