diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index cadd94732e16ff09201d30fb524ad07c82a110db..c4443669d519d4288303d3adf3b76810504da76f 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -8,6 +8,9 @@ import ( "strings" "syscall" + "net/http" + _ "net/http/pprof" + "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/instrumentation" "git.autistici.org/ale/autoradio/node" @@ -21,6 +24,7 @@ var ( netDev = flag.String("interface", "", "Network interface to monitor for utilization. If unset, default to the interface associated with --ip.") bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients") + debugAddr = flag.String("debug-addr", "", "Set to a host:port to enable a HTTP server with debugging information") ) func shortHostname() string { @@ -54,5 +58,18 @@ func main() { }() signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT) + if *debugAddr != "" { + http.Handle("/debug/node", n) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + http.Redirect(w, r, "/debug/node", 302) + } + http.NotFound(w, r) + }) + go func() { + http.ListenAndServe(*debugAddr, nil) + }() + } + n.Run() } diff --git a/node/node.go b/node/node.go index 24122ade00fb314542e5576acf96c25d818d0d89..19110328c12472aff4183a4adff1237948d245cd 100644 --- a/node/node.go +++ b/node/node.go @@ -166,13 +166,18 @@ type RadioNode struct { // Node presence heartbeat. presence *presence.Presence - // How often to restart the Icecast daemon. + // Rate limiting for Icecast daemon restarts. reloadDelay time.Duration // Generator for transcodingControllers. Exposed as a member // so that it can be stubbed out during tests. transcoderFn transcodingControllerFunc + // All currently active transcoders (locked due to the + // async debugging handler). + transcodersMx sync.Mutex + transcoders map[string]*transcoder + // A note on channel types used for signaling: while I // personally prefer struct{} chans, the etcd interface for // Watch makes it convenient to use bool stop channels @@ -236,6 +241,7 @@ func NewRadioNode(name string, ips, internalIPs []net.IP, netDev string, bwLimit transcoderFn: func(p *liquidsoapParams) (transcodingController, error) { return newLiquidsoap(p) }, + transcoders: make(map[string]*transcoder), reloadDelay: 1000 * time.Millisecond, bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), maxListeners: maxListeners, @@ -286,9 +292,8 @@ func (rc *RadioNode) updater(stop chan bool) { // Keep track of all the configured transcoders (and clean // them up at the end). - transcoders := make(map[string]*transcoder) defer func() { - for _, t := range transcoders { + for _, t := range rc.transcoders { t.Stop() } }() @@ -320,8 +325,9 @@ func (rc *RadioNode) updater(stop chan bool) { // associated transcoder objects. We also need // to detect changes in the encoding params // and restart the transcoder if necessary. + rc.transcodersMx.Lock() tmp := make(map[string]struct{}) - for name := range transcoders { + for name := range rc.transcoders { tmp[name] = struct{}{} } for _, m := range rc.config.ListMounts() { @@ -330,7 +336,7 @@ func (rc *RadioNode) updater(stop chan bool) { } tparams := newLiquidsoapParams(m) - cur, ok := transcoders[m.Name] + cur, ok := rc.transcoders[m.Name] if ok { delete(tmp, m.Name) if cur.Changed(tparams) { @@ -343,14 +349,15 @@ func (rc *RadioNode) updater(stop chan bool) { rc.Log.Printf("could not create transcoder: %v", err) } else { t.Start() - transcoders[m.Name] = t + rc.transcoders[m.Name] = t } } } for name := range tmp { - transcoders[name].Stop() - delete(transcoders, name) + rc.transcoders[name].Stop() + delete(rc.transcoders, name) } + rc.transcodersMx.Unlock() // Limit the rate of reconfigurations. if rc.reloadDelay > 0 { diff --git a/node/node_debug.go b/node/node_debug.go new file mode 100644 index 0000000000000000000000000000000000000000..4f39e3c034af5d86bda006b512decd5d40f75d1b --- /dev/null +++ b/node/node_debug.go @@ -0,0 +1,96 @@ +package node + +import ( + "fmt" + "net/http" + "text/template" +) + +const debugText = ` + + + + + Node status: {{.Name}} + + + + + + + + + + + + + + + + + + + + + +
Name:{{.Name}}
Master:{{if .IsMaster}}YES{{else}}NO{{end}}
Icecast:{{if .IcecastUp}}OK{{else}}DOWN{{end}}
Bandwidth:{{.BandwidthUsage}}%
+

Transcoders

+ {{if .Transcoders}} + + + + + + + + + {{range .Transcoders}} + + + + + + + + {{end}} +
SourceTargetMountFormatBitrate/Q
{{.SourceURL}}{{.TargetIP}}:{{.TargetPort}}{{.TargetMount}}{{.Format}}{{if gt .BitRate 0}}{{.BitRate}}{{else}}{{.Quality}}{{end}}
+ {{else}} +

No active transcoders on this node.

+ {{end}} + +` + +var ( + debugTmpl = template.Must(template.New("node debug").Parse(debugText)) +) + +// ServeHTTP serves the debug console. +func (rc *RadioNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { + rc.transcodersMx.Lock() + var transcoders []*transcoder + for _, t := range rc.transcoders { + transcoders = append(transcoders, t) + } + rc.transcodersMx.Unlock() + + ctx := struct { + Name string + IsMaster bool + IcecastUp bool + BandwidthUsage float64 + Transcoders []*transcoder + }{ + Name: rc.name, + IsMaster: rc.me.IsMaster(), + IcecastUp: rc.icecast.GetStatus().Up, + BandwidthUsage: rc.bw.GetUsage(), + Transcoders: transcoders, + } + err := debugTmpl.Execute(w, &ctx) + if err != nil { + fmt.Fprintln(w, "debug: error executing template: ", err.Error()) + } +}