Commit 9d6a7897 authored by Kamil Kisiel's avatar Kamil Kisiel

Flush metrics to graphite. Better error handling.

parent 5abe00c7
......@@ -10,7 +10,8 @@ import (
"strconv"
)
const FLUSH_INTERVAL = 1000
const flushInterval = 10
const GRAPHITE_SERVER = "localhost:1234"
type MetricType int
......@@ -73,16 +74,41 @@ func metricListener(metrics chan Metric) {
type MetricMap map[string]float64
type MetricListMap map[string][]float64
func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap) {
func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) {
conn, err := net.Dial("tcp", GRAPHITE_SERVER)
if err != nil {
log.Printf("Could not contact Graphite server")
return
}
defer conn.Close()
numStats := 0
now := time.Now().Unix()
for k, v := range counters {
perSecond := v / flushInterval.Seconds()
fmt.Fprintf(conn, "stats.%s %f %d\n", k, perSecond, now)
fmt.Fprintf(conn, "stats_counts.%s %f %d\n", k, v, now)
numStats += 1
}
for k, v := range gauges {
fmt.Fprintf(conn, "stats.gauges.%s %f %d\n", k, v, now)
numStats += 1
}
fmt.Fprintf(conn, "statsd.numStats %d %d\n", numStats, now)
}
func metricAggregator(metrics chan Metric) {
flushInterval := time.Duration(flushInterval * time.Second)
var counters = make(MetricMap)
var gauges = make(MetricMap)
var timers = make(MetricListMap)
flush_timer := time.NewTimer(FLUSH_INTERVAL)
flushTimer := time.NewTimer(flushInterval)
log.Printf("Started aggregator")
......@@ -109,20 +135,31 @@ func metricAggregator(metrics chan Metric) {
timers[metric.Bucket] = []float64{metric.Value}
}
}
case <-flush_timer.C:
go flushMetrics(counters, gauges, timers)
case <-flushTimer.C:
go flushMetrics(counters, gauges, timers, flushInterval)
// Reset counters
new_counters := make(MetricMap)
for k := range counters {
counters[k] = 0
new_counters[k] = 0
}
counters = new_counters
// Reset timers
new_timers := make(MetricListMap)
for k := range timers {
timers[k] = []float64{}
new_timers[k] = []float64{}
}
// Note: gauges are not reset
timers = new_timers
flush_timer = time.NewTimer(FLUSH_INTERVAL)
// Keep values of gauges
new_gauges := make(MetricMap)
for k, v := range gauges {
new_gauges[k] = v
}
gauges = new_gauges
flushTimer = time.NewTimer(flushInterval)
}
}
}
......@@ -134,67 +171,77 @@ func normalizeBucketName(name string) string {
return invalid.ReplaceAllString(slashes.ReplaceAllString(spaces.ReplaceAllString(name, "_"), "-"), "")
}
func handleMessage(metrics chan Metric, msg string) {
func parseMessage(msg string) ([]Metric, error) {
metricList := []Metric{}
segments := strings.Split(strings.TrimSpace(msg), ":")
if len(segments) < 1 {
log.Printf("Received ill-formatted message: %s", msg)
return
return metricList, fmt.Errorf("ill-formatted message: %s", msg)
}
bucket := normalizeBucketName(segments[0])
var values []string
if len(segments) == 1 {
values = []string{"1|c"}
values = []string{"1"}
} else {
values = segments[1:]
}
for _, value := range values {
//sampleRate := 1
fields := strings.Split(value, "|")
if len(fields) == 1 {
log.Printf("Bad value for %s: %s", bucket, value)
return
metricValue, err := strconv.ParseFloat(fields[0], 64)
if err != nil {
return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0])
}
metric_value, err := strconv.ParseFloat(fields[0], 64)
if err != nil {
log.Printf("Bad metric value for %s: %s", bucket, fields[0])
return
var metricTypeString string
if len(fields) == 1 {
metricTypeString = "c"
} else {
metricTypeString = fields[1]
}
var metric_type MetricType
switch fields[1] {
var metricType MetricType
switch metricTypeString {
case "ms":
// Timer
metric_type = TIMER
metricType = TIMER
case "g":
// Gauge
metric_type = GAUGE
case "c":
// Counter
metric_type = COUNTER
metricType = GAUGE
default:
// Counter, allows skipping of |c suffix
metricType = COUNTER
var rate float64
if len(fields) == 3 {
var err error
rate, err = strconv.ParseFloat(fields[2][1:], 64)
if err != nil {
log.Printf("Could not parse rate from %s", fields[2])
return
return metricList, fmt.Errorf("%s: bad rate %s", fields[2])
}
} else {
rate = 1
}
metric_value = metric_value / rate
default:
log.Printf("Unknown metric type: %s", metric_type)
return
metricValue = metricValue / rate
}
metric := Metric{metric_type, bucket, metric_value}
log.Printf("%s", metric)
metrics <- metric
metric := Metric{metricType, bucket, metricValue}
metricList = append(metricList, metric)
}
return metricList, nil
}
func handleMessage(metric_chan chan Metric, msg string) {
metrics, err := parseMessage(msg)
if err != nil {
log.Printf("Error parsing metric %s", err)
} else {
for _, metric := range metrics {
log.Printf("%s", metric)
metric_chan <- metric
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment