| /* |
| * |
| * Copyright 2019 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package main |
| |
| import ( |
| "encoding/binary" |
| "encoding/json" |
| "fmt" |
| "os" |
| "sort" |
| "strings" |
| |
| ppb "google.golang.org/grpc/profiling/proto" |
| ) |
| |
| type jsonNode struct { |
| Name string `json:"name"` |
| Cat string `json:"cat"` |
| ID string `json:"id"` |
| Cname string `json:"cname"` |
| Phase string `json:"ph"` |
| Timestamp float64 `json:"ts"` |
| PID string `json:"pid"` |
| TID string `json:"tid"` |
| } |
| |
| // Catapult does not allow specifying colours manually; a 20-odd predefined |
| // labels are used (that don't make much sense outside the context of |
| // Chromium). See this for more details: |
| // |
| // https://github.com/catapult-project/catapult/blob/bef344f7017fc9e04f7049d0f58af6d9ce9f4ab6/tracing/tracing/base/color_scheme.html#L29 |
| func hashCname(tag string) string { |
| if strings.Contains(tag, "encoding") { |
| return "rail_response" |
| } |
| |
| if strings.Contains(tag, "compression") { |
| return "cq_build_passed" |
| } |
| |
| if strings.Contains(tag, "transport") { |
| if strings.Contains(tag, "blocking") { |
| return "rail_animation" |
| } |
| return "good" |
| } |
| |
| if strings.Contains(tag, "header") { |
| return "cq_build_attempt_failed" |
| } |
| |
| if tag == "/" { |
| return "heap_dump_stack_frame" |
| } |
| |
| if strings.Contains(tag, "flow") || strings.Contains(tag, "tmp") { |
| return "heap_dump_stack_frame" |
| } |
| |
| return "" |
| } |
| |
| // filterCounter identifies the counter-th instance of a timer of the type |
| // `filter` within a Stat. This, in conjunction with the counter data structure |
| // defined below, is used to draw flows between linked loopy writer/reader |
| // events with application goroutine events in trace-viewer. This is possible |
| // because enqueues and dequeues are ordered -- that is, the first dequeue must |
| // be dequeueing the first enqueue operation. |
| func filterCounter(stat *ppb.Stat, filter string, counter int) int { |
| localCounter := 0 |
| for i := 0; i < len(stat.Timers); i++ { |
| if stat.Timers[i].Tags == filter { |
| if localCounter == counter { |
| return i |
| } |
| localCounter++ |
| } |
| } |
| |
| return -1 |
| } |
| |
| // counter is state object used to store and retrieve the number of timers of a |
| // particular type that have been seen. |
| type counter struct { |
| c map[string]int |
| } |
| |
| func newCounter() *counter { |
| return &counter{c: make(map[string]int)} |
| } |
| |
| func (c *counter) GetAndInc(s string) int { |
| ret := c.c[s] |
| c.c[s]++ |
| return ret |
| } |
| |
| func catapultNs(sec int64, nsec int32) float64 { |
| return float64((sec * 1000000000) + int64(nsec)) |
| } |
| |
| // streamStatsCatapultJSONSingle processes a single proto Stat object to return |
| // an array of jsonNodes in trace-viewer's format. |
| func streamStatsCatapultJSONSingle(stat *ppb.Stat, baseSec int64, baseNsec int32) []jsonNode { |
| if len(stat.Timers) == 0 { |
| return nil |
| } |
| |
| connectionCounter := binary.BigEndian.Uint64(stat.Metadata[0:8]) |
| streamID := binary.BigEndian.Uint32(stat.Metadata[8:12]) |
| opid := fmt.Sprintf("/%s/%d/%d", stat.Tags, connectionCounter, streamID) |
| |
| var loopyReaderGoID, loopyWriterGoID int64 |
| for i := 0; i < len(stat.Timers) && (loopyReaderGoID == 0 || loopyWriterGoID == 0); i++ { |
| if strings.Contains(stat.Timers[i].Tags, "/loopyReader") { |
| loopyReaderGoID = stat.Timers[i].GoId |
| } else if strings.Contains(stat.Timers[i].Tags, "/loopyWriter") { |
| loopyWriterGoID = stat.Timers[i].GoId |
| } |
| } |
| |
| lrc, lwc := newCounter(), newCounter() |
| |
| var result []jsonNode |
| result = append(result, |
| jsonNode{ |
| Name: "loopyReaderTmp", |
| ID: opid, |
| Cname: hashCname("tmp"), |
| Phase: "i", |
| Timestamp: 0, |
| PID: fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter), |
| TID: fmt.Sprintf("%d", loopyReaderGoID), |
| }, |
| jsonNode{ |
| Name: "loopyWriterTmp", |
| ID: opid, |
| Cname: hashCname("tmp"), |
| Phase: "i", |
| Timestamp: 0, |
| PID: fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter), |
| TID: fmt.Sprintf("%d", loopyWriterGoID), |
| }, |
| ) |
| |
| for i := 0; i < len(stat.Timers); i++ { |
| categories := stat.Tags |
| pid, tid := opid, fmt.Sprintf("%d", stat.Timers[i].GoId) |
| |
| if stat.Timers[i].GoId == loopyReaderGoID { |
| pid, tid = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId) |
| |
| var flowEndID int |
| var flowEndPID, flowEndTID string |
| switch stat.Timers[i].Tags { |
| case "/http2/recv/header": |
| flowEndID = filterCounter(stat, "/grpc/stream/recv/header", lrc.GetAndInc("/http2/recv/header")) |
| if flowEndID != -1 { |
| flowEndPID = opid |
| flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId) |
| } else { |
| logger.Infof("cannot find %s/grpc/stream/recv/header for %s/http2/recv/header", opid, opid) |
| } |
| case "/http2/recv/dataFrame/loopyReader": |
| flowEndID = filterCounter(stat, "/recvAndDecompress", lrc.GetAndInc("/http2/recv/dataFrame/loopyReader")) |
| if flowEndID != -1 { |
| flowEndPID = opid |
| flowEndTID = fmt.Sprintf("%d", stat.Timers[flowEndID].GoId) |
| } else { |
| logger.Infof("cannot find %s/recvAndDecompress for %s/http2/recv/dataFrame/loopyReader", opid, opid) |
| } |
| default: |
| flowEndID = -1 |
| } |
| |
| if flowEndID != -1 { |
| flowID := fmt.Sprintf("lrc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[i].Tags, connectionCounter, stat.Timers[flowEndID].Tags, i, pid, tid, flowEndID, flowEndPID, flowEndTID) |
| result = append(result, |
| jsonNode{ |
| Name: fmt.Sprintf("%s/flow", opid), |
| Cat: categories + ",flow", |
| ID: flowID, |
| Cname: hashCname("flow"), |
| Phase: "s", |
| Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec), |
| PID: pid, |
| TID: tid, |
| }, |
| jsonNode{ |
| Name: fmt.Sprintf("%s/flow", opid), |
| Cat: categories + ",flow", |
| ID: flowID, |
| Cname: hashCname("flow"), |
| Phase: "f", |
| Timestamp: catapultNs(stat.Timers[flowEndID].BeginSec-baseSec, stat.Timers[flowEndID].BeginNsec-baseNsec), |
| PID: flowEndPID, |
| TID: flowEndTID, |
| }, |
| ) |
| } |
| } else if stat.Timers[i].GoId == loopyWriterGoID { |
| pid, tid = fmt.Sprintf("/%s/%d/loopyWriter", stat.Tags, connectionCounter), fmt.Sprintf("%d", stat.Timers[i].GoId) |
| |
| var flowBeginID int |
| var flowBeginPID, flowBeginTID string |
| switch stat.Timers[i].Tags { |
| case "/http2/recv/header/loopyWriter/registerOutStream": |
| flowBeginID = filterCounter(stat, "/http2/recv/header", lwc.GetAndInc("/http2/recv/header/loopyWriter/registerOutStream")) |
| flowBeginPID = fmt.Sprintf("/%s/%d/loopyReader", stat.Tags, connectionCounter) |
| flowBeginTID = fmt.Sprintf("%d", loopyReaderGoID) |
| case "/http2/send/dataFrame/loopyWriter/preprocess": |
| flowBeginID = filterCounter(stat, "/transport/enqueue", lwc.GetAndInc("/http2/send/dataFrame/loopyWriter/preprocess")) |
| if flowBeginID != -1 { |
| flowBeginPID = opid |
| flowBeginTID = fmt.Sprintf("%d", stat.Timers[flowBeginID].GoId) |
| } else { |
| logger.Infof("cannot find /%d/transport/enqueue for /%d/http2/send/dataFrame/loopyWriter/preprocess", connectionCounter, connectionCounter) |
| } |
| default: |
| flowBeginID = -1 |
| } |
| |
| if flowBeginID != -1 { |
| flowID := fmt.Sprintf("lwc begin:/%d%s end:/%d%s begin:(%d, %s, %s) end:(%d, %s, %s)", connectionCounter, stat.Timers[flowBeginID].Tags, connectionCounter, stat.Timers[i].Tags, flowBeginID, flowBeginPID, flowBeginTID, i, pid, tid) |
| result = append(result, |
| jsonNode{ |
| Name: fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID), |
| Cat: categories + ",flow", |
| ID: flowID, |
| Cname: hashCname("flow"), |
| Phase: "s", |
| Timestamp: catapultNs(stat.Timers[flowBeginID].EndSec-baseSec, stat.Timers[flowBeginID].EndNsec-baseNsec), |
| PID: flowBeginPID, |
| TID: flowBeginTID, |
| }, |
| jsonNode{ |
| Name: fmt.Sprintf("/%s/%d/%d/flow", stat.Tags, connectionCounter, streamID), |
| Cat: categories + ",flow", |
| ID: flowID, |
| Cname: hashCname("flow"), |
| Phase: "f", |
| Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec), |
| PID: pid, |
| TID: tid, |
| }, |
| ) |
| } |
| } |
| |
| result = append(result, |
| jsonNode{ |
| Name: fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags), |
| Cat: categories, |
| ID: opid, |
| Cname: hashCname(stat.Timers[i].Tags), |
| Phase: "B", |
| Timestamp: catapultNs(stat.Timers[i].BeginSec-baseSec, stat.Timers[i].BeginNsec-baseNsec), |
| PID: pid, |
| TID: tid, |
| }, |
| jsonNode{ |
| Name: fmt.Sprintf("%s%s", opid, stat.Timers[i].Tags), |
| Cat: categories, |
| ID: opid, |
| Cname: hashCname(stat.Timers[i].Tags), |
| Phase: "E", |
| Timestamp: catapultNs(stat.Timers[i].EndSec-baseSec, stat.Timers[i].EndNsec-baseNsec), |
| PID: pid, |
| TID: tid, |
| }, |
| ) |
| } |
| |
| return result |
| } |
| |
| // timerBeginIsBefore compares two proto Timer objects to determine if the |
| // first comes before the second chronologically. |
| func timerBeginIsBefore(ti *ppb.Timer, tj *ppb.Timer) bool { |
| if ti.BeginSec == tj.BeginSec { |
| return ti.BeginNsec < tj.BeginNsec |
| } |
| return ti.BeginSec < tj.BeginSec |
| } |
| |
| // streamStatsCatapulJSON receives a *snapshot and the name of a JSON file to |
| // write to. The grpc-go profiling snapshot is processed and converted to a |
| // JSON format that can be understood by trace-viewer. |
| func streamStatsCatapultJSON(s *snapshot, streamStatsCatapultJSONFileName string) (err error) { |
| logger.Infof("calculating stream stats filters") |
| filterArray := strings.Split(*flagStreamStatsFilter, ",") |
| filter := make(map[string]bool) |
| for _, f := range filterArray { |
| filter[f] = true |
| } |
| |
| logger.Infof("filter stream stats for %s", *flagStreamStatsFilter) |
| var streamStats []*ppb.Stat |
| for _, stat := range s.StreamStats { |
| if _, ok := filter[stat.Tags]; ok { |
| streamStats = append(streamStats, stat) |
| } |
| } |
| |
| logger.Infof("sorting timers within all stats") |
| for id := range streamStats { |
| sort.Slice(streamStats[id].Timers, func(i, j int) bool { |
| return timerBeginIsBefore(streamStats[id].Timers[i], streamStats[id].Timers[j]) |
| }) |
| } |
| |
| logger.Infof("sorting stream stats") |
| sort.Slice(streamStats, func(i, j int) bool { |
| if len(streamStats[j].Timers) == 0 { |
| return true |
| } else if len(streamStats[i].Timers) == 0 { |
| return false |
| } |
| pi := binary.BigEndian.Uint64(streamStats[i].Metadata[0:8]) |
| pj := binary.BigEndian.Uint64(streamStats[j].Metadata[0:8]) |
| if pi == pj { |
| return timerBeginIsBefore(streamStats[i].Timers[0], streamStats[j].Timers[0]) |
| } |
| |
| return pi < pj |
| }) |
| |
| // Clip the last stat as it's from the /Profiling/GetStreamStats call that we |
| // made to retrieve the stats themselves. This likely happened millions of |
| // nanoseconds after the last stream we want to profile, so it'd just make |
| // the catapult graph less readable. |
| if len(streamStats) > 0 { |
| streamStats = streamStats[:len(streamStats)-1] |
| } |
| |
| // All timestamps use the earliest timestamp available as the reference. |
| logger.Infof("calculating the earliest timestamp across all timers") |
| var base *ppb.Timer |
| for _, stat := range streamStats { |
| for _, timer := range stat.Timers { |
| if base == nil || timerBeginIsBefore(base, timer) { |
| base = timer |
| } |
| } |
| } |
| |
| logger.Infof("converting %d stats to catapult JSON format", len(streamStats)) |
| var jsonNodes []jsonNode |
| for _, stat := range streamStats { |
| jsonNodes = append(jsonNodes, streamStatsCatapultJSONSingle(stat, base.BeginSec, base.BeginNsec)...) |
| } |
| |
| logger.Infof("marshalling catapult JSON") |
| b, err := json.Marshal(jsonNodes) |
| if err != nil { |
| logger.Errorf("cannot marshal JSON: %v", err) |
| return err |
| } |
| |
| logger.Infof("creating catapult JSON file") |
| streamStatsCatapultJSONFile, err := os.Create(streamStatsCatapultJSONFileName) |
| if err != nil { |
| logger.Errorf("cannot create file %s: %v", streamStatsCatapultJSONFileName, err) |
| return err |
| } |
| defer streamStatsCatapultJSONFile.Close() |
| |
| logger.Infof("writing catapult JSON to disk") |
| _, err = streamStatsCatapultJSONFile.Write(b) |
| if err != nil { |
| logger.Errorf("cannot write marshalled JSON: %v", err) |
| return err |
| } |
| |
| logger.Infof("successfully wrote catapult JSON file %s", streamStatsCatapultJSONFileName) |
| return nil |
| } |