diff --git a/statsd/aggregator.go b/statsd/aggregator.go index e64c8d87f43a8f1c3626afac58523275494d033b..c6640f66cd483b72710b71b5a6d42a9bdb48fae9 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -149,10 +149,7 @@ func (m *MetricAggregator) Aggregate() { flushed := m.flush() go func() { - e := m.Sender.SendMetrics(flushed) - if e != nil { - flushChan <- e - } + flushChan <- m.Sender.SendMetrics(flushed) }() m.Reset() flushTimer = time.NewTimer(m.FlushInterval) diff --git a/statsd/receiver.go b/statsd/receiver.go index 1855fb84cb9f51c225339de7fee7bf0c4fcd4c46..7e94917635a52bf0b84e15433f8f6beeb61c0a3f 100644 --- a/statsd/receiver.go +++ b/statsd/receiver.go @@ -1,11 +1,12 @@ package statsd import ( + "bytes" "fmt" + "io" "log" "net" "strconv" - "strings" ) // DefaultMetricsAddr is the default address on which a MetricReceiver will listen @@ -64,75 +65,63 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error { // handleMessage handles the contents of a datagram and attempts to parse a Metric from each line func (srv *MetricReceiver) handleMessage(msg []byte) { - metrics, err := parseMessage(string(msg)) - if err != nil { - log.Printf("Error parsing metric %s", err) - } - for _, metric := range metrics { - srv.Handler.HandleMetric(metric) + buf := bytes.NewBuffer(msg) + for { + line, err := buf.ReadBytes('\n') + if err == io.EOF { + break + } + if err != nil { + log.Printf("error reading message: %s", err) + return + } + + metric, err := parseLine(line[:len(line)-1]) + if err != nil { + log.Printf("error parsing metric: %s", err) + continue + } + go srv.Handler.HandleMetric(metric) } } -// parseMessage parses a message string string in to a list of metrics -func parseMessage(msg string) ([]Metric, error) { - metricList := []Metric{} +func parseLine(line []byte) (Metric, error) { + var metric Metric - segments := strings.Split(strings.TrimSpace(msg), ":") - if len(segments) < 1 { - return metricList, fmt.Errorf("ill-formatted message: %s", msg) + buf := bytes.NewBuffer(line) + bucket, err := buf.ReadBytes(':') + if err != nil { + return metric, fmt.Errorf("error parsing metric: %s", err) } + metric.Bucket = string(bucket[:len(bucket)-1]) - bucket := segments[0] - var values []string - if len(segments) == 1 { - values = []string{"1"} - } else { - values = segments[1:] + value, err := buf.ReadBytes('|') + if err != nil { + return metric, fmt.Errorf("error parsing metric: %s", err) + } + metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64) + if err != nil { + return metric, fmt.Errorf("error parsing value of metric: %s", err) } - for _, value := range values { - fields := strings.Split(value, "|") - - metricValue, err := strconv.ParseFloat(fields[0], 64) - if err != nil { - return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0]) - } - - var metricTypeString string - if len(fields) == 1 { - metricTypeString = "c" - } else { - metricTypeString = fields[1] - } - - var metricType MetricType - switch metricTypeString { - case "ms": - // Timer - metricType = TIMER - case "g": - // Gauge - metricType = GAUGE - default: - // Counter, allows skipping of |c suffix - metricType = COUNTER - - var rate float64 - if len(fields) == 3 { - var err error - rate, err = strconv.ParseFloat(fields[2][1:], 64) - if err != nil { - return metricList, fmt.Errorf("%s: bad rate %s", fields[2]) - } - } else { - rate = 1 - } - metricValue = metricValue / rate - } + metricType := buf.Bytes() + if err != nil && err != io.EOF { + return metric, fmt.Errorf("error parsing metric: %s", err) + } - metric := Metric{metricType, bucket, metricValue} - metricList = append(metricList, metric) + switch string(metricType[:len(metricType)]) { + case "ms": + // Timer + metric.Type = TIMER + case "g": + // Gauge + metric.Type = GAUGE + case "c": + metric.Type = COUNTER + default: + err = fmt.Errorf("invalid metric type: %q", metricType) + return metric, err } - return metricList, nil + return metric, nil } diff --git a/statsd/receiver_test.go b/statsd/receiver_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b9d8f3e78fe5a63ff3ab230d855101d4c1cff289 --- /dev/null +++ b/statsd/receiver_test.go @@ -0,0 +1,33 @@ +package statsd + +import ( + "testing" +) + +func TestParseLine(t *testing.T) { + tests := map[string]Metric{ + "foo.bar.baz:2|c": Metric{Bucket: "foo.bar.baz", Value: 2.0, Type: COUNTER}, + "abc.def.g:3|g": Metric{Bucket: "abc.def.g", Value: 3, Type: GAUGE}, + "def.g:10|ms": Metric{Bucket: "def.g", Value: 10, Type: TIMER}, + } + + for input, expected := range tests { + result, err := parseLine([]byte(input)) + if err != nil { + t.Errorf("test %s error: %s", input, err) + continue + } + if result != expected { + t.Errorf("test %s: expected %s, got %s", input, expected, result) + continue + } + } + + failing := []string{"fOO|bar:bazkk", "foo.bar.baz:1|q"} + for _, tc := range failing { + result, err := parseLine([]byte(tc)) + if err == nil { + t.Errorf("test %s: expected error but got %s", tc, result) + } + } +}