Commit 509b9aae authored by Kamil Kisiel's avatar Kamil Kisiel

Added a web console server

parent 0e1ebcaa
......@@ -46,7 +46,7 @@ func main() {
go receiver.ListenAndReceive()
// Start the console
console := statsd.ConsoleServer{consoleAddr, &aggregator}
console := statsd.WebConsoleServer{consoleAddr, &aggregator}
go console.ListenAndServe()
// Listen forever
......
......@@ -11,8 +11,8 @@ import (
type metricAggregatorStats struct {
BadLines int
LastMessage time.Time
GraphiteLastFlush time.Time
GraphiteLastError time.Time
LastFlush time.Time
LastFlushError time.Time
}
// MetricSender is an interface that can be implemented by objects which
......@@ -30,10 +30,10 @@ type MetricAggregator struct {
MetricChan chan Metric // Channel on which metrics are received
FlushInterval time.Duration // How often to flush metrics to the sender
Sender MetricSender // The sender to which metrics are flushed
stats metricAggregatorStats
counters MetricMap
gauges MetricMap
timers MetricListMap
Stats metricAggregatorStats
Counters MetricMap
Gauges MetricMap
Timers MetricListMap
}
// NewMetricAggregator creates a new MetricAggregator object
......@@ -53,19 +53,19 @@ func (a *MetricAggregator) flush() (metrics MetricMap) {
metrics = make(MetricMap)
numStats := 0
for k, v := range a.counters {
for k, v := range a.Counters {
perSecond := v / a.FlushInterval.Seconds()
metrics["stats."+k] = perSecond
metrics["stats_counts."+k] = v
numStats += 1
}
for k, v := range a.gauges {
for k, v := range a.Gauges {
metrics["stats.gauges."+k] = v
numStats += 1
}
for k, v := range a.timers {
for k, v := range a.Timers {
if count := len(v); count > 0 {
sort.Float64s(v)
min := v[0]
......@@ -86,12 +86,12 @@ func (a *MetricAggregator) Reset() {
defer a.Unlock()
a.Lock()
for k := range a.counters {
a.counters[k] = 0
for k := range a.Counters {
a.Counters[k] = 0
}
for k := range a.timers {
a.timers[k] = []float64{}
for k := range a.Timers {
a.Timers[k] = []float64{}
}
// No reset for gauges, they keep the last value
......@@ -104,34 +104,34 @@ func (a *MetricAggregator) receiveMetric(m Metric) {
switch m.Type {
case COUNTER:
v, ok := a.counters[m.Bucket]
v, ok := a.Counters[m.Bucket]
if ok {
a.counters[m.Bucket] = v + m.Value
a.Counters[m.Bucket] = v + m.Value
} else {
a.counters[m.Bucket] = m.Value
a.Counters[m.Bucket] = m.Value
}
case GAUGE:
a.gauges[m.Bucket] = m.Value
a.Gauges[m.Bucket] = m.Value
case TIMER:
v, ok := a.timers[m.Bucket]
v, ok := a.Timers[m.Bucket]
if ok {
v = append(v, m.Value)
a.timers[m.Bucket] = v
a.Timers[m.Bucket] = v
} else {
a.timers[m.Bucket] = []float64{m.Value}
a.Timers[m.Bucket] = []float64{m.Value}
}
case ERROR:
a.stats.BadLines += 1
a.Stats.BadLines += 1
}
a.stats.LastMessage = time.Now()
a.Stats.LastMessage = time.Now()
}
// Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan
// and flushing them periodically via its Sender
func (a *MetricAggregator) Aggregate() {
a.counters = make(MetricMap)
a.gauges = make(MetricMap)
a.timers = make(MetricListMap)
a.Counters = make(MetricMap)
a.Gauges = make(MetricMap)
a.Timers = make(MetricListMap)
flushChan := make(chan error)
flushTimer := time.NewTimer(a.FlushInterval)
......@@ -151,9 +151,9 @@ func (a *MetricAggregator) Aggregate() {
if flushResult != nil {
log.Printf("Sending metrics to Graphite failed: %s", flushResult)
a.stats.GraphiteLastError = time.Now()
a.Stats.LastFlushError = time.Now()
} else {
a.stats.GraphiteLastFlush = time.Now()
a.Stats.LastFlush = time.Now()
}
a.Unlock()
}
......
......@@ -84,39 +84,39 @@ func (c *consoleConn) serve() {
"Last message received: %s\n"+
"Last flush to Graphite: %s\n"+
"Last error from Graphite: %s\n",
c.server.Aggregator.stats.BadLines,
c.server.Aggregator.stats.LastMessage,
c.server.Aggregator.stats.GraphiteLastFlush,
c.server.Aggregator.stats.GraphiteLastError)
c.server.Aggregator.Stats.BadLines,
c.server.Aggregator.Stats.LastMessage,
c.server.Aggregator.Stats.LastFlush,
c.server.Aggregator.Stats.LastFlushError)
c.server.Aggregator.Unlock()
case "counters":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.counters)
result = fmt.Sprint(c.server.Aggregator.Counters)
c.server.Aggregator.Unlock()
case "timers":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.timers)
result = fmt.Sprint(c.server.Aggregator.Timers)
c.server.Aggregator.Unlock()
case "gauges":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.gauges)
result = fmt.Sprint(c.server.Aggregator.Gauges)
c.server.Aggregator.Unlock()
case "delcounters":
c.server.Aggregator.Lock()
for _, k := range args {
delete(c.server.Aggregator.counters, k)
delete(c.server.Aggregator.Counters, k)
}
c.server.Aggregator.Unlock()
case "deltimers":
c.server.Aggregator.Lock()
for _, k := range args {
delete(c.server.Aggregator.timers, k)
delete(c.server.Aggregator.Timers, k)
}
c.server.Aggregator.Unlock()
case "delgauges":
c.server.Aggregator.Lock()
for _, k := range args {
delete(c.server.Aggregator.gauges, k)
delete(c.server.Aggregator.Gauges, k)
}
c.server.Aggregator.Unlock()
case "quit":
......
package statsd
import (
"html/template"
"net/http"
)
// WebConsoleServer is an object that listens for HTTP connection on a TCP address Addr
// and provides a web
type WebConsoleServer struct {
Addr string
Aggregator *MetricAggregator
}
const tempText = `
<html>
<head>
<title>gostatsd</title>
<style>
table
{
border-collapse:collapse;
}
table, th, td {
border: 1px solid black;
}
</style>
</head>
<body>
<h1>gostatsd</h1>
<p>Bad lines received: {{.Stats.BadLines}}</p>
<p>Last messsage received: {{.Stats.LastMessage}}</p>
<p>Last flush to graphite: {{.Stats.LastFlush}}</p>
<p>Last error flushing to graphite: {{.Stats.LastFlushError}}</p>
<h2>Counters</h2>
<table>
<tr><th>Bucket</th><th>Value</th></tr>
{{range $bucket, $value := .Counters}}
<tr><td>{{$bucket}}</td><td>{{$value}}</td></tr>
{{end}}
</table>
<h2>Gauges</h2>
<table>
<tr><th>Bucket</th><th>Value</th></tr>
{{range $bucket, $value := .Gauges}}
<tr><td>{{$bucket}}</td><td>{{$value}}</td></tr>
{{end}}
</table>
<h2>Timers</h2>
<table>
<tr><th>Bucket</th><th>Value</th></tr>
{{range $bucket, $value := .Timers}}
<tr><td>{{$bucket}}</td><td>{{$value}}</td></tr>
{{end}}
</table>
</body>
</html>
`
var temp = template.Must(template.New("temp").Parse(tempText))
func (s *WebConsoleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer s.Aggregator.Unlock()
s.Aggregator.Lock()
err := temp.Execute(w, s.Aggregator)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// ListenAndServe listens on the ConsoleServer's TCP network address and then calls Serve
func (s *WebConsoleServer) ListenAndServe() error {
if s.Addr == "" {
s.Addr = DefaultConsoleAddr
}
return http.ListenAndServe(s.Addr, s)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment