Commit f402c352 authored by Kamil Kisiel's avatar Kamil Kisiel

Implemented statistics

parent 49bd4284
......@@ -24,12 +24,13 @@ func init() {
percentThresholds = []float64{90.0}
}
type MetricType int
type MetricType float64
// Enumeration, see http://golang.org/doc/effective_go.html#constants
const (
_ = iota
COUNTER MetricType = 1 << (10 * iota)
ERROR MetricType = 1 << (10 * iota)
COUNTER
TIMER
GAUGE
)
......@@ -52,6 +53,13 @@ type Metric struct {
Value float64
}
type MetricAggregatorStats struct {
BadLines int
LastMessage time.Time
GraphiteLastFlush time.Time
GraphiteLastError time.Time
}
func (m Metric) String() string {
return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value)
}
......@@ -64,7 +72,7 @@ func (m MetricListMap) String() string {
for k, v := range m {
buf.Write([]byte(fmt.Sprint(k)))
for _, v2 := range v {
buf.Write([]byte(fmt.Sprintf("\t%f\n", k, v2)))
fmt.Fprintf(buf, "\t%f\n", k, v2)
}
}
return buf.String()
......@@ -73,7 +81,7 @@ func (m MetricListMap) String() string {
func (m MetricMap) String() string {
buf := new(bytes.Buffer)
for k, v := range m {
buf.Write([]byte(fmt.Sprintf("%s: %f\n", k, v)))
fmt.Fprintf(buf, "%s: %f\n", k, v)
}
return buf.String()
}
......@@ -115,26 +123,27 @@ func thresholdStats(vals []float64, threshold float64) (mean, upper float64) {
return mean, upper
}
func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) {
func flushMetrics(flushChan chan error, counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) {
conn, err := net.Dial("tcp", graphiteServer)
if err != nil {
log.Printf("Could not contact Graphite server")
flushChan <- err
return
}
defer conn.Close()
numStats := 0
now := time.Now().Unix()
buf := new(bytes.Buffer)
for k, v := range counters {
perSecond := v / flushInterval.Seconds()
fmt.Fprintf(conn, "stats.%s %f %d\n", k, perSecond, now)
fmt.Fprintf(conn, "stats_counts.%s %f %d\n", k, v, now)
fmt.Fprintf(buf, "stats.%s %f %d\n", k, perSecond, now)
fmt.Fprintf(buf, "stats_counts.%s %f %d\n", k, v, now)
numStats += 1
}
for k, v := range gauges {
fmt.Fprintf(conn, "stats.gauges.%s %f %d\n", k, v, now)
fmt.Fprintf(buf, "stats.gauges.%s %f %d\n", k, v, now)
numStats += 1
}
......@@ -144,29 +153,38 @@ func flushMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, fl
min := v[0]
max := v[count-1]
fmt.Fprintf(conn, "stats.timers.%s.lower %f %d\n", k, min, now)
fmt.Fprintf(conn, "stats.timers.%s.upper %f %d\n", k, max, now)
fmt.Fprintf(conn, "stats.timers.%s.count %d %d\n", k, count, now)
fmt.Fprintf(buf, "stats.timers.%s.lower %f %d\n", k, min, now)
fmt.Fprintf(buf, "stats.timers.%s.upper %f %d\n", k, max, now)
fmt.Fprintf(buf, "stats.timers.%s.count %d %d\n", k, count, now)
for _, threshold := range percentThresholds {
mean, upper := thresholdStats(v, threshold)
thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64)
fmt.Fprintf(conn, "stats.timers.%s.mean_%s %f %d\n", k, thresholdName, mean, now)
fmt.Fprintf(conn, "stats.timers.%s.upper_%s %f %d\n", k, thresholdName, upper, now)
fmt.Fprintf(buf, "stats.timers.%s.mean_%s %f %d\n", k, thresholdName, mean, now)
fmt.Fprintf(buf, "stats.timers.%s.upper_%s %f %d\n", k, thresholdName, upper, now)
}
numStats += 1
}
}
fmt.Fprintf(conn, "statsd.numStats %d %d\n", numStats, now)
fmt.Fprintf(buf, "statsd.numStats %d %d\n", numStats, now)
_, err = buf.WriteTo(conn)
if err != nil {
log.Printf("Failed to write metrics to Graphite server")
flushChan <- err
} else {
flushChan <- nil
}
}
func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
var counters = make(MetricMap)
var gauges = make(MetricMap)
var timers = make(MetricListMap)
stats := new(MetricAggregatorStats)
counters := make(MetricMap)
gauges := make(MetricMap)
timers := make(MetricListMap)
flushTimer := time.NewTimer(flushInterval)
flushChan := make(chan error)
log.Printf("Started aggregator")
......@@ -191,9 +209,12 @@ func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
} else {
timers[metric.Bucket] = []float64{metric.Value}
}
case ERROR:
stats.BadLines += 1
}
stats.LastMessage = time.Now()
case <-flushTimer.C: // Time to flush to graphite
go flushMetrics(counters, gauges, timers, flushInterval)
go flushMetrics(flushChan, counters, gauges, timers, flushInterval)
// Reset counters
new_counters := make(MetricMap)
......@@ -217,13 +238,25 @@ func metricAggregator(metricChan chan Metric, consoleChan chan ConsoleRequest) {
gauges = new_gauges
flushTimer = time.NewTimer(flushInterval)
case flushResult := <-flushChan:
if flushResult != nil {
log.Printf("Sending metrics to Graphite failed: %s", flushResult)
stats.GraphiteLastError = time.Now()
} else {
stats.GraphiteLastFlush = time.Now()
}
case consoleRequest := <-consoleChan:
var result string
switch parts := strings.Split(strings.TrimSpace(consoleRequest.Command), " "); parts[0] {
case "help":
result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n"
case "stats":
result = "stats:\n"
result = fmt.Sprintf(
"Invalid messages received: %d\n" +
"Last message received: %s\n" +
"Last flush to Graphite: %s\n" +
"Last error from Graphite: %s\n",
stats.BadLines, stats.LastMessage, stats.GraphiteLastFlush, stats.GraphiteLastError)
case "counters":
result = fmt.Sprint(counters)
case "timers":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment