Commit 37cdc384 authored by Kamil Kisiel's avatar Kamil Kisiel

Decoupled metric aggregation from graphite server communication

parent 0e1ae461
......@@ -6,7 +6,7 @@ import (
)
func main() {
err := statsd.ListenAndServe(":8125", ":8126")
err := statsd.ListenAndServe(":8125", ":8126", "localhost:1234")
if err != nil {
log.Fatal(err)
}
......
......@@ -26,7 +26,6 @@ var percentThresholds []float64
func init() {
flushInterval = 10 * time.Second
graphiteServer = "localhost:1234"
percentThresholds = []float64{90.0}
}
......@@ -73,6 +72,30 @@ func (m Metric) String() string {
type MetricMap map[string]float64
type MetricListMap map[string][]float64
type GraphiteClient struct {
conn *net.Conn
}
func NewGraphiteClient(addr string) (client GraphiteClient, err error) {
conn, err := net.Dial("tcp", addr)
client = GraphiteClient{&conn}
return
}
func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) {
buf := new(bytes.Buffer)
now := time.Now().Unix()
for k, v := range metrics {
fmt.Fprintf(buf, "%s %f %d\n", k, v, now)
}
_, err = buf.WriteTo(*client.conn)
if err != nil {
return err
}
return nil
}
func (m MetricListMap) String() string {
buf := new(bytes.Buffer)
for k, v := range m {
......@@ -129,27 +152,19 @@ func thresholdStats(vals []float64, threshold float64) (mean, upper float64) {
return mean, upper
}
func flushMetrics(flushChan chan error, counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) {
conn, err := net.Dial("tcp", graphiteServer)
if err != nil {
flushChan <- err
return
}
defer conn.Close()
func aggregateMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) (metrics MetricMap) {
metrics = make(MetricMap)
numStats := 0
now := time.Now().Unix()
buf := new(bytes.Buffer)
for k, v := range counters {
perSecond := v / flushInterval.Seconds()
fmt.Fprintf(buf, "stats.%s %f %d\n", k, perSecond, now)
fmt.Fprintf(buf, "stats_counts.%s %f %d\n", k, v, now)
metrics["stats." + k] = perSecond
metrics["stats_counts." + k] = v
numStats += 1
}
for k, v := range gauges {
fmt.Fprintf(buf, "stats.gauges.%s %f %d\n", k, v, now)
metrics["stats.gauges." + k] = v
numStats += 1
}
......@@ -159,31 +174,28 @@ func flushMetrics(flushChan chan error, counters MetricMap, gauges MetricMap, ti
min := v[0]
max := v[count-1]
fmt.Fprintf(buf, "stats.timers.%s.lower %f %d\n", k, min, now)
fmt.Fprintf(buf, "stats.timers.%s.upper %f %d\n", k, max, now)
fmt.Fprintf(buf, "stats.timers.%s.count %d %d\n", k, count, now)
metrics["stats.timers." + k + ".lower"] = min
metrics["stats.timers." + k + ".upper"] = max
metrics["stats.timers." + k + ".count"] = float64(count)
for _, threshold := range percentThresholds {
mean, upper := thresholdStats(v, threshold)
thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64)
fmt.Fprintf(buf, "stats.timers.%s.mean_%s %f %d\n", k, thresholdName, mean, now)
fmt.Fprintf(buf, "stats.timers.%s.upper_%s %f %d\n", k, thresholdName, upper, now)
metrics["stats.timers." + k + "mean_" + thresholdName] = mean
metrics["stats.timers." + k + "upper_" + thresholdName] = upper
}
numStats += 1
}
}
fmt.Fprintf(buf, "statsd.numStats %d %d\n", numStats, now)
metrics["statsd.numStats"] = float64(numStats)
return metrics
}
_, err = buf.WriteTo(conn)
func metricAggregator(graphiteAddr string, metricChan chan Metric, consoleChan chan ConsoleRequest) (err error) {
graphite, err := NewGraphiteClient(graphiteAddr)
if err != nil {
log.Printf("Failed to write metrics to Graphite server")
flushChan <- err
} else {
flushChan <- nil
return
}
}
func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
stats := new(MetricAggregatorStats)
counters := make(MetricMap)
gauges := make(MetricMap)
......@@ -220,7 +232,7 @@ func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
}
stats.LastMessage = time.Now()
case <-flushTimer.C: // Time to flush to graphite
go flushMetrics(flushChan, counters, gauges, timers, flushInterval)
go graphite.SendMetrics(aggregateMetrics(counters, gauges, timers, flushInterval))
// Reset counters
new_counters := make(MetricMap)
......@@ -289,6 +301,8 @@ func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
consoleRequest.ResultChan <- result
}
}
return
}
......@@ -428,12 +442,12 @@ func consoleServer(addr string, consoleChan chan ConsoleRequest) {
}
}
func ListenAndServe(metric_addr string, console_addr string) error {
func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string) error {
var metricChan = make(chan Metric)
var consoleChan = make(chan ConsoleRequest)
go metricListener(metric_addr, metricChan)
go metricAggregator(metricChan, consoleChan)
go consoleServer(console_addr, consoleChan)
go metricListener(metricAddr, metricChan)
go metricAggregator(graphiteAddr, metricChan, consoleChan)
go consoleServer(consoleAddr, consoleChan)
// Run forever
select {}
return nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment