Commit 0e1ebcaa authored by Kamil Kisiel's avatar Kamil Kisiel

Simplified reset, better error handling on receive.

parent bb124e50
...@@ -46,23 +46,26 @@ func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a Me ...@@ -46,23 +46,26 @@ func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a Me
} }
// flush prepares the contents of a MetricAggregator for sending via the Sender // flush prepares the contents of a MetricAggregator for sending via the Sender
func (m *MetricAggregator) flush() (metrics MetricMap) { func (a *MetricAggregator) flush() (metrics MetricMap) {
defer a.Unlock()
a.Lock()
metrics = make(MetricMap) metrics = make(MetricMap)
numStats := 0 numStats := 0
for k, v := range m.counters { for k, v := range a.counters {
perSecond := v / m.FlushInterval.Seconds() perSecond := v / a.FlushInterval.Seconds()
metrics["stats."+k] = perSecond metrics["stats."+k] = perSecond
metrics["stats_counts."+k] = v metrics["stats_counts."+k] = v
numStats += 1 numStats += 1
} }
for k, v := range m.gauges { for k, v := range a.gauges {
metrics["stats.gauges."+k] = v metrics["stats.gauges."+k] = v
numStats += 1 numStats += 1
} }
for k, v := range m.timers { for k, v := range a.timers {
if count := len(v); count > 0 { if count := len(v); count > 0 {
sort.Float64s(v) sort.Float64s(v)
min := v[0] min := v[0]
...@@ -80,26 +83,18 @@ func (m *MetricAggregator) flush() (metrics MetricMap) { ...@@ -80,26 +83,18 @@ func (m *MetricAggregator) flush() (metrics MetricMap) {
// Reset clears the contents of a MetricAggregator // Reset clears the contents of a MetricAggregator
func (a *MetricAggregator) Reset() { func (a *MetricAggregator) Reset() {
// Reset counters defer a.Unlock()
new_counters := make(MetricMap) a.Lock()
for k := range a.counters { for k := range a.counters {
new_counters[k] = 0 a.counters[k] = 0
} }
a.counters = new_counters
// Reset timers
new_timers := make(MetricListMap)
for k := range a.timers { for k := range a.timers {
new_timers[k] = []float64{} a.timers[k] = []float64{}
} }
a.timers = new_timers
// Keep values of gauges // No reset for gauges, they keep the last value
new_gauges := make(MetricMap)
for k, v := range a.gauges {
new_gauges[k] = v
}
a.gauges = new_gauges
} }
// receiveMetric is called for each incoming metric on MetricChan // receiveMetric is called for each incoming metric on MetricChan
...@@ -133,38 +128,34 @@ func (a *MetricAggregator) receiveMetric(m Metric) { ...@@ -133,38 +128,34 @@ func (a *MetricAggregator) receiveMetric(m Metric) {
// Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan // Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan
// and flushing them periodically via its Sender // and flushing them periodically via its Sender
func (m *MetricAggregator) Aggregate() { func (a *MetricAggregator) Aggregate() {
m.counters = make(MetricMap) a.counters = make(MetricMap)
m.gauges = make(MetricMap) a.gauges = make(MetricMap)
m.timers = make(MetricListMap) a.timers = make(MetricListMap)
flushChan := make(chan error) flushChan := make(chan error)
flushTimer := time.NewTimer(m.FlushInterval) flushTimer := time.NewTimer(a.FlushInterval)
for { for {
select { select {
case metric := <-m.MetricChan: // Incoming metrics case metric := <-a.MetricChan: // Incoming metrics
m.receiveMetric(metric) a.receiveMetric(metric)
case <-flushTimer.C: // Time to flush to graphite case <-flushTimer.C: // Time to flush to graphite
m.Lock() flushed := a.flush()
flushed := m.flush()
go func() { go func() {
flushChan <- m.Sender.SendMetrics(flushed) flushChan <- a.Sender.SendMetrics(flushed)
}() }()
m.Reset() a.Reset()
flushTimer = time.NewTimer(m.FlushInterval) flushTimer = time.NewTimer(a.FlushInterval)
m.Unlock()
case flushResult := <-flushChan: case flushResult := <-flushChan:
m.Lock() a.Lock()
if flushResult != nil { if flushResult != nil {
log.Printf("Sending metrics to Graphite failed: %s", flushResult) log.Printf("Sending metrics to Graphite failed: %s", flushResult)
m.stats.GraphiteLastError = time.Now() a.stats.GraphiteLastError = time.Now()
} else { } else {
m.stats.GraphiteLastFlush = time.Now() a.stats.GraphiteLastFlush = time.Now()
} }
m.Unlock() a.Unlock()
} }
} }
......
...@@ -76,12 +76,17 @@ func (srv *MetricReceiver) handleMessage(msg []byte) { ...@@ -76,12 +76,17 @@ func (srv *MetricReceiver) handleMessage(msg []byte) {
return return
} }
metric, err := parseLine(line[:len(line)-1]) lineLength := len(line)
if err != nil { // Only process lines with more than one character
log.Printf("error parsing metric: %s", err) if lineLength > 1 {
continue metric, err := parseLine(line[:lineLength-1])
if err != nil {
log.Println(line)
log.Println(err)
continue
}
go srv.Handler.HandleMetric(metric)
} }
go srv.Handler.HandleMetric(metric)
} }
} }
...@@ -91,22 +96,23 @@ func parseLine(line []byte) (Metric, error) { ...@@ -91,22 +96,23 @@ func parseLine(line []byte) (Metric, error) {
buf := bytes.NewBuffer(line) buf := bytes.NewBuffer(line)
bucket, err := buf.ReadBytes(':') bucket, err := buf.ReadBytes(':')
if err != nil { if err != nil {
return metric, fmt.Errorf("error parsing metric: %s", err) fmt.Println(line)
return metric, fmt.Errorf("error parsing metric name: %s", err)
} }
metric.Bucket = string(bucket[:len(bucket)-1]) metric.Bucket = string(bucket[:len(bucket)-1])
value, err := buf.ReadBytes('|') value, err := buf.ReadBytes('|')
if err != nil { if err != nil {
return metric, fmt.Errorf("error parsing metric: %s", err) return metric, fmt.Errorf("error parsing metric value: %s", err)
} }
metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64) metric.Value, err = strconv.ParseFloat(string(value[:len(value)-1]), 64)
if err != nil { if err != nil {
return metric, fmt.Errorf("error parsing value of metric: %s", err) return metric, fmt.Errorf("error converting metric value: %s", err)
} }
metricType := buf.Bytes() metricType := buf.Bytes()
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return metric, fmt.Errorf("error parsing metric: %s", err) return metric, fmt.Errorf("error parsing metric type: %s", err)
} }
switch string(metricType[:len(metricType)]) { switch string(metricType[:len(metricType)]) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment