Commit bb699a08 authored by Kamil Kisiel's avatar Kamil Kisiel

Documented the stuff in receiver.go

parent af1815e3
...@@ -9,6 +9,6 @@ func main() { ...@@ -9,6 +9,6 @@ func main() {
f := func(m statsd.Metric) { f := func(m statsd.Metric) {
log.Printf("%s", m) log.Printf("%s", m)
} }
r := statsd.MetricReceiver{":8125", f} r := statsd.MetricReceiver{":8125", HanlderFunc(f)}
r.ListenAndReceive() r.ListenAndReceive()
} }
...@@ -5,15 +5,31 @@ import ( ...@@ -5,15 +5,31 @@ import (
"net" "net"
) )
// DefaultMetricsAddr is the default address on which a MetricReceiver will listen
const DefaultMetricsAddr = ":8125" const DefaultMetricsAddr = ":8125"
type Handler func(Metric) // Objects implementing the Handler interface can be used to handle metrics for a MetricReceiver
type Handler interface {
HandleMetric(m Metric)
}
// The HandlerFunc type is an adapter to allow the use of ordinary functions as metric handlers
type HandlerFunc func(Metric)
// HandleMetric calls f(m)
func (f HandlerFunc) HandleMetric(m Metric) {
f(m)
}
// MetricReceiver receives data on its listening port and converts lines in to Metrics.
// For each Metric it calls r.Handler.HandleMetric()
type MetricReceiver struct { type MetricReceiver struct {
Addr string // UDP address on which to listen for metrics Addr string // UDP address on which to listen for metrics
Handler Handler // handler to invoke Handler Handler // handler to invoke
} }
// ListenAndReceive listens on the UDP network address of srv.Addr and then calls
// Receive to handle the incoming datagrams. If Addr is blank then DefaultMetricsAddr is used.
func (r *MetricReceiver) ListenAndReceive() error { func (r *MetricReceiver) ListenAndReceive() error {
addr := r.Addr addr := r.Addr
if addr == "" { if addr == "" {
...@@ -26,6 +42,8 @@ func (r *MetricReceiver) ListenAndReceive() error { ...@@ -26,6 +42,8 @@ func (r *MetricReceiver) ListenAndReceive() error {
return r.Receive(c) return r.Receive(c)
} }
// Receive accepts incoming datagrams on c and calls r.Handler.HandleMetric() for each line in the
// datagram that successfully parses in to a Metric
func (r *MetricReceiver) Receive(c net.PacketConn) error { func (r *MetricReceiver) Receive(c net.PacketConn) error {
defer c.Close() defer c.Close()
...@@ -42,12 +60,13 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error { ...@@ -42,12 +60,13 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error {
panic("not reached") panic("not reached")
} }
// handleMessage handles the contents of a datagram and attempts to parse a Metric from each line
func (srv *MetricReceiver) handleMessage(msg []byte) { func (srv *MetricReceiver) handleMessage(msg []byte) {
metrics, err := parseMessage(string(msg)) metrics, err := parseMessage(string(msg))
if err != nil { if err != nil {
log.Printf("Error parsing metric %s", err) log.Printf("Error parsing metric %s", err)
} }
for _, metric := range metrics { for _, metric := range metrics {
srv.Handler(metric) srv.Handler.HandleMetric(metric)
} }
} }
...@@ -429,7 +429,7 @@ func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string, ...@@ -429,7 +429,7 @@ func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string,
f := func(metric Metric) { f := func(metric Metric) {
metricChan <- metric metricChan <- metric
} }
s := MetricReceiver{metricAddr, f} s := MetricReceiver{metricAddr, HandlerFunc(f)}
go s.ListenAndReceive() go s.ListenAndReceive()
go metricAggregator(graphiteAddr, metricChan, consoleChan, flushInterval) go metricAggregator(graphiteAddr, metricChan, consoleChan, flushInterval)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment