Skip to content
Snippets Groups Projects
Commit 0b0d24d2 authored by ale's avatar ale
Browse files

add some statsd instrumentation

parent 4f7e0d28
Branches
No related tags found
No related merge requests found
package fe
import (
"expvar"
"fmt"
"log"
"math/rand"
......@@ -10,6 +9,7 @@ import (
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"github.com/miekg/dns"
)
......@@ -25,13 +25,14 @@ var (
"stream",
"etcd",
}
dnsQueryStats = instrumentation.NewCounter("dns.status")
dnsTargetStats = instrumentation.NewCounter("dns.target")
)
// DNS server.
type DnsRedirector struct {
client *autoradio.RadioAPI
queryStats *expvar.Map
targetStats *expvar.Map
origin string
originNumParts int
publicIp string
......@@ -63,20 +64,14 @@ func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl i
Minttl: uint32(ttl),
}
d := &DnsRedirector{
return &DnsRedirector{
client: client,
origin: origin,
originNumParts: len(dns.SplitDomainName(origin)),
publicIp: publicIp,
ttl: ttl,
soa: soa,
queryStats: new(expvar.Map).Init(),
targetStats: new(expvar.Map).Init(),
}
statsMap := expvar.NewMap("dns")
statsMap.Set("queries", d.queryStats)
statsMap.Set("targets", d.targetStats)
return d
}
// Randomly shuffle a list of strings.
......@@ -188,7 +183,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
for _, ip := range ips {
rec := d.recordForIp(query, ip)
m.Answer = append(m.Answer, rec)
d.targetStats.Add(ip, 1)
dnsTargetStats.IncrVar(ipToMetric(ip))
}
responseMsg = fmt.Sprintf("%v", ips)
......@@ -199,9 +194,9 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
}
if responseMsg == "NXDOMAIN" {
d.queryStats.Add("errors", 1)
dnsQueryStats.IncrVar("error")
} else {
d.queryStats.Add("ok", 1)
dnsQueryStats.IncrVar("ok")
}
log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg)
......@@ -225,3 +220,7 @@ func (d *DnsRedirector) Run(addr string) {
}(proto)
}
}
func ipToMetric(ip string) string {
return strings.Replace(ip, ".", "_", -1)
}
......@@ -2,7 +2,6 @@ package fe
import (
"bytes"
"expvar"
"fmt"
"html/template"
"io"
......@@ -18,9 +17,18 @@ import (
_ "net/http/pprof"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"github.com/PuerkitoBio/ghost/handlers"
)
var (
httpStatusCodes = instrumentation.NewCounter("http.status")
httpTargetStats = instrumentation.NewCounter("http.target")
sourceConnections = instrumentation.NewCounter("http.source_connections")
source404Errors = instrumentation.NewCounter("http.source_404")
sourceErrors = instrumentation.NewCounter("http.source_errors")
)
type statsResponseWriter struct {
http.ResponseWriter
code int
......@@ -31,37 +39,27 @@ func (w *statsResponseWriter) WriteHeader(code int) {
w.ResponseWriter.WriteHeader(code)
}
func statsHandler(h http.Handler, stats *expvar.Map) http.HandlerFunc {
// Add a sub-map to hold aggregate HTTP status stats.
statusMap := new(expvar.Map).Init()
stats.Set("status", statusMap)
func statsHandler(h http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
wrapw := &statsResponseWriter{w, 200}
h.ServeHTTP(wrapw, r)
statusMap.Add(strconv.Itoa(wrapw.code), 1)
httpStatusCodes.IncrVar(strconv.Itoa(wrapw.code))
}
}
// HTTP redirector.
type HttpRedirector struct {
domain string
lb LoadBalancingPolicy
client *autoradio.RadioAPI
template *template.Template
stats *expvar.Map
targetStats *expvar.Map
domain string
lb LoadBalancingPolicy
client *autoradio.RadioAPI
template *template.Template
}
func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector {
targetStats := new(expvar.Map).Init()
stats := expvar.NewMap("http")
stats.Set("targets", targetStats)
return &HttpRedirector{
client: client,
domain: domain,
lb: getNamedLoadBalancingPolicy(lbpolicy),
stats: stats,
targetStats: targetStats,
client: client,
domain: domain,
lb: getNamedLoadBalancingPolicy(lbpolicy),
}
}
......@@ -118,7 +116,7 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
http.Error(w, "No active nodes", http.StatusServiceUnavailable)
return
}
h.targetStats.Add(relayAddr, 1)
httpTargetStats.IncrVar(relayAddr)
// Create the m3u response.
m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name)
......@@ -136,20 +134,20 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
log.Printf("source: error retrieving mount for %+v: %s", r, err)
} else {
log.Printf("source: connection to relay stream %s", m.Name)
}
}
http.Error(w, "Not Found", http.StatusNotFound)
h.stats.Add("source_404", 1)
source404Errors.Incr()
return
}
h.stats.Add("source_connections", 1)
sourceConnections.Incr()
// Handy function to report a source error (the ones we care
// the most about because they measure failures in the
// coordination infrastructure).
sendErr := func(err error) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
h.stats.Add("source_errors", 1)
sourceErrors.Incr()
}
// Find the current master node.
......@@ -271,7 +269,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
wraph := handlers.GZIPHandler(mux, nil)
logopts := handlers.NewLogOptions(nil, handlers.Lshort)
wraph = handlers.LogHandler(wraph, logopts)
wraph = statsHandler(wraph, h.stats)
wraph = statsHandler(wraph)
// Serve SOURCE requests bypassing the logging and gzip
// handlers: since they wrap the ResponseWriter, we would be
......
package instrumentation
import (
"flag"
"log"
"sync"
"github.com/cactus/go-statsd-client/statsd"
)
var (
DefaultRate float32 = 1
statsdServer = flag.String("statsd-server", "localhost:8125", "statsd server (host:port)")
clientInit sync.Once
client *statsd.Client
)
func getClient() *statsd.Client {
clientInit.Do(func() {
c, err := statsd.New(*statsdServer, "autoradio.")
if err != nil {
log.Fatal("Error initializing instrumentation: %v", err)
}
client = c
})
return client
}
type Counter struct {
Name string
}
func NewCounter(name string) *Counter {
return &Counter{Name: name}
}
func (c *Counter) Incr() {
getClient().Inc(c.Name, 1, DefaultRate)
}
func (c *Counter) IncrVar(v string) {
getClient().Inc(c.Name+"."+v, 1, DefaultRate)
}
type Gauge struct {
Name string
}
func NewGauge(name string) *Gauge {
return &Gauge{Name: name}
}
func (g *Gauge) Set(value int64) {
getClient().Gauge(g.Name, value, DefaultRate)
}
func (g *Gauge) SetVar(v string, value int64) {
getClient().Gauge(g.Name+"."+v, value, DefaultRate)
}
......@@ -12,10 +12,13 @@ import (
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
)
var (
statusPageUrl = "http://localhost:8000/status-autoradio.xsl"
icecastOk = instrumentation.NewGauge("icecast.ok")
)
// Icecast returns empty fields in our status handler, which we'll
......@@ -105,9 +108,11 @@ func (ic *IcecastController) statusUpdater() {
case <-t.C:
if status, err := ic.fetchStatus(); err == nil {
ic.status = status
icecastOk.Set(1)
} else {
log.Printf("bad status from iceast: %v", err)
ic.status = downStatus
icecastOk.Set(0)
}
case <-ic.stop:
return
......
......@@ -9,11 +9,18 @@ import (
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"git.autistici.org/ale/autoradio/masterelection"
"git.autistici.org/ale/autoradio/node/bwmonitor"
"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
)
var (
icecastReloadErrors = instrumentation.NewCounter("icecast.reload_errors")
icecastReloads = instrumentation.NewCounter("icecast.reload")
configIndex = instrumentation.NewGauge("config.etcd_index")
)
func trigger(c chan bool) {
select {
case c <- true:
......@@ -115,6 +122,7 @@ func (w *ConfigSyncer) syncer() {
// from and we do not have to download the
// full configuration again.
w.index = response.EtcdIndex
configIndex.Set(int64(w.index))
// Trigger an update.
trigger(w.upch)
......@@ -291,8 +299,10 @@ func (rc *RadioNode) Run() {
for {
select {
case <-rc.upch:
icecastReloads.Incr()
log.Printf("reloading icecast config")
if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil {
icecastReloadErrors.Incr()
log.Printf("Update(): %s", err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment