Skip to content
Snippets Groups Projects
Commit 6d2d377b authored by ale's avatar ale
Browse files

add instrumentation to the redirector

parent cbccb3f9
No related branches found
No related tags found
No related merge requests found
package fe
import (
"expvar"
"fmt"
"log"
"math/rand"
......@@ -8,8 +9,8 @@ import (
"strings"
"time"
"github.com/miekg/dns"
"git.autistici.org/ale/radioai"
"github.com/miekg/dns"
)
var (
......@@ -29,6 +30,8 @@ var (
// DNS server.
type DnsRedirector struct {
client *radioai.RadioAPI
queryStats *expvar.Map
targetStats *expvar.Map
origin string
originNumParts int
publicIp string
......@@ -60,14 +63,20 @@ func NewDnsRedirector(client *radioai.RadioAPI, origin, publicIp string, ttl int
Minttl: uint32(ttl),
}
return &DnsRedirector{
d := &DnsRedirector{
client: client,
origin: origin,
originNumParts: len(dns.SplitDomainName(origin)),
publicIp: publicIp,
ttl: ttl,
soa: soa,
queryStats: new(expvar.Map).Init(),
targetStats: new(expvar.Map).Init(),
}
statsMap := expvar.NewMap("dns")
statsMap.Set("queries", d.queryStats)
statsMap.Set("targets", d.targetStats)
return d
}
// Randomly shuffle a list of strings.
......@@ -179,6 +188,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
for _, ip := range ips {
rec := d.recordForIp(query, ip)
m.Answer = append(m.Answer, rec)
d.targetStats.Add(ip, 1)
}
responseMsg = fmt.Sprintf("%v", ips)
......@@ -188,6 +198,12 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
responseMsg = "NXDOMAIN"
}
if responseMsg == "NXDOMAIN" {
d.queryStats.Add("errors", 1)
} else {
d.queryStats.Add("ok", 1)
}
log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg)
ednsFromRequest(req, m)
......
......@@ -2,6 +2,7 @@ package fe
import (
"bytes"
"expvar"
"fmt"
"html/template"
"io"
......@@ -20,19 +21,47 @@ import (
"github.com/PuerkitoBio/ghost/handlers"
)
type statsResponseWriter struct {
http.ResponseWriter
code int
}
func (w *statsResponseWriter) WriteHeader(code int) {
w.code = code
w.ResponseWriter.WriteHeader(code)
}
func statsHandler(h http.Handler, stats *expvar.Map) http.HandlerFunc {
// Add a sub-map to hold aggregate HTTP status stats.
statusMap := new(expvar.Map).Init()
stats.Set("status", statusMap)
return func(w http.ResponseWriter, r *http.Request) {
wrapw := &statsResponseWriter{w, 200}
h.ServeHTTP(wrapw, r)
statusMap.Add(strconv.Itoa(wrapw.code), 1)
}
}
// HTTP redirector.
type HttpRedirector struct {
domain string
client *radioai.RadioAPI
template *template.Template
lb LoadBalancingPolicy
domain string
lb LoadBalancingPolicy
client *radioai.RadioAPI
template *template.Template
stats *expvar.Map
targetStats *expvar.Map
}
func NewHttpRedirector(client *radioai.RadioAPI, domain string, lbpolicy string) *HttpRedirector {
targetStats := new(expvar.Map).Init()
stats := expvar.NewMap("http")
stats.Set("targets", targetStats)
return &HttpRedirector{
client: client,
domain: domain,
lb: getNamedLoadBalancingPolicy(lbpolicy),
client: client,
domain: domain,
lb: getNamedLoadBalancingPolicy(lbpolicy),
stats: stats,
targetStats: targetStats,
}
}
......@@ -89,6 +118,7 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) {
http.Error(w, "No active nodes", http.StatusServiceUnavailable)
return
}
h.targetStats.Add(relayAddr, 1)
// Create the m3u response.
m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name)
......@@ -104,14 +134,41 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Printf("source: error retrieving mount for %+v: %s", r, err)
http.Error(w, "Not Found", http.StatusNotFound)
h.stats.Add("source_404", 1)
return
}
h.stats.Add("source_connections", 1)
// Handy function to report a source error (the ones we care
// the most about because they measure failures in the
// coordination infrastructure).
sendErr := func(err error) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
h.stats.Add("source_errors", 1)
}
// Find the current master node.
masterAddr, err := h.client.GetMasterAddr()
if err != nil {
log.Printf("source: no master: %s", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
sendErr(err)
return
}
// Create the upstream connection, and write the original
// request to it as-is (the URL path on the backend is the
// same, and the headers do not need to change).
upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr))
if err != nil {
log.Printf("source: dial upstream: %v", err)
sendErr(err)
return
}
defer upstream.Close()
if err := r.Write(upstream); err != nil {
log.Printf("source: write upstream request: %v", err)
sendErr(err)
return
}
......@@ -131,7 +188,7 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Printf("source: hijack failed: %v", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
sendErr(err)
return
}
defer conn.Close()
......@@ -139,20 +196,6 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) {
log.Printf("source: could not reset deadline: %v", err)
}
// Create the upstream connection, and write the original
// request to it as-is (the URL path on the backend is the
// same, and the headers do not need to change).
upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr))
if err != nil {
log.Printf("source: dial upstream: %v", err)
return
}
defer upstream.Close()
if err := r.Write(upstream); err != nil {
log.Printf("source: write upstream request: %v", err)
return
}
// Start two copiers, one for the source data, one for the
// replies. Wait until both are done.
var wg sync.WaitGroup
......@@ -224,10 +267,12 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) {
wraph := handlers.GZIPHandler(mux, nil)
logopts := handlers.NewLogOptions(nil, handlers.Lshort)
wraph = handlers.LogHandler(wraph, logopts)
wraph = statsHandler(wraph, h.stats)
// Serve SOURCE requests bypassing the logging and gzip
// handlers: since they wrap the ResponseWriter, we would be
// unable to hijack the underlying connection for proxying.
// TODO: look into using WrapWriter.
rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "SOURCE" {
h.serveSource(w, r)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment