gostatsd.go 3.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
package main

import (
	"net"
	"fmt"
	"log"
	"time"
	"strings"
	"regexp"
	"strconv"
)

const FLUSH_INTERVAL = 1000

type MetricType int

// Enumeration, see http://golang.org/doc/effective_go.html#constants
const (
	_ = iota
	COUNTER MetricType = 1 << (10 * iota)
	TIMER
	GAUGE
)

func (m MetricType) String() string {
	switch {
	case m >= GAUGE:
		return "gauge"
	case m >= TIMER:
		return "timer"
	case m >= COUNTER:
		return "counter"
	}
	return "unknown"
}

type Metric struct {
	Type MetricType
	Bucket string
	Value float64
}

func (m Metric) String() string {
	return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value)
}

func main () {
	var metrics = make(chan Metric)
	go metricListener(metrics)
	go metricAggregator(metrics)
	// Run forever
	select {}
}

func metricListener(metrics chan Metric) {
	conn, err := net.ListenPacket("udp", ":8125")
	if err != nil {
		// Do something about it
		log.Fatal(err)
		return
	}
	msg := make([]byte, 1024)
	for {
		nbytes, _, err := conn.ReadFrom(msg)
		if err != nil {
			log.Fatal(err)
			continue
		}
		go handleMessage(metrics, string(msg[:nbytes]))
	}
}

type MetricMap map[string]float64
type MetricListMap map[string][]float64

func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap) {

}

func metricAggregator(metrics chan Metric) {
	var counters = make(MetricMap)
	var gauges = make(MetricMap)
	var timers = make(MetricListMap)

	flush_timer := time.NewTimer(FLUSH_INTERVAL)

	log.Printf("Started aggregator")

	for {
		select {
		case metric := <-metrics:
			log.Printf("Got %s", metric)
			switch metric.Type {
			case COUNTER:
				v, ok := counters[metric.Bucket]
				if ok {
					counters[metric.Bucket] = v + metric.Value
				} else {
					counters[metric.Bucket] = metric.Value
				}
			case GAUGE:
				gauges[metric.Bucket] = metric.Value
			case TIMER:
				v, ok := timers[metric.Bucket]
				if ok {
					v = append(v, metric.Value)
					timers[metric.Bucket] = v
				} else {
					timers[metric.Bucket] = []float64{metric.Value}
				}
			}
		case <-flush_timer.C:
			go flushMetrics(counters, gauges, timers)

			// Reset counters
			for k := range counters {
				counters[k] = 0
			}
			// Reset timers
			for k := range timers {
				timers[k] = []float64{}
			}
			// Note: gauges are not reset

			flush_timer = time.NewTimer(FLUSH_INTERVAL)
		}
	}
}

func normalizeBucketName(name string) string {
	spaces, _ := regexp.Compile("\\s+")
	slashes, _ := regexp.Compile("\\/")
	invalid, _ := regexp.Compile("[^a-zA-Z_\\-0-9\\.]")
	return invalid.ReplaceAllString(slashes.ReplaceAllString(spaces.ReplaceAllString(name, "_"), "-"), "")
}

func handleMessage(metrics chan Metric, msg string) {
	segments := strings.Split(strings.TrimSpace(msg), ":")
	if len(segments) < 1 {
		log.Printf("Received ill-formatted message: %s", msg)
		return
	}

	bucket := normalizeBucketName(segments[0])
	var values []string
	if len(segments) == 1 {
		values = []string{"1|c"}
	} else {
		values = segments[1:]
	}

	for _, value := range values {
		//sampleRate := 1
		fields := strings.Split(value, "|")

		if len(fields) == 1 {
			log.Printf("Bad value for %s: %s", bucket, value)
			return
		}

		metric_value, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			log.Printf("Bad metric value for %s: %s", bucket, fields[0])
			return
		}

		var metric_type MetricType
		switch fields[1] {
		case "ms":
			// Timer
			metric_type = TIMER
		case "g":
			// Gauge
			metric_type = GAUGE
		case "c":
			// Counter
			metric_type = COUNTER

			var rate float64
			if len(fields) == 3 {
				var err error
				rate, err = strconv.ParseFloat(fields[2][1:], 64)
				if err != nil {
					log.Printf("Could not parse rate from %s", fields[2])
					return
				}
			} else {
				rate = 1
			}
			metric_value = metric_value / rate
		default:
			log.Printf("Unknown metric type: %s", metric_type)
			return
		}

		metric := Metric{metric_type, bucket, metric_value}
		log.Printf("%s", metric)
		metrics <- metric
	}
}