Commit be0aab99 authored by Kamil Kisiel's avatar Kamil Kisiel

Made code more idiomatic Go. Split in to modules

parent bc548548
package statsd
import (
"log"
"sort"
"strconv"
"sync"
"time"
)
var percentThresholds []float64
func init() {
percentThresholds = []float64{90.0}
}
type MetricAggregatorStats struct {
BadLines int
LastMessage time.Time
GraphiteLastFlush time.Time
GraphiteLastError time.Time
}
func thresholdStats(vals []float64, threshold float64) (mean, upper float64) {
if count := len(vals); count > 1 {
idx := int(round(((100 - threshold) / 100) * float64(count)))
thresholdCount := count - idx
thresholdValues := vals[:thresholdCount]
mean = average(thresholdValues)
upper = thresholdValues[len(thresholdValues)-1]
} else {
mean = vals[0]
upper = vals[0]
}
return mean, upper
}
type MetricSender interface {
SendMetrics(MetricMap) error
}
type MetricAggregator struct {
sync.Mutex
MetricChan chan Metric // Channel on which metrics are received
FlushInterval time.Duration // How often to flush metrics to the sender
Sender MetricSender
stats MetricAggregatorStats
counters MetricMap
gauges MetricMap
timers MetricListMap
}
func (m *MetricAggregator) flush() (metrics MetricMap) {
metrics = make(MetricMap)
numStats := 0
for k, v := range m.counters {
perSecond := v / m.FlushInterval.Seconds()
metrics["stats."+k] = perSecond
metrics["stats_counts."+k] = v
numStats += 1
}
for k, v := range m.gauges {
metrics["stats.gauges."+k] = v
numStats += 1
}
for k, v := range m.timers {
if count := len(v); count > 0 {
sort.Float64s(v)
min := v[0]
max := v[count-1]
metrics["stats.timers."+k+".lower"] = min
metrics["stats.timers."+k+".upper"] = max
metrics["stats.timers."+k+".count"] = float64(count)
for _, threshold := range percentThresholds {
mean, upper := thresholdStats(v, threshold)
thresholdName := strconv.FormatFloat(threshold, 'f', 1, 64)
metrics["stats.timers."+k+"mean_"+thresholdName] = mean
metrics["stats.timers."+k+"upper_"+thresholdName] = upper
}
numStats += 1
}
}
metrics["statsd.numStats"] = float64(numStats)
return metrics
}
func (a *MetricAggregator) Reset() {
// Reset counters
new_counters := make(MetricMap)
for k := range a.counters {
new_counters[k] = 0
}
a.counters = new_counters
// Reset timers
new_timers := make(MetricListMap)
for k := range a.timers {
new_timers[k] = []float64{}
}
a.timers = new_timers
// Keep values of gauges
new_gauges := make(MetricMap)
for k, v := range a.gauges {
new_gauges[k] = v
}
a.gauges = new_gauges
}
func (a *MetricAggregator) receiveMetric(m Metric) {
defer a.Unlock()
a.Lock()
switch m.Type {
case COUNTER:
v, ok := a.counters[m.Bucket]
if ok {
a.counters[m.Bucket] = v + m.Value
} else {
a.counters[m.Bucket] = m.Value
}
case GAUGE:
a.gauges[m.Bucket] = m.Value
case TIMER:
v, ok := a.timers[m.Bucket]
if ok {
v = append(v, m.Value)
a.timers[m.Bucket] = v
} else {
a.timers[m.Bucket] = []float64{m.Value}
}
case ERROR:
a.stats.BadLines += 1
}
a.stats.LastMessage = time.Now()
}
func (m *MetricAggregator) Aggregate() {
m.counters = make(MetricMap)
m.gauges = make(MetricMap)
m.timers = make(MetricListMap)
flushChan := make(chan error)
flushTimer := time.NewTimer(m.FlushInterval)
for {
select {
case metric := <-m.MetricChan: // Incoming metrics
m.receiveMetric(metric)
case <-flushTimer.C: // Time to flush to graphite
m.Lock()
flushed := m.flush()
go func() {
e := m.Sender.SendMetrics(flushed)
if e != nil {
flushChan <- e
}
}()
m.Reset()
flushTimer = time.NewTimer(m.FlushInterval)
m.Unlock()
case flushResult := <-flushChan:
m.Lock()
if flushResult != nil {
log.Printf("Sending metrics to Graphite failed: %s", flushResult)
m.stats.GraphiteLastError = time.Now()
} else {
m.stats.GraphiteLastFlush = time.Now()
}
m.Unlock()
}
}
}
package statsd
import (
"fmt"
"net"
"strings"
)
const DefaultConsoleAddr = ":8126"
type ConsoleServer struct {
Addr string
Aggregator *MetricAggregator
}
func (s *ConsoleServer) ListenAndServe() error {
addr := s.Addr
if addr == "" {
addr = DefaultConsoleAddr
}
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return s.Serve(l)
}
func (s *ConsoleServer) Serve(l net.Listener) error {
defer l.Close()
for {
c, err := l.Accept()
if err != nil {
return err
}
console := consoleConn{c, s}
go console.serve()
}
panic("not reached")
}
type consoleConn struct {
conn net.Conn
server *ConsoleServer
}
func (c *consoleConn) serve() {
buf := make([]byte, 1024)
for {
nbytes, err := c.conn.Read(buf)
if err != nil {
// Connection has likely closed
return
}
var result string
switch parts := strings.Fields(string(buf[:nbytes])); parts[0] {
case "help":
result = "Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\n"
case "stats":
c.server.Aggregator.Lock()
result = fmt.Sprintf(
"Invalid messages received: %d\n"+
"Last message received: %s\n"+
"Last flush to Graphite: %s\n"+
"Last error from Graphite: %s\n",
c.server.Aggregator.stats.BadLines,
c.server.Aggregator.stats.LastMessage,
c.server.Aggregator.stats.GraphiteLastFlush,
c.server.Aggregator.stats.GraphiteLastError)
c.server.Aggregator.Unlock()
case "counters":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.counters)
c.server.Aggregator.Unlock()
case "timers":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.timers)
c.server.Aggregator.Unlock()
case "gauges":
c.server.Aggregator.Lock()
result = fmt.Sprint(c.server.Aggregator.gauges)
c.server.Aggregator.Unlock()
case "delcounters":
c.server.Aggregator.Lock()
for _, k := range parts[1:] {
delete(c.server.Aggregator.counters, k)
}
c.server.Aggregator.Unlock()
case "deltimers":
c.server.Aggregator.Lock()
for _, k := range parts[1:] {
delete(c.server.Aggregator.timers, k)
}
c.server.Aggregator.Unlock()
case "delgauges":
c.server.Aggregator.Lock()
for _, k := range parts[1:] {
delete(c.server.Aggregator.gauges, k)
}
c.server.Aggregator.Unlock()
case "quit":
result = "quit"
default:
result = fmt.Sprintf("unknown command: %s\n", parts[0])
}
if result == "quit" {
return
} else {
c.conn.Write([]byte(result))
}
}
}
package statsd
import (
"bytes"
"fmt"
"net"
"time"
)
// Normalize a bucket name by replacing or translating invalid characters
func normalizeBucketName(name string) string {
nospaces := regSpaces.ReplaceAllString(name, "_")
noslashes := regSlashes.ReplaceAllString(nospaces, "-")
return regInvalid.ReplaceAllString(noslashes, "")
}
type GraphiteClient struct {
conn *net.Conn
}
func NewGraphiteClient(addr string) (client GraphiteClient, err error) {
conn, err := net.Dial("tcp", addr)
client = GraphiteClient{&conn}
return
}
func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) {
buf := new(bytes.Buffer)
now := time.Now().Unix()
for k, v := range metrics {
nk := normalizeBucketName(k)
fmt.Fprintf(buf, "%s %f %d\n", nk, v, now)
}
_, err = buf.WriteTo(*client.conn)
if err != nil {
return err
}
return nil
}
package statsd
import (
"bytes"
"fmt"
"regexp"
)
// Regular expressions used for bucket name normalization
var (
regSpaces = regexp.MustCompile("\\s+")
regSlashes = regexp.MustCompile("\\/")
regInvalid = regexp.MustCompile("[^a-zA-Z_\\-0-9\\.]")
)
type MetricType float64
// Enumeration, see http://golang.org/doc/effective_go.html#constants
const (
_ = iota
ERROR MetricType = 1 << (10 * iota)
COUNTER
TIMER
GAUGE
)
func (m MetricType) String() string {
switch {
case m >= GAUGE:
return "gauge"
case m >= TIMER:
return "timer"
case m >= COUNTER:
return "counter"
}
return "unknown"
}
type Metric struct {
Type MetricType
Bucket string
Value float64
}
func (m Metric) String() string {
return fmt.Sprintf("{%s, %s, %f}", m.Type, m.Bucket, m.Value)
}
type MetricMap map[string]float64
func (m MetricMap) String() string {
buf := new(bytes.Buffer)
for k, v := range m {
fmt.Fprintf(buf, "%s: %f\n", k, v)
}
return buf.String()
}
type MetricListMap map[string][]float64
func (m MetricListMap) String() string {
buf := new(bytes.Buffer)
for k, v := range m {
buf.Write([]byte(fmt.Sprint(k)))
for _, v2 := range v {
fmt.Fprintf(buf, "\t%f\n", k, v2)
}
}
return buf.String()
}
package statsd
import (
"fmt"
"log"
"net"
"strconv"
"strings"
)
// DefaultMetricsAddr is the default address on which a MetricReceiver will listen
......@@ -54,7 +57,6 @@ func (r *MetricReceiver) Receive(c net.PacketConn) error {
log.Printf("%s", err)
continue
}
log.Printf("%s", msg[:nbytes])
go r.handleMessage(msg[:nbytes])
}
panic("not reached")
......@@ -70,3 +72,66 @@ func (srv *MetricReceiver) handleMessage(msg []byte) {
srv.Handler.HandleMetric(metric)
}
}
func parseMessage(msg string) ([]Metric, error) {
metricList := []Metric{}
segments := strings.Split(strings.TrimSpace(msg), ":")
if len(segments) < 1 {
return metricList, fmt.Errorf("ill-formatted message: %s", msg)
}
bucket := segments[0]
var values []string
if len(segments) == 1 {
values = []string{"1"}
} else {
values = segments[1:]
}
for _, value := range values {
fields := strings.Split(value, "|")
metricValue, err := strconv.ParseFloat(fields[0], 64)
if err != nil {
return metricList, fmt.Errorf("%s: bad metric value \"%s\"", bucket, fields[0])
}
var metricTypeString string
if len(fields) == 1 {
metricTypeString = "c"
} else {
metricTypeString = fields[1]
}
var metricType MetricType
switch metricTypeString {
case "ms":
// Timer
metricType = TIMER
case "g":
// Gauge
metricType = GAUGE
default:
// Counter, allows skipping of |c suffix
metricType = COUNTER
var rate float64
if len(fields) == 3 {
var err error
rate, err = strconv.ParseFloat(fields[2][1:], 64)
if err != nil {
return metricList, fmt.Errorf("%s: bad rate %s", fields[2])
}
} else {
rate = 1
}
metricValue = metricValue / rate
}
metric := Metric{metricType, bucket, metricValue}
metricList = append(metricList, metric)
}
return metricList, nil
}
This diff is collapsed.
package statsd
import (
"math"
)
func round(v float64) float64 {
return math.Floor(v + 0.5)
}
func average(vals []float64) float64 {
sum := 0.0
for _, v := range vals {
sum += v
}
return sum / float64(len(vals))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment