diff --git a/fe/dns.go b/fe/dns.go index 8682a07686042a3dadf5ea9dc202900d3f8f9a4f..15ae66cdc0cffc49818cabd280f1e62e8612d0a9 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -1,7 +1,6 @@ package fe import ( - "expvar" "fmt" "log" "math/rand" @@ -10,6 +9,7 @@ import ( "time" "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/instrumentation" "github.com/miekg/dns" ) @@ -25,13 +25,14 @@ var ( "stream", "etcd", } + + dnsQueryStats = instrumentation.NewCounter("dns.status") + dnsTargetStats = instrumentation.NewCounter("dns.target") ) // DNS server. type DnsRedirector struct { client *autoradio.RadioAPI - queryStats *expvar.Map - targetStats *expvar.Map origin string originNumParts int publicIp string @@ -63,20 +64,14 @@ func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl i Minttl: uint32(ttl), } - d := &DnsRedirector{ + return &DnsRedirector{ client: client, origin: origin, originNumParts: len(dns.SplitDomainName(origin)), publicIp: publicIp, ttl: ttl, soa: soa, - queryStats: new(expvar.Map).Init(), - targetStats: new(expvar.Map).Init(), } - statsMap := expvar.NewMap("dns") - statsMap.Set("queries", d.queryStats) - statsMap.Set("targets", d.targetStats) - return d } // Randomly shuffle a list of strings. @@ -188,7 +183,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { for _, ip := range ips { rec := d.recordForIp(query, ip) m.Answer = append(m.Answer, rec) - d.targetStats.Add(ip, 1) + dnsTargetStats.IncrVar(ipToMetric(ip)) } responseMsg = fmt.Sprintf("%v", ips) @@ -199,9 +194,9 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { } if responseMsg == "NXDOMAIN" { - d.queryStats.Add("errors", 1) + dnsQueryStats.IncrVar("error") } else { - d.queryStats.Add("ok", 1) + dnsQueryStats.IncrVar("ok") } log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg) @@ -225,3 +220,7 @@ func (d *DnsRedirector) Run(addr string) { }(proto) } } + +func ipToMetric(ip string) string { + return strings.Replace(ip, ".", "_", -1) +} diff --git a/fe/http.go b/fe/http.go index 44520e41bf72831d2d182d092c1d28d5283f4e70..7e96f47b6ee69784aa9ab39befd72b2aa8f2dd29 100644 --- a/fe/http.go +++ b/fe/http.go @@ -2,7 +2,6 @@ package fe import ( "bytes" - "expvar" "fmt" "html/template" "io" @@ -18,9 +17,18 @@ import ( _ "net/http/pprof" "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/instrumentation" "github.com/PuerkitoBio/ghost/handlers" ) +var ( + httpStatusCodes = instrumentation.NewCounter("http.status") + httpTargetStats = instrumentation.NewCounter("http.target") + sourceConnections = instrumentation.NewCounter("http.source_connections") + source404Errors = instrumentation.NewCounter("http.source_404") + sourceErrors = instrumentation.NewCounter("http.source_errors") +) + type statsResponseWriter struct { http.ResponseWriter code int @@ -31,37 +39,27 @@ func (w *statsResponseWriter) WriteHeader(code int) { w.ResponseWriter.WriteHeader(code) } -func statsHandler(h http.Handler, stats *expvar.Map) http.HandlerFunc { - // Add a sub-map to hold aggregate HTTP status stats. - statusMap := new(expvar.Map).Init() - stats.Set("status", statusMap) +func statsHandler(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { wrapw := &statsResponseWriter{w, 200} h.ServeHTTP(wrapw, r) - statusMap.Add(strconv.Itoa(wrapw.code), 1) + httpStatusCodes.IncrVar(strconv.Itoa(wrapw.code)) } } // HTTP redirector. type HttpRedirector struct { - domain string - lb LoadBalancingPolicy - client *autoradio.RadioAPI - template *template.Template - stats *expvar.Map - targetStats *expvar.Map + domain string + lb LoadBalancingPolicy + client *autoradio.RadioAPI + template *template.Template } func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector { - targetStats := new(expvar.Map).Init() - stats := expvar.NewMap("http") - stats.Set("targets", targetStats) return &HttpRedirector{ - client: client, - domain: domain, - lb: getNamedLoadBalancingPolicy(lbpolicy), - stats: stats, - targetStats: targetStats, + client: client, + domain: domain, + lb: getNamedLoadBalancingPolicy(lbpolicy), } } @@ -118,7 +116,7 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { http.Error(w, "No active nodes", http.StatusServiceUnavailable) return } - h.targetStats.Add(relayAddr, 1) + httpTargetStats.IncrVar(relayAddr) // Create the m3u response. m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name) @@ -136,20 +134,20 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { log.Printf("source: error retrieving mount for %+v: %s", r, err) } else { log.Printf("source: connection to relay stream %s", m.Name) - } + } http.Error(w, "Not Found", http.StatusNotFound) - h.stats.Add("source_404", 1) + source404Errors.Incr() return } - h.stats.Add("source_connections", 1) + sourceConnections.Incr() // Handy function to report a source error (the ones we care // the most about because they measure failures in the // coordination infrastructure). sendErr := func(err error) { http.Error(w, err.Error(), http.StatusServiceUnavailable) - h.stats.Add("source_errors", 1) + sourceErrors.Incr() } // Find the current master node. @@ -271,7 +269,7 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { wraph := handlers.GZIPHandler(mux, nil) logopts := handlers.NewLogOptions(nil, handlers.Lshort) wraph = handlers.LogHandler(wraph, logopts) - wraph = statsHandler(wraph, h.stats) + wraph = statsHandler(wraph) // Serve SOURCE requests bypassing the logging and gzip // handlers: since they wrap the ResponseWriter, we would be diff --git a/instrumentation/stats.go b/instrumentation/stats.go new file mode 100644 index 0000000000000000000000000000000000000000..adcfb65bf6218e9c13977c35c417fe4a9b717a90 --- /dev/null +++ b/instrumentation/stats.go @@ -0,0 +1,61 @@ +package instrumentation + +import ( + "flag" + "log" + "sync" + + "github.com/cactus/go-statsd-client/statsd" +) + +var ( + DefaultRate float32 = 1 + + statsdServer = flag.String("statsd-server", "localhost:8125", "statsd server (host:port)") + + clientInit sync.Once + client *statsd.Client +) + +func getClient() *statsd.Client { + clientInit.Do(func() { + c, err := statsd.New(*statsdServer, "autoradio.") + if err != nil { + log.Fatal("Error initializing instrumentation: %v", err) + } + client = c + }) + return client +} + +type Counter struct { + Name string +} + +func NewCounter(name string) *Counter { + return &Counter{Name: name} +} + +func (c *Counter) Incr() { + getClient().Inc(c.Name, 1, DefaultRate) +} + +func (c *Counter) IncrVar(v string) { + getClient().Inc(c.Name+"."+v, 1, DefaultRate) +} + +type Gauge struct { + Name string +} + +func NewGauge(name string) *Gauge { + return &Gauge{Name: name} +} + +func (g *Gauge) Set(value int64) { + getClient().Gauge(g.Name, value, DefaultRate) +} + +func (g *Gauge) SetVar(v string, value int64) { + getClient().Gauge(g.Name+"."+v, value, DefaultRate) +} diff --git a/node/icecast.go b/node/icecast.go index c8cafdfdd3e75ac44f05f3acbf32b1891fb95ef1..50ee1644291a416dcd2c7c2abd43b96443a3fa8a 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -12,10 +12,13 @@ import ( "time" "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/instrumentation" ) var ( statusPageUrl = "http://localhost:8000/status-autoradio.xsl" + + icecastOk = instrumentation.NewGauge("icecast.ok") ) // Icecast returns empty fields in our status handler, which we'll @@ -105,9 +108,11 @@ func (ic *IcecastController) statusUpdater() { case <-t.C: if status, err := ic.fetchStatus(); err == nil { ic.status = status + icecastOk.Set(1) } else { log.Printf("bad status from iceast: %v", err) ic.status = downStatus + icecastOk.Set(0) } case <-ic.stop: return diff --git a/node/node.go b/node/node.go index 6ba8ba398f666ae6be38654aeb17b644163b67cf..e177138c13a2f11a3f4b065781889199844ff04e 100644 --- a/node/node.go +++ b/node/node.go @@ -9,11 +9,18 @@ import ( "time" "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/instrumentation" "git.autistici.org/ale/autoradio/masterelection" "git.autistici.org/ale/autoradio/node/bwmonitor" "git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd" ) +var ( + icecastReloadErrors = instrumentation.NewCounter("icecast.reload_errors") + icecastReloads = instrumentation.NewCounter("icecast.reload") + configIndex = instrumentation.NewGauge("config.etcd_index") +) + func trigger(c chan bool) { select { case c <- true: @@ -115,6 +122,7 @@ func (w *ConfigSyncer) syncer() { // from and we do not have to download the // full configuration again. w.index = response.EtcdIndex + configIndex.Set(int64(w.index)) // Trigger an update. trigger(w.upch) @@ -291,8 +299,10 @@ func (rc *RadioNode) Run() { for { select { case <-rc.upch: + icecastReloads.Incr() log.Printf("reloading icecast config") if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil { + icecastReloadErrors.Incr() log.Printf("Update(): %s", err) }