aggregator.go 4.08 KB
Newer Older
1 2 3 4 5 6 7 8 9
package statsd

import (
	"log"
	"sort"
	"sync"
	"time"
)

Kamil Kisiel's avatar
Kamil Kisiel committed
10 11
// metricAggregatorStats is a bookkeeping structure for statistics about a MetricAggregator
type metricAggregatorStats struct {
12 13 14 15
	BadLines       int
	LastMessage    time.Time
	LastFlush      time.Time
	LastFlushError time.Time
16 17
}

Kamil Kisiel's avatar
Kamil Kisiel committed
18
// MetricSender is an interface that can be implemented by objects which
Kamil Kisiel's avatar
Kamil Kisiel committed
19
// can provide metrics to a MetricAggregator
20 21 22 23
type MetricSender interface {
	SendMetrics(MetricMap) error
}

24
// The MetricSenderFunc type is an adapter to allow the use of ordinary functions as metric senders
25
type MetricSenderFunc func(MetricMap) error
26 27 28

// SendMetrics calls f(m)
func (f MetricSenderFunc) SendMetrics(m MetricMap) error {
29
	return f(m)
30 31
}

Kamil Kisiel's avatar
Kamil Kisiel committed
32 33 34 35
// MetricAggregator is an object that aggregates statsd metrics.
// The function NewMetricAggregator should be used to create the objects.
//
// Incoming metrics should be sent to the MetricChan channel.
36 37 38 39
type MetricAggregator struct {
	sync.Mutex
	MetricChan    chan Metric   // Channel on which metrics are received
	FlushInterval time.Duration // How often to flush metrics to the sender
Kamil Kisiel's avatar
Kamil Kisiel committed
40
	Sender        MetricSender  // The sender to which metrics are flushed
41 42 43 44
	Stats         metricAggregatorStats
	Counters      MetricMap
	Gauges        MetricMap
	Timers        MetricListMap
45 46
}

Kamil Kisiel's avatar
Kamil Kisiel committed
47
// NewMetricAggregator creates a new MetricAggregator object
48 49
func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) MetricAggregator {
	a := MetricAggregator{}
50 51 52
	a.FlushInterval = flushInterval
	a.Sender = sender
	a.MetricChan = make(chan Metric)
53 54 55 56
	a.Counters = make(MetricMap)
	a.Gauges = make(MetricMap)
	a.Timers = make(MetricListMap)
	return a
57 58
}

Kamil Kisiel's avatar
Kamil Kisiel committed
59
// flush prepares the contents of a MetricAggregator for sending via the Sender
60 61 62 63
func (a *MetricAggregator) flush() (metrics MetricMap) {
	defer a.Unlock()
	a.Lock()

64 65 66
	metrics = make(MetricMap)
	numStats := 0

67
	for k, v := range a.Counters {
68
		perSecond := v / a.FlushInterval.Seconds()
69 70 71 72 73
		metrics["stats."+k] = perSecond
		metrics["stats_counts."+k] = v
		numStats += 1
	}

74
	for k, v := range a.Gauges {
75 76 77 78
		metrics["stats.gauges."+k] = v
		numStats += 1
	}

79
	for k, v := range a.Timers {
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
		if count := len(v); count > 0 {
			sort.Float64s(v)
			min := v[0]
			max := v[count-1]

			metrics["stats.timers."+k+".lower"] = min
			metrics["stats.timers."+k+".upper"] = max
			metrics["stats.timers."+k+".count"] = float64(count)
			numStats += 1
		}
	}
	metrics["statsd.numStats"] = float64(numStats)
	return metrics
}

Kamil Kisiel's avatar
Kamil Kisiel committed
95
// Reset clears the contents of a MetricAggregator
96
func (a *MetricAggregator) Reset() {
97 98 99
	defer a.Unlock()
	a.Lock()

100 101
	for k := range a.Counters {
		a.Counters[k] = 0
102 103
	}

104 105
	for k := range a.Timers {
		a.Timers[k] = []float64{}
106 107
	}

108
	// No reset for gauges, they keep the last value
109 110
}

Kamil Kisiel's avatar
Kamil Kisiel committed
111
// receiveMetric is called for each incoming metric on MetricChan
112 113 114 115 116 117
func (a *MetricAggregator) receiveMetric(m Metric) {
	defer a.Unlock()
	a.Lock()

	switch m.Type {
	case COUNTER:
118
		v, ok := a.Counters[m.Bucket]
119
		if ok {
120
			a.Counters[m.Bucket] = v + m.Value
121
		} else {
122
			a.Counters[m.Bucket] = m.Value
123 124
		}
	case GAUGE:
125
		a.Gauges[m.Bucket] = m.Value
126
	case TIMER:
127
		v, ok := a.Timers[m.Bucket]
128 129
		if ok {
			v = append(v, m.Value)
130
			a.Timers[m.Bucket] = v
131
		} else {
132
			a.Timers[m.Bucket] = []float64{m.Value}
133 134
		}
	case ERROR:
135
		a.Stats.BadLines += 1
136
	}
137
	a.Stats.LastMessage = time.Now()
138 139
}

Kamil Kisiel's avatar
Kamil Kisiel committed
140 141
// Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan
// and flushing them periodically via its Sender
142
func (a *MetricAggregator) Aggregate() {
143
	flushChan := make(chan error)
144
	flushTimer := time.NewTimer(a.FlushInterval)
145 146 147

	for {
		select {
148 149
		case metric := <-a.MetricChan: // Incoming metrics
			a.receiveMetric(metric)
150
		case <-flushTimer.C: // Time to flush to graphite
151
			flushed := a.flush()
152
			go func() {
153
				flushChan <- a.Sender.SendMetrics(flushed)
154
			}()
155 156
			a.Reset()
			flushTimer = time.NewTimer(a.FlushInterval)
157
		case flushResult := <-flushChan:
158
			a.Lock()
159 160 161

			if flushResult != nil {
				log.Printf("Sending metrics to Graphite failed: %s", flushResult)
162
				a.Stats.LastFlushError = time.Now()
163
			} else {
164
				a.Stats.LastFlush = time.Now()
165
			}
166
			a.Unlock()
167 168 169 170
		}
	}

}