gostatsd.go 6.43 KB
Newer Older
1 2 3 4 5
package main

import (
	"fmt"
	"log"
Kamil Kisiel's avatar
Kamil Kisiel committed
6 7
	"math"
	"net"
8
	"regexp"
Kamil Kisiel's avatar
Kamil Kisiel committed
9
	"sort"
10
	"strconv"
Kamil Kisiel's avatar
Kamil Kisiel committed
11 12
	"strings"
	"time"
13 14
)

Kamil Kisiel's avatar
Kamil Kisiel committed
15 16 17 18 19 20 21 22 23 24

var flushInterval time.Duration
var graphiteServer string
var percentThresholds []float64

func init() {
	flushInterval = 10 * time.Second
	graphiteServer = "localhost:1234"
	percentThresholds = []float64{90.0}
}
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58

type MetricType int

// Enumeration, see http://golang.org/doc/effective_go.html#constants
const (
	_ = iota
	COUNTER MetricType = 1 << (10 * iota)
	TIMER
	GAUGE
)

func (m MetricType) String() string {
	switch {
	case m >= GAUGE:
		return "gauge"
	case m >= TIMER:
		return "timer"
	case m >= COUNTER:
		return "counter"
	}
	return "unknown"
}

type Metric struct {
	Type MetricType
	Bucket string
	Value float64
}

func (m Metric) String() string {
	return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value)
}

func main () {
Kamil Kisiel's avatar
Kamil Kisiel committed
59 60 61
	var metricChan = make(chan Metric)
	go metricListener(metricChan)
	go metricAggregator(metricChan)
62 63 64 65
	// Run forever
	select {}
}

Kamil Kisiel's avatar
Kamil Kisiel committed
66
func metricListener(metricChan chan Metric) {
67 68 69 70 71 72 73 74 75
	conn, err := net.ListenPacket("udp", ":8125")
	if err != nil {
		log.Fatal(err)
		return
	}
	msg := make([]byte, 1024)
	for {
		nbytes, _, err := conn.ReadFrom(msg)
		if err != nil {
Kamil Kisiel's avatar
Kamil Kisiel committed
76
			log.Printf("%s", err)
77 78
			continue
		}
Kamil Kisiel's avatar
Kamil Kisiel committed
79
		go handleMessage(metricChan, string(msg[:nbytes]))
80 81 82 83 84 85
	}
}

type MetricMap map[string]float64
type MetricListMap map[string][]float64

Kamil Kisiel's avatar
Kamil Kisiel committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
func round(v float64) float64 {
	return math.Floor(v + 0.5)
}

func average(vals []float64) float64 {
	sum := 0.0
	for _, v := range vals {
		sum += v
	}
	return sum / float64(len(vals))
}

func thresholdStats(vals []float64, threshold float64) (mean, upper float64) {
	if count := len(vals); count > 1 {
		idx := int(round(((100 - threshold) / 100) * float64(count)))
		thresholdCount := count - idx
		thresholdValues := vals[:thresholdCount]

		mean = average(thresholdValues)
		upper = thresholdValues[len(thresholdValues) - 1]
	} else {
		mean = vals[0]
		upper = vals[0]
	}
	return mean, upper
}

113
func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) {
Kamil Kisiel's avatar
Kamil Kisiel committed
114
	conn, err := net.Dial("tcp", graphiteServer)
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
	if err != nil {
		log.Printf("Could not contact Graphite server")
		return
	}
	defer conn.Close()

	numStats := 0
	now := time.Now().Unix()

	for k, v := range counters {
		perSecond := v / flushInterval.Seconds()
		fmt.Fprintf(conn, "stats.%s %f %d\n", k, perSecond, now)
		fmt.Fprintf(conn, "stats_counts.%s %f %d\n", k, v, now)
		numStats += 1
	}

	for k, v := range gauges {
		fmt.Fprintf(conn, "stats.gauges.%s %f %d\n", k, v, now)
		numStats += 1
	}

Kamil Kisiel's avatar
Kamil Kisiel committed
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
	for k, v := range timers {
		if count := len(v); count > 0 {
			sort.Float64s(v)
			min := v[0]
			max := v[count-1]

			fmt.Fprintf(conn, "stats.timers.%s.lower %f %d\n", k, min, now)
			fmt.Fprintf(conn, "stats.timers.%s.upper %f %d\n", k, max, now)
			fmt.Fprintf(conn, "stats.timers.%s.count %d %d\n", k, count, now)

			for _, threshold := range percentThresholds {
				mean, upper := thresholdStats(v, threshold)
				thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64)
				fmt.Fprintf(conn, "stats.timers.%s.mean_%s %f %d\n", k, thresholdName, mean, now)
				fmt.Fprintf(conn, "stats.timers.%s.upper_%s %f %d\n", k, thresholdName, upper, now)
			}
			numStats += 1
		}
	}
155
	fmt.Fprintf(conn, "statsd.numStats %d %d\n", numStats, now)
156 157 158

}

Kamil Kisiel's avatar
Kamil Kisiel committed
159
func metricAggregator(metricChan chan Metric) {
160 161 162 163
	var counters = make(MetricMap)
	var gauges = make(MetricMap)
	var timers = make(MetricListMap)

164
	flushTimer := time.NewTimer(flushInterval)
165 166 167 168 169

	log.Printf("Started aggregator")

	for {
		select {
Kamil Kisiel's avatar
Kamil Kisiel committed
170
		case metric := <-metricChan: // Incoming metrics
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
			switch metric.Type {
			case COUNTER:
				v, ok := counters[metric.Bucket]
				if ok {
					counters[metric.Bucket] = v + metric.Value
				} else {
					counters[metric.Bucket] = metric.Value
				}
			case GAUGE:
				gauges[metric.Bucket] = metric.Value
			case TIMER:
				v, ok := timers[metric.Bucket]
				if ok {
					v = append(v, metric.Value)
					timers[metric.Bucket] = v
				} else {
					timers[metric.Bucket] = []float64{metric.Value}
				}
			}
Kamil Kisiel's avatar
Kamil Kisiel committed
190
		case <-flushTimer.C: // Time to flush to graphite
191
			go flushMetrics(counters, gauges, timers, flushInterval)
192 193

			// Reset counters
194
			new_counters := make(MetricMap)
195
			for k := range counters {
196
				new_counters[k] = 0
197
			}
198 199
			counters = new_counters

200
			// Reset timers
201
			new_timers := make(MetricListMap)
202
			for k := range timers {
203
				new_timers[k] = []float64{}
204
			}
205
			timers = new_timers
206

207 208 209 210 211 212 213 214
			// Keep values of gauges
			new_gauges := make(MetricMap)
			for k, v := range gauges {
				new_gauges[k] = v
			}
			gauges = new_gauges

			flushTimer = time.NewTimer(flushInterval)
215 216 217 218 219 220 221 222 223 224 225
		}
	}
}

func normalizeBucketName(name string) string {
	spaces, _ := regexp.Compile("\\s+")
	slashes, _ := regexp.Compile("\\/")
	invalid, _ := regexp.Compile("[^a-zA-Z_\\-0-9\\.]")
	return invalid.ReplaceAllString(slashes.ReplaceAllString(spaces.ReplaceAllString(name, "_"), "-"), "")
}

226 227 228
func parseMessage(msg string) ([]Metric, error) {
	metricList := []Metric{}

229 230
	segments := strings.Split(strings.TrimSpace(msg), ":")
	if len(segments) < 1 {
231
		return metricList, fmt.Errorf("ill-formatted message: %s", msg)
232 233 234 235 236
	}

	bucket := normalizeBucketName(segments[0])
	var values []string
	if len(segments) == 1 {
237
		values = []string{"1"}
238 239 240 241 242 243 244
	} else {
		values = segments[1:]
	}

	for _, value := range values {
		fields := strings.Split(value, "|")

245 246 247
		metricValue, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0])
248 249
		}

250 251 252 253 254
		var metricTypeString string
		if len(fields) == 1 {
			metricTypeString = "c"
		} else {
			metricTypeString = fields[1]
255 256
		}

257 258
		var metricType MetricType
		switch metricTypeString {
259 260
		case "ms":
			// Timer
261
			metricType = TIMER
262 263
		case "g":
			// Gauge
264 265 266 267
			metricType = GAUGE
		default:
			// Counter, allows skipping of |c suffix
			metricType = COUNTER
268 269 270 271 272 273

			var rate float64
			if len(fields) == 3 {
				var err error
				rate, err = strconv.ParseFloat(fields[2][1:], 64)
				if err != nil {
274
					return metricList, fmt.Errorf("%s: bad rate %s", fields[2])
275 276 277 278
				}
			} else {
				rate = 1
			}
279
			metricValue = metricValue / rate
280 281
		}

282 283 284
		metric := Metric{metricType, bucket, metricValue}
		metricList = append(metricList, metric)
	}
Kamil Kisiel's avatar
Kamil Kisiel committed
285

286 287 288
	return metricList, nil
}

Kamil Kisiel's avatar
Kamil Kisiel committed
289
func handleMessage(metricChan chan Metric, msg string) {
290 291 292 293 294
	metrics, err := parseMessage(msg)
	if err != nil {
		log.Printf("Error parsing metric %s", err)
	} else {
		for _, metric := range metrics {
Kamil Kisiel's avatar
Kamil Kisiel committed
295
			metricChan <- metric
296
		}
297 298
	}
}