receiver.go 3.5 KB
Newer Older
1 2 3
package statsd

import (
4
	"bytes"
5
	"fmt"
6
	"io"
7 8
	"log"
	"net"
9
	"strconv"
10 11
)

12
// DefaultMetricsAddr is the default address on which a MetricReceiver will listen
13 14
const DefaultMetricsAddr = ":8125"

15 16 17 18 19 20 21 22 23 24 25 26
// Objects implementing the Handler interface can be used to handle metrics for a MetricReceiver
type Handler interface {
	HandleMetric(m Metric)
}

// The HandlerFunc type is an adapter to allow the use of ordinary functions as metric handlers
type HandlerFunc func(Metric)

// HandleMetric calls f(m)
func (f HandlerFunc) HandleMetric(m Metric) {
	f(m)
}
27

28 29
// MetricReceiver receives data on its listening port and converts lines in to Metrics.
// For each Metric it calls r.Handler.HandleMetric()
30 31 32 33 34
type MetricReceiver struct {
	Addr    string  // UDP address on which to listen for metrics
	Handler Handler // handler to invoke
}

35 36
// ListenAndReceive listens on the UDP network address of srv.Addr and then calls
// Receive to handle the incoming datagrams. If Addr is blank then DefaultMetricsAddr is used.
37 38 39 40 41 42 43 44 45 46 47 48
func (r *MetricReceiver) ListenAndReceive() error {
	addr := r.Addr
	if addr == "" {
		addr = DefaultMetricsAddr
	}
	c, err := net.ListenPacket("udp", addr)
	if err != nil {
		return err
	}
	return r.Receive(c)
}

49 50
// Receive accepts incoming datagrams on c and calls r.Handler.HandleMetric() for each line in the
// datagram that successfully parses in to a Metric
51 52 53 54 55
func (r *MetricReceiver) Receive(c net.PacketConn) error {
	defer c.Close()

	msg := make([]byte, 1024)
	for {
56
		nbytes, addr, err := c.ReadFrom(msg)
57 58 59 60
		if err != nil {
			log.Printf("%s", err)
			continue
		}
61 62 63
		buf := make([]byte, nbytes)
		copy(buf, msg[:nbytes])
		go r.handleMessage(addr, buf)
64 65 66 67
	}
	panic("not reached")
}

68
// handleMessage handles the contents of a datagram and attempts to parse a Metric from each line
69
func (srv *MetricReceiver) handleMessage(addr net.Addr, msg []byte) {
70 71
	buf := bytes.NewBuffer(msg)
	for {
72 73 74 75 76
		line, readerr := buf.ReadBytes('\n')

		// protocol does not require line to end in \n, if EOF use received line if valid
		if readerr != nil && readerr != io.EOF {
			log.Printf("error reading message from %s: %s", addr, readerr)
77
			return
78 79 80 81 82
		} else if readerr != io.EOF {
			// remove newline, only if not EOF
			if len(line) > 0 {
				line = line[:len(line)-1]
			}
83 84
		}

85
		// Only process lines with more than one character
86 87
		if len(line) > 1 {
			metric, err := parseLine(line)
88
			if err != nil {
89
				log.Printf("error parsing line %q from %s: %s", line, addr, err)
90 91 92
				continue
			}
			go srv.Handler.HandleMetric(metric)
93
		}
94

Rangel Reale's avatar
Rangel Reale committed
95
		if readerr == io.EOF {
96 97 98
			// if was EOF, finished handling
			return
		}
99 100
	}
}
101

102 103
func parseLine(line []byte) (Metric, error) {
	var metric Metric
104

105 106 107
	buf := bytes.NewBuffer(line)
	bucket, err := buf.ReadBytes(':')
	if err != nil {
108
		return metric, fmt.Errorf("error parsing metric name: %s", err)
109
	}
110
	metric.Bucket = string(bucket[:len(bucket)-1])
111

112 113
	value, err := buf.ReadBytes('|')
	if err != nil {
114
		return metric, fmt.Errorf("error parsing metric value: %s", err)
115 116 117
	}
	metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64)
	if err != nil {
118
		return metric, fmt.Errorf("error converting metric value: %s", err)
119 120
	}

121 122
	metricType := buf.Bytes()
	if err != nil && err != io.EOF {
123
		return metric, fmt.Errorf("error parsing metric type: %s", err)
124
	}
125

126 127 128 129 130 131 132 133 134 135 136 137
	switch string(metricType[:len(metricType)]) {
	case "ms":
		// Timer
		metric.Type = TIMER
	case "g":
		// Gauge
		metric.Type = GAUGE
	case "c":
		metric.Type = COUNTER
	default:
		err = fmt.Errorf("invalid metric type: %q", metricType)
		return metric, err
138 139
	}

140
	return metric, nil
141
}