Commit 899d1553 authored by Kamil Kisiel's avatar Kamil Kisiel

More documentation

parent 33570415
...@@ -14,43 +14,36 @@ func init() { ...@@ -14,43 +14,36 @@ func init() {
percentThresholds = []float64{90.0} percentThresholds = []float64{90.0}
} }
type MetricAggregatorStats struct { // metricAggregatorStats is a bookkeeping structure for statistics about a MetricAggregator
type metricAggregatorStats struct {
BadLines int BadLines int
LastMessage time.Time LastMessage time.Time
GraphiteLastFlush time.Time GraphiteLastFlush time.Time
GraphiteLastError time.Time GraphiteLastError time.Time
} }
func thresholdStats(vals []float64, threshold float64) (mean, upper float64) { // MetricSender is an interface that can be implemented by objects which
if count := len(vals); count > 1 { // could be connected to a MetricAggregator
idx := int(round(((100 - threshold) / 100) * float64(count)))
thresholdCount := count - idx
thresholdValues := vals[:thresholdCount]
mean = average(thresholdValues)
upper = thresholdValues[len(thresholdValues)-1]
} else {
mean = vals[0]
upper = vals[0]
}
return mean, upper
}
type MetricSender interface { type MetricSender interface {
SendMetrics(MetricMap) error SendMetrics(MetricMap) error
} }
// MetricAggregator is an object that aggregates statsd metrics.
// The function NewMetricAggregator should be used to create the objects.
//
// Incoming metrics should be sent to the MetricChan channel.
type MetricAggregator struct { type MetricAggregator struct {
sync.Mutex sync.Mutex
MetricChan chan Metric // Channel on which metrics are received MetricChan chan Metric // Channel on which metrics are received
FlushInterval time.Duration // How often to flush metrics to the sender FlushInterval time.Duration // How often to flush metrics to the sender
Sender MetricSender Sender MetricSender // The sender to which metrics are flushed
stats MetricAggregatorStats stats metricAggregatorStats
counters MetricMap counters MetricMap
gauges MetricMap gauges MetricMap
timers MetricListMap timers MetricListMap
} }
// NewMetricAggregator creates a new MetricAggregator object
func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a MetricAggregator) { func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a MetricAggregator) {
a = MetricAggregator{} a = MetricAggregator{}
a.FlushInterval = flushInterval a.FlushInterval = flushInterval
...@@ -59,6 +52,7 @@ func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a Me ...@@ -59,6 +52,7 @@ func NewMetricAggregator(sender MetricSender, flushInterval time.Duration) (a Me
return return
} }
// flush prepares the contents of a MetricAggregator for sending via the Sender
func (m *MetricAggregator) flush() (metrics MetricMap) { func (m *MetricAggregator) flush() (metrics MetricMap) {
metrics = make(MetricMap) metrics = make(MetricMap)
numStats := 0 numStats := 0
...@@ -98,6 +92,7 @@ func (m *MetricAggregator) flush() (metrics MetricMap) { ...@@ -98,6 +92,7 @@ func (m *MetricAggregator) flush() (metrics MetricMap) {
return metrics return metrics
} }
// Reset clears the contents of a MetricAggregator
func (a *MetricAggregator) Reset() { func (a *MetricAggregator) Reset() {
// Reset counters // Reset counters
new_counters := make(MetricMap) new_counters := make(MetricMap)
...@@ -121,6 +116,7 @@ func (a *MetricAggregator) Reset() { ...@@ -121,6 +116,7 @@ func (a *MetricAggregator) Reset() {
a.gauges = new_gauges a.gauges = new_gauges
} }
// receiveMetric is called for each incoming metric on MetricChan
func (a *MetricAggregator) receiveMetric(m Metric) { func (a *MetricAggregator) receiveMetric(m Metric) {
defer a.Unlock() defer a.Unlock()
a.Lock() a.Lock()
...@@ -149,6 +145,8 @@ func (a *MetricAggregator) receiveMetric(m Metric) { ...@@ -149,6 +145,8 @@ func (a *MetricAggregator) receiveMetric(m Metric) {
a.stats.LastMessage = time.Now() a.stats.LastMessage = time.Now()
} }
// Aggregate starts the MetricAggregator so it begins consuming metrics from MetricChan
// and flushing them periodically via its Sender
func (m *MetricAggregator) Aggregate() { func (m *MetricAggregator) Aggregate() {
m.counters = make(MetricMap) m.counters = make(MetricMap)
m.gauges = make(MetricMap) m.gauges = make(MetricMap)
......
...@@ -7,23 +7,19 @@ import ( ...@@ -7,23 +7,19 @@ import (
"time" "time"
) )
// Normalize a bucket name by replacing or translating invalid characters // normalizeBucketName cleans up a bucket name by replacing or translating invalid characters
func normalizeBucketName(name string) string { func normalizeBucketName(name string) string {
nospaces := regSpaces.ReplaceAllString(name, "_") nospaces := regSpaces.ReplaceAllString(name, "_")
noslashes := regSlashes.ReplaceAllString(nospaces, "-") noslashes := regSlashes.ReplaceAllString(nospaces, "-")
return regInvalid.ReplaceAllString(noslashes, "") return regInvalid.ReplaceAllString(noslashes, "")
} }
// GraphiteClient is an object that is used to send messages to a Graphite server's UDP interface
type GraphiteClient struct { type GraphiteClient struct {
conn *net.Conn conn *net.Conn
} }
func NewGraphiteClient(addr string) (client GraphiteClient, err error) { // SendMetrics sends the metrics in a MetricsMap to the Graphite server
conn, err := net.Dial("tcp", addr)
client = GraphiteClient{&conn}
return
}
func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) { func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
now := time.Now().Unix() now := time.Now().Unix()
...@@ -37,3 +33,10 @@ func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) { ...@@ -37,3 +33,10 @@ func (client *GraphiteClient) SendMetrics(metrics MetricMap) (err error) {
} }
return nil return nil
} }
// NewGraphiteClient constructs a GraphiteClient object by connecting to an address
func NewGraphiteClient(addr string) (client GraphiteClient, err error) {
conn, err := net.Dial("tcp", addr)
client = GraphiteClient{&conn}
return
}
...@@ -73,6 +73,7 @@ func (srv *MetricReceiver) handleMessage(msg []byte) { ...@@ -73,6 +73,7 @@ func (srv *MetricReceiver) handleMessage(msg []byte) {
} }
} }
// parseMessage parses a message string string in to a list of metrics
func parseMessage(msg string) ([]Metric, error) { func parseMessage(msg string) ([]Metric, error) {
metricList := []Metric{} metricList := []Metric{}
......
...@@ -4,10 +4,12 @@ import ( ...@@ -4,10 +4,12 @@ import (
"math" "math"
) )
// round rounds a number to its nearest integer value
func round(v float64) float64 { func round(v float64) float64 {
return math.Floor(v + 0.5) return math.Floor(v + 0.5)
} }
// average computes the average (mean) of a list of numbers
func average(vals []float64) float64 { func average(vals []float64) float64 {
sum := 0.0 sum := 0.0
for _, v := range vals { for _, v := range vals {
...@@ -15,3 +17,19 @@ func average(vals []float64) float64 { ...@@ -15,3 +17,19 @@ func average(vals []float64) float64 {
} }
return sum / float64(len(vals)) return sum / float64(len(vals))
} }
// thresholdStats calculates the mean and upper values of a list of values after a applying a minimum threshold
func thresholdStats(vals []float64, threshold float64) (mean, upper float64) {
if count := len(vals); count > 1 {
idx := int(round(((100 - threshold) / 100) * float64(count)))
thresholdCount := count - idx
thresholdValues := vals[:thresholdCount]
mean = average(thresholdValues)
upper = thresholdValues[len(thresholdValues)-1]
} else {
mean = vals[0]
upper = vals[0]
}
return mean, upper
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment