receiver.go 3.17 KB
Newer Older
1 2 3
package statsd

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"strconv"
)

12
// DefaultMetricsAddr is the default address on which a MetricReceiver will listen
13 14
const DefaultMetricsAddr = ":8125" // no host part, port 8125; used when MetricReceiver.Addr is empty

15 16 17 18 19 20 21 22 23 24 25 26
// Handler is implemented by objects that can handle metrics for a MetricReceiver.
type Handler interface {
	// HandleMetric is invoked once for every Metric that was parsed
	// successfully. The receiver starts each call on a new goroutine, so
	// implementations must be safe for concurrent use.
	HandleMetric(m Metric)
}

// HandlerFunc is an adapter that allows an ordinary function to be used as a
// metric Handler: its HandleMetric method simply calls the function.
type HandlerFunc func(Metric)

// HandleMetric calls f(m). It is the method that makes HandlerFunc satisfy
// the Handler interface.
func (f HandlerFunc) HandleMetric(m Metric) {
	f(m)
}
27

28 29
// MetricReceiver receives data on its listening port and converts lines in to Metrics.
// For each Metric it calls r.Handler.HandleMetric()
30 31 32 33 34
type MetricReceiver struct {
	Addr    string  // UDP address on which to listen for metrics; DefaultMetricsAddr is used if empty
	Handler Handler // handler invoked with every Metric that parses successfully
}

35 36
// ListenAndReceive listens on the UDP network address of srv.Addr and then calls
// Receive to handle the incoming datagrams. If Addr is blank then DefaultMetricsAddr is used.
37 38 39 40 41 42 43 44 45 46 47 48
func (r *MetricReceiver) ListenAndReceive() error {
	addr := r.Addr
	if addr == "" {
		addr = DefaultMetricsAddr
	}
	c, err := net.ListenPacket("udp", addr)
	if err != nil {
		return err
	}
	return r.Receive(c)
}

49 50
// Receive accepts incoming datagrams on c and calls r.Handler.HandleMetric() for each line in the
// datagram that successfully parses in to a Metric
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
func (r *MetricReceiver) Receive(c net.PacketConn) error {
	defer c.Close()

	msg := make([]byte, 1024)
	for {
		nbytes, _, err := c.ReadFrom(msg)
		if err != nil {
			log.Printf("%s", err)
			continue
		}
		go r.handleMessage(msg[:nbytes])
	}
	panic("not reached")
}

66
// handleMessage handles the contents of a datagram and attempts to parse a Metric from each line
67
func (srv *MetricReceiver) handleMessage(msg []byte) {
68 69 70 71 72 73 74 75 76 77 78
	buf := bytes.NewBuffer(msg)
	for {
		line, err := buf.ReadBytes('\n')
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("error reading message: %s", err)
			return
		}

79 80 81 82 83 84 85 86 87 88
		lineLength := len(line)
		// Only process lines with more than one character
		if lineLength > 1 {
			metric, err := parseLine(line[:lineLength-1])
			if err != nil {
				log.Println(line)
				log.Println(err)
				continue
			}
			go srv.Handler.HandleMetric(metric)
89
		}
90 91
	}
}
92

93 94
func parseLine(line []byte) (Metric, error) {
	var metric Metric
95

96 97 98
	buf := bytes.NewBuffer(line)
	bucket, err := buf.ReadBytes(':')
	if err != nil {
99 100
		fmt.Println(line)
		return metric, fmt.Errorf("error parsing metric name: %s", err)
101
	}
102
	metric.Bucket = string(bucket[:len(bucket)-1])
103

104 105
	value, err := buf.ReadBytes('|')
	if err != nil {
106
		return metric, fmt.Errorf("error parsing metric value: %s", err)
107 108 109
	}
	metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64)
	if err != nil {
110
		return metric, fmt.Errorf("error converting metric value: %s", err)
111 112
	}

113 114
	metricType := buf.Bytes()
	if err != nil && err != io.EOF {
115
		return metric, fmt.Errorf("error parsing metric type: %s", err)
116
	}
117

118 119 120 121 122 123 124 125 126 127 128 129
	switch string(metricType[:len(metricType)]) {
	case "ms":
		// Timer
		metric.Type = TIMER
	case "g":
		// Gauge
		metric.Type = GAUGE
	case "c":
		metric.Type = COUNTER
	default:
		err = fmt.Errorf("invalid metric type: %q", metricType)
		return metric, err
130 131
	}

132
	return metric, nil
133
}