Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • renovate/git.autistici.org-ai3-go-common-digest
  • renovate/github.com-miekg-dns-1.x
  • renovate/github.com-prometheus-client_golang-1.x
  • renovate/golang.org-x-crypto-0.x
  • renovate/golang.org-x-net-0.x
  • v2
  • v3
8 results

Target

Select target project
  • ai3/tools/acmeserver
  • godog/acmeserver
  • svp-bot/acmeserver
3 results
Select Git revision
  • lintian-fixes
  • master
  • renovate/github.com-miekg-dns-1.x
  • renovate/golang.org-x-crypto-digest
4 results
Show changes
Showing
with 152 additions and 1036 deletions
/*
Package idgenerator contains several Span and Trace ID generators which can be
used by the Zipkin tracer. Additional third party generators can be plugged in
if they adhere to the IDGenerator interface.
*/
package idgenerator
import (
"math/rand"
"sync"
"time"
"github.com/openzipkin/zipkin-go/model"
)
var (
seededIDGen = rand.New(rand.NewSource(time.Now().UnixNano()))
// NewSource returns a new pseudo-random Source seeded with the given value.
// Unlike the default Source used by top-level functions, this source is not
// safe for concurrent use by multiple goroutines. Hence the need for a mutex.
seededIDLock sync.Mutex
)
// IDGenerator interface can be used to provide the Zipkin Tracer with custom
// implementations to generate Span and Trace IDs.
type IDGenerator interface {
SpanID(traceID model.TraceID) model.ID // Generates a new Span ID
TraceID() model.TraceID // Generates a new Trace ID
}
// NewRandom64 returns an ID Generator which can generate 64 bit trace and span
// id's
func NewRandom64() IDGenerator {
return &randomID64{}
}
// NewRandom128 returns an ID Generator which can generate 128 bit trace and 64
// bit span id's
func NewRandom128() IDGenerator {
return &randomID128{}
}
// NewRandomTimestamped generates 128 bit time sortable traceid's and 64 bit
// spanid's.
func NewRandomTimestamped() IDGenerator {
return &randomTimestamped{}
}
// randomID64 can generate 64 bit traceid's and 64 bit spanid's.
type randomID64 struct{}
func (r *randomID64) TraceID() (id model.TraceID) {
seededIDLock.Lock()
id = model.TraceID{
Low: uint64(seededIDGen.Int63()),
}
seededIDLock.Unlock()
return
}
func (r *randomID64) SpanID(traceID model.TraceID) (id model.ID) {
if !traceID.Empty() {
return model.ID(traceID.Low)
}
seededIDLock.Lock()
id = model.ID(seededIDGen.Int63())
seededIDLock.Unlock()
return
}
// randomID128 can generate 128 bit traceid's and 64 bit spanid's.
type randomID128 struct{}
func (r *randomID128) TraceID() (id model.TraceID) {
seededIDLock.Lock()
id = model.TraceID{
High: uint64(seededIDGen.Int63()),
Low: uint64(seededIDGen.Int63()),
}
seededIDLock.Unlock()
return
}
func (r *randomID128) SpanID(traceID model.TraceID) (id model.ID) {
if !traceID.Empty() {
return model.ID(traceID.Low)
}
seededIDLock.Lock()
id = model.ID(seededIDGen.Int63())
seededIDLock.Unlock()
return
}
// randomTimestamped can generate 128 bit time sortable traceid's compatible
// with AWS X-Ray and 64 bit spanid's.
type randomTimestamped struct{}
func (t *randomTimestamped) TraceID() (id model.TraceID) {
seededIDLock.Lock()
id = model.TraceID{
High: uint64(time.Now().Unix()<<32) + uint64(seededIDGen.Int31()),
Low: uint64(seededIDGen.Int63()),
}
seededIDLock.Unlock()
return
}
func (t *randomTimestamped) SpanID(traceID model.TraceID) (id model.ID) {
if !traceID.Empty() {
return model.ID(traceID.Low)
}
seededIDLock.Lock()
id = model.ID(seededIDGen.Int63())
seededIDLock.Unlock()
return
}
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package model contains the Zipkin V2 model which is used by the Zipkin Go
tracer implementation.
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import "net"
import (
"encoding/json"
"net"
"strings"
)
// Endpoint holds the network context of a node in the service graph.
type Endpoint struct {
ServiceName string
IPv4 net.IP
IPv6 net.IP
Port uint16
}
// MarshalJSON exports our Endpoint into the correct format for the Zipkin V2 API.
func (e Endpoint) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
ServiceName string `json:"serviceName,omitempty"`
IPv4 net.IP `json:"ipv4,omitempty"`
IPv6 net.IP `json:"ipv6,omitempty"`
Port uint16 `json:"port,omitempty"`
}{
strings.ToLower(e.ServiceName),
e.IPv4,
e.IPv6,
e.Port,
})
}
// Empty returns if all Endpoint properties are empty / unspecified.
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// Kind clarifies context of timestamp, duration and remoteEndpoint in a span.
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"encoding/json"
"errors"
"strings"
"time"
)
......@@ -13,6 +28,25 @@ var (
ErrValidDurationRequired = errors.New("valid duration required")
)
// BaggageFields holds the interface for consumers needing to interact with
// the fields in application logic.
type BaggageFields interface {
// Get returns the values for a field identified by its key.
Get(key string) []string
// Add adds the provided values to a header designated by key. If not
// accepted by the baggage implementation, it will return false.
Add(key string, value ...string) bool
// Set sets the provided values to a header designated by key. If not
// accepted by the baggage implementation, it will return false.
Set(key string, value ...string) bool
// Delete removes the field data designated by key. If not accepted by the
// baggage implementation, it will return false.
Delete(key string) bool
// Iterate will iterate over the available fields and for each one it will
// trigger the callback function.
Iterate(f func(key string, values []string))
}
// SpanContext holds the context of a Span.
type SpanContext struct {
TraceID TraceID `json:"traceId"`
......@@ -21,6 +55,7 @@ type SpanContext struct {
Debug bool `json:"debug,omitempty"`
Sampled *bool `json:"-"`
Err error `json:"-"`
Baggage BaggageFields `json:"-"`
}
// SpanModel structure.
......@@ -73,6 +108,8 @@ func (s SpanModel) MarshalJSON() ([]byte, error) {
s.Duration += 500 * time.Nanosecond
}
s.Name = strings.ToLower(s.Name)
if s.LocalEndpoint.Empty() {
s.LocalEndpoint = nil
}
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
......
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
......
package zipkin
import (
"time"
"github.com/openzipkin/zipkin-go/model"
)
type noopSpan struct {
model.SpanContext
}
func (n *noopSpan) Context() model.SpanContext { return n.SpanContext }
func (n *noopSpan) SetName(string) {}
func (*noopSpan) SetRemoteEndpoint(*model.Endpoint) {}
func (*noopSpan) Annotate(time.Time, string) {}
func (*noopSpan) Tag(string, string) {}
func (*noopSpan) Finish() {}
func (*noopSpan) Flush() {}
/*
Package propagation holds the required function signatures for Injection and
Extraction as used by the Zipkin Tracer.
Subpackages of this package contain officially supported standard propagation
implementations.
*/
package propagation
import "github.com/openzipkin/zipkin-go/model"
// Extractor function signature
type Extractor func() (*model.SpanContext, error)
// Injector function signature
type Injector func(model.SpanContext) error
/*
Package http implements a HTTP reporter to send spans to Zipkin V2 collectors.
*/
package http
import (
"bytes"
"encoding/json"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
)
// defaults
const (
defaultTimeout = time.Second * 5 // timeout for http request in seconds
defaultBatchInterval = time.Second * 1 // BatchInterval in seconds
defaultBatchSize = 100
defaultMaxBacklog = 1000
)
// httpReporter will send spans to a Zipkin HTTP Collector using Zipkin V2 API.
type httpReporter struct {
url string
client *http.Client
logger *log.Logger
batchInterval time.Duration
batchSize int
maxBacklog int
sendMtx *sync.Mutex
batchMtx *sync.Mutex
batch []*model.SpanModel
spanC chan *model.SpanModel
quit chan struct{}
shutdown chan error
reqCallback RequestCallbackFn
}
// Send implements reporter
func (r *httpReporter) Send(s model.SpanModel) {
r.spanC <- &s
}
// Close implements reporter
func (r *httpReporter) Close() error {
close(r.quit)
return <-r.shutdown
}
func (r *httpReporter) loop() {
var (
nextSend = time.Now().Add(r.batchInterval)
ticker = time.NewTicker(r.batchInterval / 10)
tickerChan = ticker.C
)
defer ticker.Stop()
for {
select {
case span := <-r.spanC:
currentBatchSize := r.append(span)
if currentBatchSize >= r.batchSize {
nextSend = time.Now().Add(r.batchInterval)
go func() {
_ = r.sendBatch()
}()
}
case <-tickerChan:
if time.Now().After(nextSend) {
nextSend = time.Now().Add(r.batchInterval)
go func() {
_ = r.sendBatch()
}()
}
case <-r.quit:
r.shutdown <- r.sendBatch()
return
}
}
}
func (r *httpReporter) append(span *model.SpanModel) (newBatchSize int) {
r.batchMtx.Lock()
r.batch = append(r.batch, span)
if len(r.batch) > r.maxBacklog {
dispose := len(r.batch) - r.maxBacklog
r.logger.Printf("backlog too long, disposing %d spans", dispose)
r.batch = r.batch[dispose:]
}
newBatchSize = len(r.batch)
r.batchMtx.Unlock()
return
}
func (r *httpReporter) sendBatch() error {
// in order to prevent sending the same batch twice
r.sendMtx.Lock()
defer r.sendMtx.Unlock()
// Select all current spans in the batch to be sent
r.batchMtx.Lock()
sendBatch := r.batch[:]
r.batchMtx.Unlock()
if len(sendBatch) == 0 {
return nil
}
body, err := json.Marshal(sendBatch)
if err != nil {
r.logger.Printf("failed when marshalling the spans batch: %s\n", err.Error())
return err
}
req, err := http.NewRequest("POST", r.url, bytes.NewReader(body))
if err != nil {
r.logger.Printf("failed when creating the request: %s\n", err.Error())
return err
}
req.Header.Set("Content-Type", "application/json")
if r.reqCallback != nil {
r.reqCallback(req)
}
resp, err := r.client.Do(req)
if err != nil {
r.logger.Printf("failed to send the request: %s\n", err.Error())
return err
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
r.logger.Printf("failed the request with status code %d\n", resp.StatusCode)
}
// Remove sent spans from the batch even if they were not saved
r.batchMtx.Lock()
r.batch = r.batch[len(sendBatch):]
r.batchMtx.Unlock()
return nil
}
// RequestCallbackFn receives the initialized request from the Collector before
// sending it over the wire. This allows one to plug in additional headers or
// do other customization.
type RequestCallbackFn func(*http.Request)
// ReporterOption sets a parameter for the HTTP Reporter
type ReporterOption func(r *httpReporter)
// Timeout sets maximum timeout for http request.
func Timeout(duration time.Duration) ReporterOption {
return func(r *httpReporter) { r.client.Timeout = duration }
}
// BatchSize sets the maximum batch size, after which a collect will be
// triggered. The default batch size is 100 traces.
func BatchSize(n int) ReporterOption {
return func(r *httpReporter) { r.batchSize = n }
}
// MaxBacklog sets the maximum backlog size. When batch size reaches this
// threshold, spans from the beginning of the batch will be disposed.
func MaxBacklog(n int) ReporterOption {
return func(r *httpReporter) { r.maxBacklog = n }
}
// BatchInterval sets the maximum duration we will buffer traces before
// emitting them to the collector. The default batch interval is 1 second.
func BatchInterval(d time.Duration) ReporterOption {
return func(r *httpReporter) { r.batchInterval = d }
}
// Client sets a custom http client to use.
func Client(client *http.Client) ReporterOption {
return func(r *httpReporter) { r.client = client }
}
// RequestCallback registers a callback function to adjust the reporter
// *http.Request before it sends the request to Zipkin.
func RequestCallback(rc RequestCallbackFn) ReporterOption {
return func(r *httpReporter) { r.reqCallback = rc }
}
// NewReporter returns a new HTTP Reporter.
// url should be the endpoint to send the spans to, e.g.
// http://localhost:9411/api/v2/spans
func NewReporter(url string, opts ...ReporterOption) reporter.Reporter {
r := httpReporter{
url: url,
logger: log.New(os.Stderr, "", log.LstdFlags),
client: &http.Client{Timeout: defaultTimeout},
batchInterval: defaultBatchInterval,
batchSize: defaultBatchSize,
maxBacklog: defaultMaxBacklog,
batch: []*model.SpanModel{},
spanC: make(chan *model.SpanModel),
quit: make(chan struct{}, 1),
shutdown: make(chan error, 1),
sendMtx: &sync.Mutex{},
batchMtx: &sync.Mutex{},
}
for _, opt := range opts {
opt(&r)
}
go r.loop()
return &r
}
/*
Package reporter holds the Reporter interface which is used by the Zipkin
Tracer to send finished spans.
Subpackages of package reporter contain officially supported standard
reporter implementations.
*/
package reporter
import "github.com/openzipkin/zipkin-go/model"
// Reporter interface can be used to provide the Zipkin Tracer with custom
// implementations to publish Zipkin Span data.
type Reporter interface {
Send(model.SpanModel) // Send Span data to the reporter
Close() error // Close the reporter
}
type noopReporter struct{}
func (r *noopReporter) Send(model.SpanModel) {}
func (r *noopReporter) Close() error { return nil }
// NewNoopReporter returns a no-op Reporter implementation.
func NewNoopReporter() Reporter {
return &noopReporter{}
}
package zipkin
import (
"fmt"
"math"
"math/rand"
"sync"
"time"
)
// Sampler functions return if a Zipkin span should be sampled, based on its
// traceID.
type Sampler func(id uint64) bool
// NeverSample will always return false. If used by a service it will not allow
// the service to start traces but will still allow the service to participate
// in traces started upstream.
func NeverSample(_ uint64) bool { return false }
// AlwaysSample will always return true. If used by a service it will always start
// traces if no upstream trace has been propagated. If an incoming upstream trace
// is not sampled the service will adhere to this and only propagate the context.
func AlwaysSample(_ uint64) bool { return true }
// NewModuloSampler provides a generic type Sampler.
func NewModuloSampler(mod uint64) Sampler {
if mod < 2 {
return AlwaysSample
}
return func(id uint64) bool {
return (id % mod) == 0
}
}
// NewBoundarySampler is appropriate for high-traffic instrumentation who
// provision random trace ids, and make the sampling decision only once.
// It defends against nodes in the cluster selecting exactly the same ids.
func NewBoundarySampler(rate float64, salt int64) (Sampler, error) {
if rate == 0.0 {
return NeverSample, nil
}
if rate == 1.0 {
return AlwaysSample, nil
}
if rate < 0.0001 || rate > 1 {
return nil, fmt.Errorf("rate should be 0.0 or between 0.0001 and 1: was %f", rate)
}
var (
boundary = int64(rate * 10000)
usalt = uint64(salt)
)
return func(id uint64) bool {
return int64(math.Abs(float64(id^usalt)))%10000 < boundary
}, nil
}
// NewCountingSampler is appropriate for low-traffic instrumentation or
// those who do not provision random trace ids. It is not appropriate for
// collectors as the sampling decision isn't idempotent (consistent based
// on trace id).
func NewCountingSampler(rate float64) (Sampler, error) {
if rate == 0.0 {
return NeverSample, nil
}
if rate == 1.0 {
return AlwaysSample, nil
}
if rate < 0.01 || rate > 1 {
return nil, fmt.Errorf("rate should be 0.0 or between 0.01 and 1: was %f", rate)
}
var (
i = 0
outOf100 = int(rate*100 + math.Copysign(0.5, rate*100)) // for rounding float to int conversion instead of truncation
decisions = randomBitSet(100, outOf100, rand.New(rand.NewSource(time.Now().UnixNano())))
mtx = &sync.Mutex{}
)
return func(_ uint64) bool {
mtx.Lock()
result := decisions[i]
i++
if i == 100 {
i = 0
}
mtx.Unlock()
return result
}, nil
}
/**
* Reservoir sampling algorithm borrowed from Stack Overflow.
*
* http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
*/
func randomBitSet(size int, cardinality int, rnd *rand.Rand) []bool {
result := make([]bool, size)
chosen := make([]int, cardinality)
var i int
for i = 0; i < cardinality; i++ {
chosen[i] = i
result[i] = true
}
for ; i < size; i++ {
j := rnd.Intn(i + 1)
if j < cardinality {
result[chosen[j]] = false
result[i] = true
chosen[j] = i
}
}
return result
}
package zipkin
import (
"time"
"github.com/openzipkin/zipkin-go/model"
)
// Span interface as returned by Tracer.StartSpan()
type Span interface {
// Context returns the Span's SpanContext.
Context() model.SpanContext
// SetName updates the Span's name.
SetName(string)
// SetRemoteEndpoint updates the Span's Remote Endpoint.
SetRemoteEndpoint(*model.Endpoint)
// Annotate adds a timed event to the Span.
Annotate(time.Time, string)
// Tag sets Tag with given key and value to the Span. If key already exists in
// the Span the value will be overridden except for error tags where the first
// value is persisted.
Tag(string, string)
// Finish the Span and send to Reporter. If DelaySend option was used at
// Span creation time, Finish will not send the Span to the Reporter. It then
// becomes the user's responsibility to get the Span reported (by using
// span.Flush).
Finish()
// Flush the Span to the Reporter (regardless of being finished or not).
// This can be used if the DelaySend SpanOption was set or when dealing with
// one-way RPC tracing where duration might not be measured.
Flush()
}
package zipkin
import (
"sync"
"sync/atomic"
"time"
"github.com/openzipkin/zipkin-go/model"
)
type spanImpl struct {
mtx sync.RWMutex
model.SpanModel
tracer *Tracer
mustCollect int32 // used as atomic bool (1 = true, 0 = false)
flushOnFinish bool
}
func (s *spanImpl) Context() model.SpanContext {
return s.SpanContext
}
func (s *spanImpl) SetName(name string) {
s.mtx.Lock()
s.Name = name
s.mtx.Unlock()
}
func (s *spanImpl) SetRemoteEndpoint(e *model.Endpoint) {
s.mtx.Lock()
if e == nil {
s.RemoteEndpoint = nil
} else {
s.RemoteEndpoint = &model.Endpoint{}
*s.RemoteEndpoint = *e
}
s.mtx.Unlock()
}
func (s *spanImpl) Annotate(t time.Time, value string) {
a := model.Annotation{
Timestamp: t,
Value: value,
}
s.mtx.Lock()
s.Annotations = append(s.Annotations, a)
s.mtx.Unlock()
}
func (s *spanImpl) Tag(key, value string) {
s.mtx.Lock()
if key == string(TagError) {
if _, found := s.Tags[key]; found {
s.mtx.Unlock()
return
}
}
s.Tags[key] = value
s.mtx.Unlock()
}
func (s *spanImpl) Finish() {
if atomic.CompareAndSwapInt32(&s.mustCollect, 1, 0) {
s.Duration = time.Since(s.Timestamp)
if s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
}
}
func (s *spanImpl) Flush() {
if s.SpanModel.Debug || (s.SpanModel.Sampled != nil && *s.SpanModel.Sampled) {
s.tracer.reporter.Send(s.SpanModel)
}
}
package zipkin
import (
"time"
"github.com/openzipkin/zipkin-go/model"
)
// SpanOption allows for functional options to adjust behavior and payload of
// the Span to be created with tracer.StartSpan().
type SpanOption func(t *Tracer, s *spanImpl)
// Kind sets the kind of the span being created..
func Kind(kind model.Kind) SpanOption {
return func(t *Tracer, s *spanImpl) {
s.Kind = kind
}
}
// Parent will use provided SpanContext as parent to the span being created.
func Parent(sc model.SpanContext) SpanOption {
return func(t *Tracer, s *spanImpl) {
if sc.Err != nil {
// encountered an extraction error
switch t.extractFailurePolicy {
case ExtractFailurePolicyRestart:
case ExtractFailurePolicyError:
panic(s.SpanContext.Err)
case ExtractFailurePolicyTagAndRestart:
s.Tags["error.extract"] = sc.Err.Error()
default:
panic(ErrInvalidExtractFailurePolicy)
}
/* don't use provided SpanContext, but restart trace */
return
}
s.SpanContext = sc
}
}
// StartTime uses a given start time for the span being created.
func StartTime(start time.Time) SpanOption {
return func(t *Tracer, s *spanImpl) {
s.Timestamp = start
}
}
// RemoteEndpoint sets the remote endpoint of the span being created.
func RemoteEndpoint(e *model.Endpoint) SpanOption {
return func(t *Tracer, s *spanImpl) {
s.RemoteEndpoint = e
}
}
// Tags sets initial tags for the span being created. If default tracer tags
// are present they will be overwritten on key collisions.
func Tags(tags map[string]string) SpanOption {
return func(t *Tracer, s *spanImpl) {
for k, v := range tags {
s.Tags[k] = v
}
}
}
// FlushOnFinish when set to false will disable span.Finish() to send the Span
// to the Reporter automatically (which is the default behavior). If set to
// false, having the Span be reported becomes the responsibility of the user.
// This is available if late tag data is expected to be only available after the
// required finish time of the Span.
func FlushOnFinish(b bool) SpanOption {
return func(t *Tracer, s *spanImpl) {
s.flushOnFinish = b
}
}
package zipkin
// Tag holds available types
type Tag string
// Common Tag values
const (
TagHTTPMethod Tag = "http.method"
TagHTTPPath Tag = "http.path"
TagHTTPUrl Tag = "http.url"
TagHTTPRoute Tag = "http.route"
TagHTTPStatusCode Tag = "http.status_code"
TagHTTPRequestSize Tag = "http.request.size"
TagHTTPResponseSize Tag = "http.response.size"
TagGRPCStatusCode Tag = "grpc.status_code"
TagSQLQuery Tag = "sql.query"
TagError Tag = "error"
)
// Set a standard Tag with a payload on provided Span.
func (t Tag) Set(s Span, value string) {
s.Tag(string(t), value)
}
package zipkin
import (
"context"
"sync/atomic"
"time"
"github.com/openzipkin/zipkin-go/idgenerator"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/propagation"
"github.com/openzipkin/zipkin-go/reporter"
)
// Tracer is our Zipkin tracer implementation. It should be initialized using
// the NewTracer method.
type Tracer struct {
defaultTags map[string]string
extractFailurePolicy ExtractFailurePolicy
sampler Sampler
generate idgenerator.IDGenerator
reporter reporter.Reporter
localEndpoint *model.Endpoint
noop int32 // used as atomic bool (1 = true, 0 = false)
sharedSpans bool
unsampledNoop bool
}
// NewTracer returns a new Zipkin Tracer.
func NewTracer(rep reporter.Reporter, opts ...TracerOption) (*Tracer, error) {
// set default tracer options
t := &Tracer{
defaultTags: make(map[string]string),
extractFailurePolicy: ExtractFailurePolicyRestart,
sampler: AlwaysSample,
generate: idgenerator.NewRandom64(),
reporter: rep,
localEndpoint: nil,
noop: 0,
sharedSpans: true,
unsampledNoop: false,
}
// if no reporter was provided we default to noop implementation.
if t.reporter == nil {
t.reporter = reporter.NewNoopReporter()
t.noop = 1
}
// process functional options
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
return t, nil
}
// StartSpanFromContext creates and starts a span using the span found in
// context as parent. If no parent span is found a root span is created.
func (t *Tracer) StartSpanFromContext(ctx context.Context, name string, options ...SpanOption) (Span, context.Context) {
if parentSpan := SpanFromContext(ctx); parentSpan != nil {
options = append(options, Parent(parentSpan.Context()))
}
span := t.StartSpan(name, options...)
return span, NewContext(ctx, span)
}
// StartSpan creates and starts a span.
func (t *Tracer) StartSpan(name string, options ...SpanOption) Span {
if atomic.LoadInt32(&t.noop) == 1 {
return &noopSpan{}
}
s := &spanImpl{
SpanModel: model.SpanModel{
Kind: model.Undetermined,
Name: name,
LocalEndpoint: t.localEndpoint,
Annotations: make([]model.Annotation, 0),
Tags: make(map[string]string),
},
flushOnFinish: true,
tracer: t,
}
// add default tracer tags to span
for k, v := range t.defaultTags {
s.Tag(k, v)
}
// handle provided functional options
for _, option := range options {
option(t, s)
}
if s.TraceID.Empty() {
// create root span
s.SpanContext.TraceID = t.generate.TraceID()
s.SpanContext.ID = t.generate.SpanID(s.SpanContext.TraceID)
} else {
// valid parent context found
if t.sharedSpans && s.Kind == model.Server {
// join span
s.Shared = true
} else {
// regular child span
parentID := s.SpanContext.ID
s.SpanContext.ParentID = &parentID
s.SpanContext.ID = t.generate.SpanID(model.TraceID{})
}
}
if !s.SpanContext.Debug && s.Sampled == nil {
// deferred sampled context found, invoke sampler
sampled := t.sampler(s.SpanContext.TraceID.Low)
s.SpanContext.Sampled = &sampled
if sampled {
s.mustCollect = 1
}
} else {
if s.SpanContext.Debug || *s.Sampled {
s.mustCollect = 1
}
}
if t.unsampledNoop && s.mustCollect == 0 {
// trace not being sampled and noop requested
return &noopSpan{
SpanContext: s.SpanContext,
}
}
// add start time
if s.Timestamp.IsZero() {
s.Timestamp = time.Now()
}
return s
}
// Extract extracts a SpanContext using the provided Extractor function.
func (t *Tracer) Extract(extractor propagation.Extractor) (sc model.SpanContext) {
if atomic.LoadInt32(&t.noop) == 1 {
return
}
psc, err := extractor()
if psc != nil {
sc = *psc
}
sc.Err = err
return
}
// SetNoop allows for killswitch behavior. If set to true the tracer will return
// noopSpans and all data is dropped. This allows operators to stop tracing in
// risk scenarios. Set back to false to resume tracing.
func (t *Tracer) SetNoop(noop bool) {
if noop {
atomic.CompareAndSwapInt32(&t.noop, 0, 1)
} else {
atomic.CompareAndSwapInt32(&t.noop, 1, 0)
}
}
// LocalEndpoint returns a copy of the currently set local endpoint of the
// tracer instance.
func (t *Tracer) LocalEndpoint() *model.Endpoint {
if t.localEndpoint == nil {
return nil
}
ep := *t.localEndpoint
return &ep
}
package zipkin
import (
"errors"
"github.com/openzipkin/zipkin-go/idgenerator"
"github.com/openzipkin/zipkin-go/model"
)
// Tracer Option Errors
var (
ErrInvalidEndpoint = errors.New("requires valid local endpoint")
ErrInvalidExtractFailurePolicy = errors.New("invalid extract failure policy provided")
)
// ExtractFailurePolicy deals with Extraction errors
type ExtractFailurePolicy int
// ExtractFailurePolicyOptions
const (
ExtractFailurePolicyRestart ExtractFailurePolicy = iota
ExtractFailurePolicyError
ExtractFailurePolicyTagAndRestart
)
// TracerOption allows for functional options to adjust behavior of the Tracer
// to be created with NewTracer().
type TracerOption func(o *Tracer) error
// WithLocalEndpoint sets the local endpoint of the tracer.
func WithLocalEndpoint(e *model.Endpoint) TracerOption {
return func(o *Tracer) error {
if e == nil {
o.localEndpoint = nil
return nil
}
ep := *e
o.localEndpoint = &ep
return nil
}
}
// WithExtractFailurePolicy allows one to set the ExtractFailurePolicy.
func WithExtractFailurePolicy(p ExtractFailurePolicy) TracerOption {
return func(o *Tracer) error {
if p < 0 || p > ExtractFailurePolicyTagAndRestart {
return ErrInvalidExtractFailurePolicy
}
o.extractFailurePolicy = p
return nil
}
}
// WithNoopSpan if set to true will switch to a NoopSpan implementation
// if the trace is not sampled.
func WithNoopSpan(unsampledNoop bool) TracerOption {
return func(o *Tracer) error {
o.unsampledNoop = unsampledNoop
return nil
}
}
// WithSharedSpans allows to place client-side and server-side annotations
// for a RPC call in the same span (Zipkin V1 behavior) or different spans
// (more in line with other tracing solutions). By default this Tracer
// uses shared host spans (so client-side and server-side in the same span).
func WithSharedSpans(val bool) TracerOption {
return func(o *Tracer) error {
o.sharedSpans = val
return nil
}
}
// WithSampler allows one to set a Sampler function
func WithSampler(sampler Sampler) TracerOption {
return func(o *Tracer) error {
o.sampler = sampler
return nil
}
}
// WithTraceID128Bit if set to true will instruct the Tracer to start traces
// with 128 bit TraceID's. If set to false the Tracer will start traces with
// 64 bits.
func WithTraceID128Bit(val bool) TracerOption {
return func(o *Tracer) error {
if val {
o.generate = idgenerator.NewRandom128()
} else {
o.generate = idgenerator.NewRandom64()
}
return nil
}
}
// WithIDGenerator allows one to set a custom ID Generator
func WithIDGenerator(generator idgenerator.IDGenerator) TracerOption {
return func(o *Tracer) error {
o.generate = generator
return nil
}
}
// WithTags allows one to set default tags to be added to each created span
func WithTags(tags map[string]string) TracerOption {
return func(o *Tracer) error {
for k, v := range tags {
o.defaultTags[k] = v
}
return nil
}
}
// WithNoopTracer allows one to start the Tracer as Noop implementation.
func WithNoopTracer(tracerNoop bool) TracerOption {
return func(o *Tracer) error {
if tracerNoop {
o.noop = 1
} else {
o.noop = 0
}
return nil
}
}
command-line-arguments.test