Commit e1b7f409 authored by Kamil Kisiel's avatar Kamil Kisiel

Removed thresholding functionality, simplified the receiver, added tests.

parent b9b0317f
...@@ -149,10 +149,7 @@ func (m *MetricAggregator) Aggregate() { ...@@ -149,10 +149,7 @@ func (m *MetricAggregator) Aggregate() {
flushed := m.flush() flushed := m.flush()
go func() { go func() {
e := m.Sender.SendMetrics(flushed) flushChan <- m.Sender.SendMetrics(flushed)
if e != nil {
flushChan <- e
}
}() }()
m.Reset() m.Reset()
flushTimer = time.NewTimer(m.FlushInterval) flushTimer = time.NewTimer(m.FlushInterval)
......
package statsd package statsd
import ( import (
"bytes"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"strconv" "strconv"
"strings"
) )
// DefaultMetricsAddr is the default address on which a MetricReceiver will listen // DefaultMetricsAddr is the default address on which a MetricReceiver will listen
...@@ -64,75 +65,63 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error { ...@@ -64,75 +65,63 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error {
// handleMessage handles the contents of a datagram and attempts to parse a Metric from each line // handleMessage handles the contents of a datagram and attempts to parse a Metric from each line
func (srv *MetricReceiver) handleMessage(msg []byte) { func (srv *MetricReceiver) handleMessage(msg []byte) {
metrics, err := parseMessage(string(msg)) buf := bytes.NewBuffer(msg)
if err != nil { for {
log.Printf("Error parsing metric %s", err) line, err := buf.ReadBytes('\n')
} if err == io.EOF {
for _, metric := range metrics { break
srv.Handler.HandleMetric(metric) }
if err != nil {
log.Printf("error reading message: %s", err)
return
}
metric, err := parseLine(line[:len(line)-1])
if err != nil {
log.Printf("error parsing metric: %s", err)
continue
}
go srv.Handler.HandleMetric(metric)
} }
} }
// parseMessage parses a message string string in to a list of metrics func parseLine(line []byte) (Metric, error) {
func parseMessage(msg string) ([]Metric, error) { var metric Metric
metricList := []Metric{}
segments := strings.Split(strings.TrimSpace(msg), ":") buf := bytes.NewBuffer(line)
if len(segments) < 1 { bucket, err := buf.ReadBytes(':')
return metricList, fmt.Errorf("ill-formatted message: %s", msg) if err != nil {
return metric, fmt.Errorf("error parsing metric: %s", err)
} }
metric.Bucket = string(bucket[:len(bucket)-1])
bucket := segments[0] value, err := buf.ReadBytes('|')
var values []string if err != nil {
if len(segments) == 1 { return metric, fmt.Errorf("error parsing metric: %s", err)
values = []string{"1"} }
} else { metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64)
values = segments[1:] if err != nil {
return metric, fmt.Errorf("error parsing value of metric: %s", err)
} }
for _, value := range values { metricType := buf.Bytes()
fields := strings.Split(value, "|") if err != nil && err != io.EOF {
return metric, fmt.Errorf("error parsing metric: %s", err)
metricValue, err := strconv.ParseFloat(fields[0], 64) }
if err != nil {
return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0])
}
var metricTypeString string
if len(fields) == 1 {
metricTypeString = "c"
} else {
metricTypeString = fields[1]
}
var metricType MetricType
switch metricTypeString {
case "ms":
// Timer
metricType = TIMER
case "g":
// Gauge
metricType = GAUGE
default:
// Counter, allows skipping of |c suffix
metricType = COUNTER
var rate float64
if len(fields) == 3 {
var err error
rate, err = strconv.ParseFloat(fields[2][1:], 64)
if err != nil {
return metricList, fmt.Errorf("%s: bad rate %s", fields[2])
}
} else {
rate = 1
}
metricValue = metricValue / rate
}
metric := Metric{metricType, bucket, metricValue} switch string(metricType[:len(metricType)]) {
metricList = append(metricList, metric) case "ms":
// Timer
metric.Type = TIMER
case "g":
// Gauge
metric.Type = GAUGE
case "c":
metric.Type = COUNTER
default:
err = fmt.Errorf("invalid metric type: %q", metricType)
return metric, err
} }
return metricList, nil return metric, nil
} }
package statsd
import (
"testing"
)
func TestParseLine(t *testing.T) {
tests := map[string]Metric{
"foo.bar.baz:2|c": Metric{Bucket: "foo.bar.baz", Value: 2.0, Type: COUNTER},
"abc.def.g:3|g": Metric{Bucket: "abc.def.g", Value: 3, Type: GAUGE},
"def.g:10|ms": Metric{Bucket: "def.g", Value: 10, Type: TIMER},
}
for input, expected := range tests {
result, err := parseLine([]byte(input))
if err != nil {
t.Errorf("test %s error: %s", input, err)
continue
}
if result != expected {
t.Errorf("test %s: expected %s, got %s", input, expected, result)
continue
}
}
failing := []string{"fOO|bar:bazkk", "foo.bar.baz:1|q"}
for _, tc := range failing {
result, err := parseLine([]byte(tc))
if err == nil {
t.Errorf("test %s: expected error but got %s", tc, result)
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment