Commit 2be64b46 authored by Rangel Reale's avatar Rangel Reale

* Allows lines not ending in \n, as it is not required by the protocol

* Function adapter for aggregator sender function
parent e770009a
......@@ -21,6 +21,15 @@ type MetricSender interface {
SendMetrics(MetricMap) error
}
// The MetricSenderFunc type is an adapter to allow the use of ordinary functions as metric senders
type MetricSenderFunc func(MetricMap)
// SendMetrics calls f(m)
func (f MetricSenderFunc) SendMetrics(m MetricMap) error {
f(m)
return nil
}
// MetricAggregator is an object that aggregates statsd metrics.
// The function NewMetricAggregator should be used to create the objects.
//
......
......@@ -69,25 +69,33 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error {
func (srv *MetricReceiver) handleMessage(addr net.Addr, msg []byte) {
buf := bytes.NewBuffer(msg)
for {
line, err := buf.ReadBytes('\n')
if err == io.EOF {
break
}
if err != nil {
log.Printf("error reading message from %s: %s", addr, err)
line, readerr := buf.ReadBytes('\n')
// protocol does not require line to end in \n, if EOF use received line if valid
if readerr != nil && readerr != io.EOF {
log.Printf("error reading message from %s: %s", addr, readerr)
return
} else if readerr != io.EOF {
// remove newline, only if not EOF
if len(line) > 0 {
line = line[:len(line)-1]
}
}
lineLength := len(line)
// Only process lines with more than one character
if lineLength > 1 {
metric, err := parseLine(line[:lineLength-1])
if len(line) > 1 {
metric, err := parseLine(line)
if err != nil {
log.Println("error parsing line %q from %s: %s", line, addr, err)
log.Printf("error parsing line %q from %s: %s", line, addr, err)
continue
}
go srv.Handler.HandleMetric(metric)
}
if readerr != nil && readerr == io.EOF {
// if was EOF, finished handling
return
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment