Commit af1815e3 authored by Kamil Kisiel's avatar Kamil Kisiel

Separated out a MetricReceiver type and created an example

parent a0a1dbc0
package main
import (
"github.com/kisielk/gostatsd/statsd"
"log"
)
func main() {
f := func(m statsd.Metric) {
log.Printf("%s", m)
}
r := statsd.MetricReceiver{":8125", f}
r.ListenAndReceive()
}
package statsd
import (
"log"
"net"
)
const DefaultMetricsAddr = ":8125"
type Handler func(Metric)
type MetricReceiver struct {
Addr string // UDP address on which to listen for metrics
Handler Handler // handler to invoke
}
func (r *MetricReceiver) ListenAndReceive() error {
addr := r.Addr
if addr == "" {
addr = DefaultMetricsAddr
}
c, err := net.ListenPacket("udp", addr)
if err != nil {
return err
}
return r.Receive(c)
}
func (r *MetricReceiver) Receive(c net.PacketConn) error {
defer c.Close()
msg := make([]byte, 1024)
for {
nbytes, _, err := c.ReadFrom(msg)
if err != nil {
log.Printf("%s", err)
continue
}
log.Printf("%s", msg[:nbytes])
go r.handleMessage(msg[:nbytes])
}
panic("not reached")
}
func (srv *MetricReceiver) handleMessage(msg []byte) {
metrics, err := parseMessage(string(msg))
if err != nil {
log.Printf("Error parsing metric %s", err)
}
for _, metric := range metrics {
srv.Handler(metric)
}
}
......@@ -384,23 +384,6 @@ func handleMessage(metricChan chan Metric, msg string) {
}
}
func metricListener(addr string, metricChan chan Metric) {
conn, err := net.ListenPacket("udp", addr)
if err != nil {
log.Fatal(err)
return
}
msg := make([]byte, 1024)
for {
nbytes, _, err := conn.ReadFrom(msg)
if err != nil {
log.Printf("%s", err)
continue
}
go handleMessage(metricChan, string(msg[:nbytes]))
}
}
func consoleClient(conn net.Conn, consoleChan chan ConsoleRequest) {
defer conn.Close()
......@@ -442,7 +425,13 @@ func consoleServer(addr string, consoleChan chan ConsoleRequest) {
func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string, flushInterval time.Duration) error {
var metricChan = make(chan Metric)
var consoleChan = make(chan ConsoleRequest)
go metricListener(metricAddr, metricChan)
f := func(metric Metric) {
metricChan <- metric
}
s := MetricReceiver{metricAddr, f}
go s.ListenAndReceive()
go metricAggregator(graphiteAddr, metricChan, consoleChan, flushInterval)
go consoleServer(consoleAddr, consoleChan)
// Run forever
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment