diff --git a/statsd/aggregator.go b/statsd/aggregator.go new file mode 100644 index 0000000000000000000000000000000000000000..2cbf6101699a6adf37d04b0c2b1aad5db4dc8a3e --- /dev/null +++ b/statsd/aggregator.go @@ -0,0 +1,182 @@ +package statsd + +import ( + "log" + "sort" + "strconv" + "sync" + "time" +) + +var percentThresholds []float64 + +func init() { + percentThresholds = []float64{90.0} +} + +type MetricAggregatorStats struct { + BadLines int + LastMessage time.Time + GraphiteLastFlush time.Time + GraphiteLastError time.Time +} + +func thresholdStats(vals []float64, threshold float64) (mean, upper float64) { + if count := len(vals); count > 1 { + idx := int(round(((100 - threshold) / 100) * float64(count))) + thresholdCount := count - idx + thresholdValues := vals[:thresholdCount] + + mean = average(thresholdValues) + upper = thresholdValues[len(thresholdValues)-1] + } else { + mean = vals[0] + upper = vals[0] + } + return mean, upper +} + +type MetricSender interface { + SendMetrics(MetricMap) error +} + +type MetricAggregator struct { + sync.Mutex + MetricChan chan Metric // Channel on which metrics are received + FlushInterval time.Duration // How often to flush metrics to the sender + Sender MetricSender + stats MetricAggregatorStats + counters MetricMap + gauges MetricMap + timers MetricListMap +} + +func (m *MetricAggregator) flush() (metrics MetricMap) { + metrics = make(MetricMap) + numStats := 0 + + for k, v := range m.counters { + perSecond := v / m.FlushInterval.Seconds() + metrics["stats."+k] = perSecond + metrics["stats_counts."+k] = v + numStats += 1 + } + + for k, v := range m.gauges { + metrics["stats.gauges."+k] = v + numStats += 1 + } + + for k, v := range m.timers { + if count := len(v); count > 0 { + sort.Float64s(v) + min := v[0] + max := v[count-1] + + metrics["stats.timers."+k+".lower"] = min + metrics["stats.timers."+k+".upper"] = max + metrics["stats.timers."+k+".count"] = float64(count) + + for _, threshold := range percentThresholds { + mean, upper := thresholdStats(v, threshold) + thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64) + metrics["stats.timers."+k+"mean_"+thresholdName] = mean + metrics["stats.timers."+k+"upper_"+thresholdName] = upper + } + numStats += 1 + } + } + metrics["statsd.numStats"] = float64(numStats) + return metrics +} + +func (a *MetricAggregator) Reset() { + // Reset counters + new_counters := make(MetricMap) + for k := range a.counters { + new_counters[k] = 0 + } + a.counters = new_counters + + // Reset timers + new_timers := make(MetricListMap) + for k := range a.timers { + new_timers[k] = []float64{} + } + a.timers = new_timers + + // Keep values of gauges + new_gauges := make(MetricMap) + for k, v := range a.gauges { + new_gauges[k] = v + } + a.gauges = new_gauges +} + +func (a *MetricAggregator) receiveMetric(m Metric) { + defer a.Unlock() + a.Lock() + + switch m.Type { + case COUNTER: + v, ok := a.counters[m.Bucket] + if ok { + a.counters[m.Bucket] = v + m.Value + } else { + a.counters[m.Bucket] = m.Value + } + case GAUGE: + a.gauges[m.Bucket] = m.Value + case TIMER: + v, ok := a.timers[m.Bucket] + if ok { + v = append(v, m.Value) + a.timers[m.Bucket] = v + } else { + a.timers[m.Bucket] = []float64{m.Value} + } + case ERROR: + a.stats.BadLines += 1 + } + a.stats.LastMessage = time.Now() +} + +func (m *MetricAggregator) Aggregate() { + m.counters = make(MetricMap) + m.gauges = make(MetricMap) + m.timers = make(MetricListMap) + flushChan := make(chan error) + flushTimer := time.NewTimer(m.FlushInterval) + + for { + select { + case metric := <-m.MetricChan: // Incoming metrics + m.receiveMetric(metric) + case <-flushTimer.C: // Time to flush to graphite + m.Lock() + + flushed := m.flush() + go func() { + e := m.Sender.SendMetrics(flushed) + if e != nil { + flushChan <- e + } + }() + m.Reset() + flushTimer = time.NewTimer(m.FlushInterval) + + m.Unlock() + case flushResult := <-flushChan: + m.Lock() + + if flushResult != nil { + log.Printf("Sending metrics to Graphite failed: %s", flushResult) + m.stats.GraphiteLastError = time.Now() + } else { + m.stats.GraphiteLastFlush = time.Now() + } + m.Unlock() + } + } + +} diff --git a/statsd/console.go b/statsd/console.go new file mode 100644 index 0000000000000000000000000000000000000000..492c859b063bb4623b8ff8a67796873521c22514 --- /dev/null +++ b/statsd/console.go @@ -0,0 +1,114 @@ +package statsd + +import ( + "fmt" + "net" + "strings" +) + +const DefaultConsoleAddr = ":8126" + +type ConsoleServer struct { + Addr string + Aggregator *MetricAggregator +} + +func (s *ConsoleServer) ListenAndServe() error { + addr := s.Addr + if addr == "" { + addr = DefaultConsoleAddr + } + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + return s.Serve(l) +} + +func (s *ConsoleServer) Serve(l net.Listener) error { + defer l.Close() + for { + c, err := l.Accept() + if err != nil { + return err + } + console := consoleConn{c, s} + go console.serve() + } + panic("not reached") +} + +type consoleConn struct { + conn net.Conn + server *ConsoleServer +} + +func (c *consoleConn) serve() { + buf := make([]byte, 1024) + + for { + nbytes, err := c.conn.Read(buf) + if err != nil { + // Connection has likely closed + return + } + + var result string + switch parts := strings.Fields(string(buf[:nbytes])); parts[0] { + case "help": + result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n" + case "stats": + c.server.Aggregator.Lock() + result = fmt.Sprintf( + "Invalid messages received: %d\n"+ + "Last message received: %s\n"+ + "Last flush to Graphite: %s\n"+ + "Last error from Graphite: %s\n", + c.server.Aggregator.stats.BadLines, + c.server.Aggregator.stats.LastMessage, + c.server.Aggregator.stats.GraphiteLastFlush, + c.server.Aggregator.stats.GraphiteLastError) + c.server.Aggregator.Unlock() + case "counters": + c.server.Aggregator.Lock() + result = fmt.Sprint(c.server.Aggregator.counters) + c.server.Aggregator.Unlock() + case "timers": + c.server.Aggregator.Lock() + result = fmt.Sprint(c.server.Aggregator.timers) + c.server.Aggregator.Unlock() + case "gauges": + c.server.Aggregator.Lock() + result = fmt.Sprint(c.server.Aggregator.gauges) + c.server.Aggregator.Unlock() + case "delcounters": + c.server.Aggregator.Lock() + for _, k := range parts[1:] { + delete(c.server.Aggregator.counters, k) + } + c.server.Aggregator.Unlock() + case "deltimers": + c.server.Aggregator.Lock() + for _, k := range parts[1:] { + delete(c.server.Aggregator.timers, k) + } + c.server.Aggregator.Unlock() + case "delgauges": + c.server.Aggregator.Lock() + for _, k := range parts[1:] { + delete(c.server.Aggregator.gauges, k) + } + c.server.Aggregator.Unlock() + case "quit": + result = "quit" + default: + result = fmt.Sprintf("unknown command: %s\n", parts[0]) + } + + if result == "quit" { + return + } else { + c.conn.Write([]byte(result)) + } + } +} diff --git a/statsd/graphite.go b/statsd/graphite.go new file mode 100644 index 0000000000000000000000000000000000000000..0182f85cf8b5b7ce9ffc290a0ad93e6a69f20f47 --- /dev/null +++ b/statsd/graphite.go @@ -0,0 +1,39 @@ +package statsd + +import ( + "bytes" + "fmt" + "net" + "time" +) + +// Normalize a bucket name by replacing or translating invalid characters +func normalizeBucketName(name string) string { + nospaces := regSpaces.ReplaceAllString(name, "_") + noslashes := regSlashes.ReplaceAllString(nospaces, "-") + return regInvalid.ReplaceAllString(noslashes, "") +} + +type GraphiteClient struct { + conn *net.Conn +} + +func NewGraphiteClient(addr string) (client GraphiteClient, err error) { + conn, err := net.Dial("tcp", addr) + client = GraphiteClient{&conn} + return +} + +func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) { + buf := new(bytes.Buffer) + now := time.Now().Unix() + for k, v := range metrics { + nk := normalizeBucketName(k) + fmt.Fprintf(buf, "%s %f %d\n", nk, v, now) + } + _, err = buf.WriteTo(*client.conn) + if err != nil { + return err + } + return nil +} diff --git a/statsd/metrics.go b/statsd/metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..9dafcf91db2a44e52c538db3d4be829354ae6c98 --- /dev/null +++ b/statsd/metrics.go @@ -0,0 +1,70 @@ +package statsd + +import ( + "bytes" + "fmt" + "regexp" +) + +// Regular expressions used for bucket name normalization +var ( + regSpaces = regexp.MustCompile("\\s+") + regSlashes = regexp.MustCompile("\\/") + regInvalid = regexp.MustCompile("[^a-zA-Z_\\-0-9\\.]") +) + +type MetricType float64 + +// Enumeration, see http://golang.org/doc/effective_go.html#constants +const ( + _ = iota + ERROR MetricType = 1 << (10 * iota) + COUNTER + TIMER + GAUGE +) + +func (m MetricType) String() string { + switch { + case m >= GAUGE: + return "gauge" + case m >= TIMER: + return "timer" + case m >= COUNTER: + return "counter" + } + return "unknown" +} + +type Metric struct { + Type MetricType + Bucket string + Value float64 +} + +func (m Metric) String() string { + return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value) +} + +type MetricMap map[string]float64 + +func (m MetricMap) String() string { + buf := new(bytes.Buffer) + for k, v := range m { + fmt.Fprintf(buf, "%s: %f\n", k, v) + } + return buf.String() +} + +type MetricListMap map[string][]float64 + +func (m MetricListMap) String() string { + buf := new(bytes.Buffer) + for k, v := range m { + buf.Write([]byte(fmt.Sprint(k))) + for _, v2 := range v { + fmt.Fprintf(buf, "\t%f\n", k, v2) + } + } + return buf.String() +} diff --git a/statsd/receiver.go b/statsd/receiver.go index c06afde99ee37ea2c0c92cddbe980b659954f2bd..971a59b5fa1d71555c94b9933040cb26e3f68552 100644 --- a/statsd/receiver.go +++ b/statsd/receiver.go @@ -1,8 +1,11 @@ package statsd import ( + "fmt" "log" "net" + "strconv" + "strings" ) // DefaultMetricsAddr is the default address on which a MetricReceiver will listen @@ -54,7 +57,6 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error { log.Printf("%s", err) continue } - log.Printf("%s", msg[:nbytes]) go r.handleMessage(msg[:nbytes]) } panic("not reached") @@ -70,3 +72,66 @@ func (srv *MetricReceiver) handleMessage(msg []byte) { srv.Handler.HandleMetric(metric) } } + +func parseMessage(msg string) ([]Metric, error) { + metricList := []Metric{} + + segments := strings.Split(strings.TrimSpace(msg), ":") + if len(segments) < 1 { + return metricList, fmt.Errorf("ill-formatted message: %s", msg) + } + + bucket := segments[0] + var values []string + if len(segments) == 1 { + values = []string{"1"} + } else { + values = segments[1:] + } + + for _, value := range values { + fields := strings.Split(value, "|") + + metricValue, err := strconv.ParseFloat(fields[0], 64) + if err != nil { + return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0]) + } + + var metricTypeString string + if len(fields) == 1 { + metricTypeString = "c" + } else { + metricTypeString = fields[1] + } + + var metricType MetricType + switch metricTypeString { + case "ms": + // Timer + metricType = TIMER + case "g": + // Gauge + metricType = GAUGE + default: + // Counter, allows skipping of |c suffix + metricType = COUNTER + + var rate float64 + if len(fields) == 3 { + var err error + rate, err = strconv.ParseFloat(fields[2][1:], 64) + if err != nil { + return metricList, fmt.Errorf("%s: bad rate %s", fields[2]) + } + } else { + rate = 1 + } + metricValue = metricValue / rate + } + + metric := Metric{metricType, bucket, metricValue} + metricList = append(metricList, metric) + } + + return metricList, nil +} diff --git a/statsd/server.go b/statsd/server.go index 18aba95e8511eb1e0d3cfd152eb04941b5381964..d83d208028fe8c779ca2fc771e8d6ce96f8cbb4e 100644 --- a/statsd/server.go +++ b/statsd/server.go @@ -1,429 +1,29 @@ package statsd import ( - "bytes" - "fmt" - "log" - "math" - "net" - "regexp" - "sort" - "strconv" - "strings" "time" ) -// Regular expressions used for bucket name normalization -var ( - regSpaces = regexp.MustCompile("\\s+") - regSlashes = regexp.MustCompile("\\/") - regInvalid = regexp.MustCompile("[^a-zA-Z_\\-0-9\\.]") -) - -var percentThresholds []float64 - -func init() { - percentThresholds = []float64{90.0} -} - -type MetricType float64 - -// Enumeration, see http://golang.org/doc/effective_go.html#constants -const ( - _ = iota - ERROR MetricType = 1 << (10 * iota) - COUNTER - TIMER - GAUGE -) - -func (m MetricType) String() string { - switch { - case m >= GAUGE: - return "gauge" - case m >= TIMER: - return "timer" - case m >= COUNTER: - return "counter" - } - return "unknown" -} - -type Metric struct { - Type MetricType - Bucket string - Value float64 -} - -type MetricAggregatorStats struct { - BadLines int - LastMessage time.Time - GraphiteLastFlush time.Time - GraphiteLastError time.Time -} - -func (m Metric) String() string { - return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value) -} - -type MetricMap map[string]float64 -type MetricListMap map[string][]float64 - -type GraphiteClient struct { - conn *net.Conn -} - -func NewGraphiteClient(addr string) (client GraphiteClient, err error) { - conn, err := net.Dial("tcp", addr) - client = GraphiteClient{&conn} - return -} - -func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) { - buf := new(bytes.Buffer) - now := time.Now().Unix() - for k, v := range metrics { - fmt.Fprintf(buf, "%s %f %d\n", k, v, now) - } - _, err = buf.WriteTo(*client.conn) - if err != nil { - return err - } - return nil -} - -func (m MetricListMap) String() string { - buf := new(bytes.Buffer) - for k, v := range m { - buf.Write([]byte(fmt.Sprint(k))) - for _, v2 := range v { - fmt.Fprintf(buf, "\t%f\n", k, v2) - } - } - return buf.String() -} - -func (m MetricMap) String() string { - buf := new(bytes.Buffer) - for k, v := range m { - fmt.Fprintf(buf, "%s: %f\n", k, v) - } - return buf.String() -} - -type ConsoleRequest struct { - Command string - ResultChan chan string -} - -type ConsoleSession struct { - RequestChan chan string - ResultChan chan string -} - -func round(v float64) float64 { - return math.Floor(v + 0.5) -} - -func average(vals []float64) float64 { - sum := 0.0 - for _, v := range vals { - sum += v - } - return sum / float64(len(vals)) -} - -func thresholdStats(vals []float64, threshold float64) (mean, upper float64) { - if count := len(vals); count > 1 { - idx := int(round(((100 - threshold) / 100) * float64(count))) - thresholdCount := count - idx - thresholdValues := vals[:thresholdCount] - - mean = average(thresholdValues) - upper = thresholdValues[len(thresholdValues)-1] - } else { - mean = vals[0] - upper = vals[0] - } - return mean, upper -} - -func aggregateMetrics(counters MetricMap, gauges MetricMap, timers MetricListMap, flushInterval time.Duration) (metrics MetricMap) { - metrics = make(MetricMap) - numStats := 0 - - for k, v := range counters { - perSecond := v / flushInterval.Seconds() - metrics["stats."+k] = perSecond - metrics["stats_counts."+k] = v - numStats += 1 - } - - for k, v := range gauges { - metrics["stats.gauges."+k] = v - numStats += 1 - } - - for k, v := range timers { - if count := len(v); count > 0 { - sort.Float64s(v) - min := v[0] - max := v[count-1] - - metrics["stats.timers."+k+".lower"] = min - metrics["stats.timers."+k+".upper"] = max - metrics["stats.timers."+k+".count"] = float64(count) - - for _, threshold := range percentThresholds { - mean, upper := thresholdStats(v, threshold) - thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64) - metrics["stats.timers."+k+"mean_"+thresholdName] = mean - metrics["stats.timers."+k+"upper_"+thresholdName] = upper - } - numStats += 1 - } - } - metrics["statsd.numStats"] = float64(numStats) - return metrics -} - -func metricAggregator(graphiteAddr string, metricChan chan Metric, consoleChan chan ConsoleRequest, flushInterval time.Duration) (err error) { +func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string, flushInterval time.Duration) error { graphite, err := NewGraphiteClient(graphiteAddr) if err != nil { - return - } - stats := new(MetricAggregatorStats) - counters := make(MetricMap) - gauges := make(MetricMap) - timers := make(MetricListMap) - - flushTimer := time.NewTimer(flushInterval) - flushChan := make(chan error) - - log.Printf("Started aggregator") - - for { - select { - case metric := <-metricChan: // Incoming metrics - switch metric.Type { - case COUNTER: - v, ok := counters[metric.Bucket] - if ok { - counters[metric.Bucket] = v + metric.Value - } else { - counters[metric.Bucket] = metric.Value - } - case GAUGE: - gauges[metric.Bucket] = metric.Value - case TIMER: - v, ok := timers[metric.Bucket] - if ok { - v = append(v, metric.Value) - timers[metric.Bucket] = v - } else { - timers[metric.Bucket] = []float64{metric.Value} - } - case ERROR: - stats.BadLines += 1 - } - stats.LastMessage = time.Now() - case <-flushTimer.C: // Time to flush to graphite - go func() { - flushChan <- graphite.SendMetrics(aggregateMetrics(counters, gauges, timers, flushInterval)) - }() - - // Reset counters - new_counters := make(MetricMap) - for k := range counters { - new_counters[k] = 0 - } - counters = new_counters - - // Reset timers - new_timers := make(MetricListMap) - for k := range timers { - new_timers[k] = []float64{} - } - timers = new_timers - - // Keep values of gauges - new_gauges := make(MetricMap) - for k, v := range gauges { - new_gauges[k] = v - } - gauges = new_gauges - - flushTimer = time.NewTimer(flushInterval) - case flushResult := <-flushChan: - if flushResult != nil { - log.Printf("Sending metrics to Graphite failed: %s", flushResult) - stats.GraphiteLastError = time.Now() - } else { - stats.GraphiteLastFlush = time.Now() - } - case consoleRequest := <-consoleChan: - var result string - switch parts := strings.Split(strings.TrimSpace(consoleRequest.Command), " "); parts[0] { - case "help": - result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n" - case "stats": - result = fmt.Sprintf( - "Invalid messages received: %d\n"+ - "Last message received: %s\n"+ - "Last flush to Graphite: %s\n"+ - "Last error from Graphite: %s\n", - stats.BadLines, stats.LastMessage, stats.GraphiteLastFlush, stats.GraphiteLastError) - case "counters": - result = fmt.Sprint(counters) - case "timers": - result = fmt.Sprint(timers) - case "gauges": - result = fmt.Sprint(gauges) - case "delcounters": - for _, k := range parts[1:] { - delete(counters, k) - } - case "deltimers": - for _, k := range parts[1:] { - delete(timers, k) - } - case "delgauges": - for _, k := range parts[1:] { - delete(gauges, k) - } - case "quit": - result = "quit" - default: - result = fmt.Sprintf("unknown command: %s\n", parts[0]) - } - consoleRequest.ResultChan <- result - } - } - - return -} - -// Normalize a bucket name by replacing or translating invalid characters -func normalizeBucketName(name string) string { - nospaces := regSpaces.ReplaceAllString(name, "_") - noslashes := regSlashes.ReplaceAllString(nospaces, "-") - return regInvalid.ReplaceAllString(noslashes, "") -} - -func parseMessage(msg string) ([]Metric, error) { - metricList := []Metric{} - - segments := strings.Split(strings.TrimSpace(msg), ":") - if len(segments) < 1 { - return metricList, fmt.Errorf("ill-formatted message: %s", msg) - } - - bucket := normalizeBucketName(segments[0]) - var values []string - if len(segments) == 1 { - values = []string{"1"} - } else { - values = segments[1:] - } - - for _, value := range values { - fields := strings.Split(value, "|") - - metricValue, err := strconv.ParseFloat(fields[0], 64) - if err != nil { - return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0]) - } - - var metricTypeString string - if len(fields) == 1 { - metricTypeString = "c" - } else { - metricTypeString = fields[1] - } - - var metricType MetricType - switch metricTypeString { - case "ms": - // Timer - metricType = TIMER - case "g": - // Gauge - metricType = GAUGE - default: - // Counter, allows skipping of |c suffix - metricType = COUNTER - - var rate float64 - if len(fields) == 3 { - var err error - rate, err = strconv.ParseFloat(fields[2][1:], 64) - if err != nil { - return metricList, fmt.Errorf("%s: bad rate %s", fields[2]) - } - } else { - rate = 1 - } - metricValue = metricValue / rate - } - - metric := Metric{metricType, bucket, metricValue} - metricList = append(metricList, metric) - } - - return metricList, nil -} - -func consoleClient(conn net.Conn, consoleChan chan ConsoleRequest) { - defer conn.Close() - - command := make([]byte, 1024) - resultChan := make(chan string) - - for { - nbytes, err := conn.Read(command) - if err != nil { - // Connection has likely closed - return - } - consoleChan <- ConsoleRequest{string(command[:nbytes]), resultChan} - result := <-resultChan - if result == "quit" { - return - } - conn.Write([]byte(result)) - } -} - -func consoleServer(addr string, consoleChan chan ConsoleRequest) { - ln, err := net.Listen("tcp", addr) - if err != nil { - log.Fatal(err) - return - } - - for { - conn, err := ln.Accept() - if err != nil { - log.Printf("%s", err) - continue - } - go consoleClient(conn, consoleChan) + return err } -} -func ListenAndServe(metricAddr string, consoleAddr string, graphiteAddr string, flushInterval time.Duration) error { - var metricChan = make(chan Metric) - var consoleChan = make(chan ConsoleRequest) + aggregator := MetricAggregator{} + aggregator.FlushInterval = flushInterval + aggregator.Sender = &graphite f := func(metric Metric) { - metricChan <- metric + aggregator.MetricChan <- metric } - s := MetricReceiver{metricAddr, HandlerFunc(f)} + receiver := MetricReceiver{metricAddr, HandlerFunc(f)} + + console := ConsoleServer{consoleAddr, &aggregator} - go s.ListenAndReceive() - go metricAggregator(graphiteAddr, metricChan, consoleChan, flushInterval) - go consoleServer(consoleAddr, consoleChan) - // Run forever + go aggregator.Aggregate() + go receiver.ListenAndReceive() + go console.ListenAndServe() select {} return nil } diff --git a/statsd/util.go b/statsd/util.go new file mode 100644 index 0000000000000000000000000000000000000000..7c2d31277f29adfcfcb9c81b1a46c7490ef6622b --- /dev/null +++ b/statsd/util.go @@ -0,0 +1,17 @@ +package statsd + +import ( + "math" +) + +func round(v float64) float64 { + return math.Floor(v + 0.5) +} + +func average(vals []float64) float64 { + sum := 0.0 + for _, v := range vals { + sum += v + } + return sum / float64(len(vals)) +}