diff --git a/gostatsd/gostatsd.go b/gostatsd/gostatsd.go index 6bc40ed7acd7689dca890cdf417be4696d69780c..bdc59c1e1315b0f3b96157d9f5c754610d26e532 100644 --- a/gostatsd/gostatsd.go +++ b/gostatsd/gostatsd.go @@ -46,7 +46,7 @@ func main() { go receiver.ListenAndReceive() // Start the console - console := statsd.ConsoleServer{consoleAddr, &aggregator} + console := statsd.WebConsoleServer{consoleAddr, &aggregator} go console.ListenAndServe() // Listen forever diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 1632ef06b1782920573e8c01eff8a806676b0737..d7e654c9eda861bc6469cac0e0f4c5caeac6aa24 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -9,10 +9,10 @@ import ( // metricAggregatorStats is a bookkeeping structure for statistics about a MetricAggregator type metricAggregatorStats struct { - BadLines int - LastMessage time.Time - GraphiteLastFlush time.Time - GraphiteLastError time.Time + BadLines int + LastMessage time.Time + LastFlush time.Time + LastFlushError time.Time } // MetricSender is an interface that can be implemented by objects which @@ -30,10 +30,10 @@ type MetricAggregator struct { MetricChan chan Metric // Channel on which metrics are received FlushInterval time.Duration // How often to flush metrics to the sender Sender MetricSender // The sender to which metrics are flushed - stats metricAggregatorStats - counters MetricMap - gauges MetricMap - timers MetricListMap + Stats metricAggregatorStats + Counters MetricMap + Gauges MetricMap + Timers MetricListMap } // NewMetricAggregator creates a new MetricAggregator object @@ -53,19 +53,19 @@ func (a *MetricAggregator) flush() (metrics MetricMap) { metrics = make(MetricMap) numStats := 0 - for k, v := range a.counters { + for k, v := range a.Counters { perSecond := v / a.FlushInterval.Seconds() metrics["stats."+k] = perSecond metrics["stats_counts."+k] = v numStats += 1 } - for k, v := range a.gauges { + for k, v := range a.Gauges { metrics["stats.gauges."+k] = v numStats += 1 } - for k, v := range a.timers { + for k, v := range a.Timers { if count := len(v); count > 0 { sort.Float64s(v) min := v[0] @@ -86,12 +86,12 @@ func (a *MetricAggregator) Reset() { defer a.Unlock() a.Lock() - for k := range a.counters { - a.counters[k] = 0 + for k := range a.Counters { + a.Counters[k] = 0 } - for k := range a.timers { - a.timers[k] = []float64{} + for k := range a.Timers { + a.Timers[k] = []float64{} } // No reset for gauges, they keep the last value @@ -104,34 +104,34 @@ func (a *MetricAggregator) receiveMetric(m Metric) { switch m.Type { case COUNTER: - v, ok := a.counters[m.Bucket] + v, ok := a.Counters[m.Bucket] if ok { - a.counters[m.Bucket] = v + m.Value + a.Counters[m.Bucket] = v + m.Value } else { - a.counters[m.Bucket] = m.Value + a.Counters[m.Bucket] = m.Value } case GAUGE: - a.gauges[m.Bucket] = m.Value + a.Gauges[m.Bucket] = m.Value case TIMER: - v, ok := a.timers[m.Bucket] + v, ok := a.Timers[m.Bucket] if ok { v = append(v, m.Value) - a.timers[m.Bucket] = v + a.Timers[m.Bucket] = v } else { - a.timers[m.Bucket] = []float64{m.Value} + a.Timers[m.Bucket] = []float64{m.Value} } case ERROR: - a.stats.BadLines += 1 + a.Stats.BadLines += 1 } - a.stats.LastMessage = time.Now() + a.Stats.LastMessage = time.Now() } // Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan // and flushing them periodically via its Sender func (a *MetricAggregator) Aggregate() { - a.counters = make(MetricMap) - a.gauges = make(MetricMap) - a.timers = make(MetricListMap) + a.Counters = make(MetricMap) + a.Gauges = make(MetricMap) + a.Timers = make(MetricListMap) flushChan := make(chan error) flushTimer := time.NewTimer(a.FlushInterval) @@ -151,9 +151,9 @@ func (a *MetricAggregator) Aggregate() { if flushResult != nil { log.Printf("Sending metrics to Graphite failed: %s", flushResult) - a.stats.GraphiteLastError = time.Now() + a.Stats.LastFlushError = time.Now() } else { - a.stats.GraphiteLastFlush = time.Now() + a.Stats.LastFlush = time.Now() } a.Unlock() } diff --git a/statsd/console.go b/statsd/console.go index 0daebc6d925e0e42d0442600d9effc6f8e039679..98fffb9eb527fde274b7462d48ee7bf4ee69f7d5 100644 --- a/statsd/console.go +++ b/statsd/console.go @@ -84,39 +84,39 @@ func (c *consoleConn) serve() { "Last message received: %s\n"+ "Last flush to Graphite: %s\n"+ "Last error from Graphite: %s\n", - c.server.Aggregator.stats.BadLines, - c.server.Aggregator.stats.LastMessage, - c.server.Aggregator.stats.GraphiteLastFlush, - c.server.Aggregator.stats.GraphiteLastError) + c.server.Aggregator.Stats.BadLines, + c.server.Aggregator.Stats.LastMessage, + c.server.Aggregator.Stats.LastFlush, + c.server.Aggregator.Stats.LastFlushError) c.server.Aggregator.Unlock() case "counters": c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.counters) + result = fmt.Sprint(c.server.Aggregator.Counters) c.server.Aggregator.Unlock() case "timers": c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.timers) + result = fmt.Sprint(c.server.Aggregator.Timers) c.server.Aggregator.Unlock() case "gauges": c.server.Aggregator.Lock() - result = fmt.Sprint(c.server.Aggregator.gauges) + result = fmt.Sprint(c.server.Aggregator.Gauges) c.server.Aggregator.Unlock() case "delcounters": c.server.Aggregator.Lock() for _, k := range args { - delete(c.server.Aggregator.counters, k) + delete(c.server.Aggregator.Counters, k) } c.server.Aggregator.Unlock() case "deltimers": c.server.Aggregator.Lock() for _, k := range args { - delete(c.server.Aggregator.timers, k) + delete(c.server.Aggregator.Timers, k) } c.server.Aggregator.Unlock() case "delgauges": c.server.Aggregator.Lock() for _, k := range args { - delete(c.server.Aggregator.gauges, k) + delete(c.server.Aggregator.Gauges, k) } c.server.Aggregator.Unlock() case "quit": diff --git a/statsd/webconsole.go b/statsd/webconsole.go new file mode 100644 index 0000000000000000000000000000000000000000..9d698a1d4ba3ca68499f64d42725697d4a6ae5e8 --- /dev/null +++ b/statsd/webconsole.go @@ -0,0 +1,77 @@ +package statsd + +import ( + "html/template" + "net/http" +) + +// WebConsoleServer is an object that listens for HTTP connection on a TCP address Addr +// and provides a web +type WebConsoleServer struct { + Addr string + Aggregator *MetricAggregator +} + +const tempText = ` + + +gostatsd + + + +

gostatsd

+

Bad lines received: {{.Stats.BadLines}}

+

Last messsage received: {{.Stats.LastMessage}}

+

Last flush to graphite: {{.Stats.LastFlush}}

+

Last error flushing to graphite: {{.Stats.LastFlushError}}

+

Counters

+ + +{{range $bucket, $value := .Counters}} + +{{end}} +
BucketValue
{{$bucket}}{{$value}}
+

Gauges

+ + +{{range $bucket, $value := .Gauges}} + +{{end}} +
BucketValue
{{$bucket}}{{$value}}
+

Timers

+ + +{{range $bucket, $value := .Timers}} + +{{end}} +
BucketValue
{{$bucket}}{{$value}}
+ + +` + +var temp = template.Must(template.New("temp").Parse(tempText)) + +func (s *WebConsoleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + defer s.Aggregator.Unlock() + s.Aggregator.Lock() + err := temp.Execute(w, s.Aggregator) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +// ListenAndServe listens on the ConsoleServer's TCP network address and then calls Serve +func (s *WebConsoleServer) ListenAndServe() error { + if s.Addr == "" { + s.Addr = DefaultConsoleAddr + } + return http.ListenAndServe(s.Addr, s) +}