| // Copyright 2018, OpenCensus Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package jaeger contains an OpenCensus tracing exporter for Jaeger. |
| package jaeger // import "go.opencensus.io/exporter/jaeger" |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "log" |
| "net/http" |
| |
| "github.com/apache/thrift/lib/go/thrift" |
| gen "go.opencensus.io/exporter/jaeger/internal/gen-go/jaeger" |
| "go.opencensus.io/trace" |
| "google.golang.org/api/support/bundler" |
| ) |
| |
// defaultServiceName is the Jaeger service name used when neither
// Options.Process.ServiceName nor Options.ServiceName is set.
const defaultServiceName = "OpenCensus"
| |
| // Options are the options to be used when initializing a Jaeger exporter. |
| type Options struct { |
| // Endpoint is the Jaeger HTTP Thrift endpoint. |
| // For example, http://localhost:14268. |
| // |
| // Deprecated: Use CollectorEndpoint instead. |
| Endpoint string |
| |
| // CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector. |
| // For example, http://localhost:14268/api/traces |
| CollectorEndpoint string |
| |
| // AgentEndpoint instructs exporter to send spans to jaeger-agent at this address. |
| // For example, localhost:6831. |
| AgentEndpoint string |
| |
| // OnError is the hook to be called when there is |
| // an error occurred when uploading the stats data. |
| // If no custom hook is set, errors are logged. |
| // Optional. |
| OnError func(err error) |
| |
| // Username to be used if basic auth is required. |
| // Optional. |
| Username string |
| |
| // Password to be used if basic auth is required. |
| // Optional. |
| Password string |
| |
| // ServiceName is the Jaeger service name. |
| // Deprecated: Specify Process instead. |
| ServiceName string |
| |
| // Process contains the information about the exporting process. |
| Process Process |
| |
| //BufferMaxCount defines the total number of traces that can be buffered in memory |
| BufferMaxCount int |
| } |
| |
| // NewExporter returns a trace.Exporter implementation that exports |
| // the collected spans to Jaeger. |
| func NewExporter(o Options) (*Exporter, error) { |
| if o.Endpoint == "" && o.CollectorEndpoint == "" && o.AgentEndpoint == "" { |
| return nil, errors.New("missing endpoint for Jaeger exporter") |
| } |
| |
| var endpoint string |
| var client *agentClientUDP |
| var err error |
| if o.Endpoint != "" { |
| endpoint = o.Endpoint + "/api/traces?format=jaeger.thrift" |
| log.Printf("Endpoint has been deprecated. Please use CollectorEndpoint instead.") |
| } else if o.CollectorEndpoint != "" { |
| endpoint = o.CollectorEndpoint |
| } else { |
| client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength) |
| if err != nil { |
| return nil, err |
| } |
| } |
| onError := func(err error) { |
| if o.OnError != nil { |
| o.OnError(err) |
| return |
| } |
| log.Printf("Error when uploading spans to Jaeger: %v", err) |
| } |
| service := o.Process.ServiceName |
| if service == "" && o.ServiceName != "" { |
| // fallback to old service name if specified |
| service = o.ServiceName |
| } else if service == "" { |
| service = defaultServiceName |
| } |
| tags := make([]*gen.Tag, len(o.Process.Tags)) |
| for i, tag := range o.Process.Tags { |
| tags[i] = attributeToTag(tag.key, tag.value) |
| } |
| e := &Exporter{ |
| endpoint: endpoint, |
| agentEndpoint: o.AgentEndpoint, |
| client: client, |
| username: o.Username, |
| password: o.Password, |
| process: &gen.Process{ |
| ServiceName: service, |
| Tags: tags, |
| }, |
| } |
| bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) { |
| if err := e.upload(bundle.([]*gen.Span)); err != nil { |
| onError(err) |
| } |
| }) |
| |
| // Set BufferedByteLimit with the total number of spans that are permissible to be held in memory. |
| // This needs to be done since the size of messages is always set to 1. Failing to set this would allow |
| // 1G messages to be held in memory since that is the default value of BufferedByteLimit. |
| if o.BufferMaxCount != 0 { |
| bundler.BufferedByteLimit = o.BufferMaxCount |
| } |
| |
| e.bundler = bundler |
| return e, nil |
| } |
| |
// Process contains the information exported to Jaeger about the source
// of the trace data.
type Process struct {
	// ServiceName is the Jaeger service name.
	ServiceName string

	// Tags are added to the Jaeger Process that accompanies exported spans.
	Tags []Tag
}
| |
// Tag is a key-value pair attached to the exported Process.
// Its value is limited to the types attributeToTag can convert to a
// *jaeger.Tag; construct one with BoolTag, StringTag, or Int64Tag.
type Tag struct {
	key   string
	value interface{}
}

// BoolTag returns a Tag holding a bool value, exported as jaeger.TagType_BOOL.
func BoolTag(key string, value bool) Tag {
	return Tag{key: key, value: value}
}

// StringTag returns a Tag holding a string value, exported as jaeger.TagType_STRING.
func StringTag(key string, value string) Tag {
	return Tag{key: key, value: value}
}

// Int64Tag returns a Tag holding an int64 value, exported as jaeger.TagType_LONG.
func Int64Tag(key string, value int64) Tag {
	return Tag{key: key, value: value}
}
| |
| // Exporter is an implementation of trace.Exporter that uploads spans to Jaeger. |
| type Exporter struct { |
| endpoint string |
| agentEndpoint string |
| process *gen.Process |
| bundler *bundler.Bundler |
| client *agentClientUDP |
| |
| username, password string |
| } |
| |
// Compile-time assertion that *Exporter implements trace.Exporter.
var _ trace.Exporter = (*Exporter)(nil)
| |
| // ExportSpan exports a SpanData to Jaeger. |
| func (e *Exporter) ExportSpan(data *trace.SpanData) { |
| e.bundler.Add(spanDataToThrift(data), 1) |
| // TODO(jbd): Handle oversized bundlers. |
| } |
| |
// opencensusStatusCodeOK is the OpenCensus status code that denotes success.
// As per the OpenCensus Status code mapping in
// https://opencensus.io/tracing/span/status/
// the status is OK if the code is 0.
const opencensusStatusCodeOK = 0
| |
| func spanDataToThrift(data *trace.SpanData) *gen.Span { |
| tags := make([]*gen.Tag, 0, len(data.Attributes)) |
| for k, v := range data.Attributes { |
| tag := attributeToTag(k, v) |
| if tag != nil { |
| tags = append(tags, tag) |
| } |
| } |
| |
| tags = append(tags, |
| attributeToTag("status.code", data.Status.Code), |
| attributeToTag("status.message", data.Status.Message), |
| ) |
| |
| // Ensure that if Status.Code is not OK, that we set the "error" tag on the Jaeger span. |
| // See Issue https://github.com/census-instrumentation/opencensus-go/issues/1041 |
| if data.Status.Code != opencensusStatusCodeOK { |
| tags = append(tags, attributeToTag("error", true)) |
| } |
| |
| var logs []*gen.Log |
| for _, a := range data.Annotations { |
| fields := make([]*gen.Tag, 0, len(a.Attributes)) |
| for k, v := range a.Attributes { |
| tag := attributeToTag(k, v) |
| if tag != nil { |
| fields = append(fields, tag) |
| } |
| } |
| fields = append(fields, attributeToTag("message", a.Message)) |
| logs = append(logs, &gen.Log{ |
| Timestamp: a.Time.UnixNano() / 1000, |
| Fields: fields, |
| }) |
| } |
| var refs []*gen.SpanRef |
| for _, link := range data.Links { |
| refs = append(refs, &gen.SpanRef{ |
| TraceIdHigh: bytesToInt64(link.TraceID[0:8]), |
| TraceIdLow: bytesToInt64(link.TraceID[8:16]), |
| SpanId: bytesToInt64(link.SpanID[:]), |
| }) |
| } |
| return &gen.Span{ |
| TraceIdHigh: bytesToInt64(data.TraceID[0:8]), |
| TraceIdLow: bytesToInt64(data.TraceID[8:16]), |
| SpanId: bytesToInt64(data.SpanID[:]), |
| ParentSpanId: bytesToInt64(data.ParentSpanID[:]), |
| OperationName: name(data), |
| Flags: int32(data.TraceOptions), |
| StartTime: data.StartTime.UnixNano() / 1000, |
| Duration: data.EndTime.Sub(data.StartTime).Nanoseconds() / 1000, |
| Tags: tags, |
| Logs: logs, |
| References: refs, |
| } |
| } |
| |
| func name(sd *trace.SpanData) string { |
| n := sd.Name |
| switch sd.SpanKind { |
| case trace.SpanKindClient: |
| n = "Sent." + n |
| case trace.SpanKindServer: |
| n = "Recv." + n |
| } |
| return n |
| } |
| |
| func attributeToTag(key string, a interface{}) *gen.Tag { |
| var tag *gen.Tag |
| switch value := a.(type) { |
| case bool: |
| tag = &gen.Tag{ |
| Key: key, |
| VBool: &value, |
| VType: gen.TagType_BOOL, |
| } |
| case string: |
| tag = &gen.Tag{ |
| Key: key, |
| VStr: &value, |
| VType: gen.TagType_STRING, |
| } |
| case int64: |
| tag = &gen.Tag{ |
| Key: key, |
| VLong: &value, |
| VType: gen.TagType_LONG, |
| } |
| case int32: |
| v := int64(value) |
| tag = &gen.Tag{ |
| Key: key, |
| VLong: &v, |
| VType: gen.TagType_LONG, |
| } |
| case float64: |
| v := float64(value) |
| tag = &gen.Tag{ |
| Key: key, |
| VDouble: &v, |
| VType: gen.TagType_DOUBLE, |
| } |
| } |
| return tag |
| } |
| |
// Flush waits for exported trace spans to be uploaded.
//
// This is useful if your program is ending and you do not want to lose recent spans.
// It delegates to the Flush method of the exporter's bundler.
func (e *Exporter) Flush() {
	e.bundler.Flush()
}
| |
| func (e *Exporter) upload(spans []*gen.Span) error { |
| batch := &gen.Batch{ |
| Spans: spans, |
| Process: e.process, |
| } |
| if e.endpoint != "" { |
| return e.uploadCollector(batch) |
| } |
| return e.uploadAgent(batch) |
| } |
| |
// uploadAgent sends the batch through the exporter's agent client and
// returns whatever error EmitBatch reports.
func (e *Exporter) uploadAgent(batch *gen.Batch) error {
	return e.client.EmitBatch(batch)
}
| |
| func (e *Exporter) uploadCollector(batch *gen.Batch) error { |
| body, err := serialize(batch) |
| if err != nil { |
| return err |
| } |
| req, err := http.NewRequest("POST", e.endpoint, body) |
| if err != nil { |
| return err |
| } |
| if e.username != "" && e.password != "" { |
| req.SetBasicAuth(e.username, e.password) |
| } |
| req.Header.Set("Content-Type", "application/x-thrift") |
| |
| resp, err := http.DefaultClient.Do(req) |
| if err != nil { |
| return err |
| } |
| |
| io.Copy(ioutil.Discard, resp.Body) |
| resp.Body.Close() |
| |
| if resp.StatusCode < 200 || resp.StatusCode >= 300 { |
| return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode) |
| } |
| return nil |
| } |
| |
| func serialize(obj thrift.TStruct) (*bytes.Buffer, error) { |
| buf := thrift.NewTMemoryBuffer() |
| if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil { |
| return nil, err |
| } |
| return buf.Buffer, nil |
| } |
| |
// bytesToInt64 reads the first eight bytes of buf as a big-endian unsigned
// integer and returns the same bit pattern as an int64.
// buf must hold at least eight bytes.
func bytesToInt64(buf []byte) int64 {
	return int64(binary.BigEndian.Uint64(buf))
}