Skip to content
Snippets Groups Projects
Commit 1c01bf6c authored by ale's avatar ale
Browse files

Merge branch 'experimental' into jessie

parents 25107e12 c0ff2ab8
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,9 @@ import (
"strings"
"syscall"
"net/http"
_ "net/http/pprof"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"git.autistici.org/ale/autoradio/node"
......@@ -21,6 +24,7 @@ var (
netDev = flag.String("interface", "", "Network interface to monitor for utilization. If unset, default to the interface associated with --ip.")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients")
debugAddr = flag.String("debug-addr", "", "Set to a host:port to enable a HTTP server with debugging information")
)
func shortHostname() string {
......@@ -54,5 +58,18 @@ func main() {
}()
signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT)
if *debugAddr != "" {
http.Handle("/debug/node", n)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
http.Redirect(w, r, "/debug/node", 302)
}
http.NotFound(w, r)
})
go func() {
http.ListenAndServe(*debugAddr, nil)
}()
}
n.Run()
}
......@@ -166,13 +166,18 @@ type RadioNode struct {
// Node presence heartbeat.
presence *presence.Presence
// How often to restart the Icecast daemon.
// Rate limiting for Icecast daemon restarts.
reloadDelay time.Duration
// Generator for transcodingControllers. Exposed as a member
// so that it can be stubbed out during tests.
transcoderFn transcodingControllerFunc
// All currently active transcoders (locked due to the
// async debugging handler).
transcodersMx sync.Mutex
transcoders map[string]*transcoder
// A note on channel types used for signaling: while I
// personally prefer struct{} chans, the etcd interface for
// Watch makes it convenient to use bool stop channels
......@@ -236,6 +241,7 @@ func NewRadioNode(name string, ips, internalIPs []net.IP, netDev string, bwLimit
transcoderFn: func(p *liquidsoapParams) (transcodingController, error) {
return newLiquidsoap(p)
},
transcoders: make(map[string]*transcoder),
reloadDelay: 1000 * time.Millisecond,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
maxListeners: maxListeners,
......@@ -286,9 +292,8 @@ func (rc *RadioNode) updater(stop chan bool) {
// Keep track of all the configured transcoders (and clean
// them up at the end).
transcoders := make(map[string]*transcoder)
defer func() {
for _, t := range transcoders {
for _, t := range rc.transcoders {
t.Stop()
}
}()
......@@ -320,8 +325,9 @@ func (rc *RadioNode) updater(stop chan bool) {
// associated transcoder objects. We also need
// to detect changes in the encoding params
// and restart the transcoder if necessary.
rc.transcodersMx.Lock()
tmp := make(map[string]struct{})
for name := range transcoders {
for name := range rc.transcoders {
tmp[name] = struct{}{}
}
for _, m := range rc.config.ListMounts() {
......@@ -330,7 +336,7 @@ func (rc *RadioNode) updater(stop chan bool) {
}
tparams := newLiquidsoapParams(m)
cur, ok := transcoders[m.Name]
cur, ok := rc.transcoders[m.Name]
if ok {
delete(tmp, m.Name)
if cur.Changed(tparams) {
......@@ -343,14 +349,15 @@ func (rc *RadioNode) updater(stop chan bool) {
rc.Log.Printf("could not create transcoder: %v", err)
} else {
t.Start()
transcoders[m.Name] = t
rc.transcoders[m.Name] = t
}
}
}
for name := range tmp {
transcoders[name].Stop()
delete(transcoders, name)
rc.transcoders[name].Stop()
delete(rc.transcoders, name)
}
rc.transcodersMx.Unlock()
// Limit the rate of reconfigurations.
if rc.reloadDelay > 0 {
......
package node
import (
"fmt"
"net/http"
"text/template"
)
const debugText = `<html>
<head>
<style type="text/css">
.info th { text-align: right; }
.error { color: red; }
</style>
</head>
<body>
<title>Node status: {{.Name}}</title>
<table class="info">
<tr>
<th>Name:</th>
<td>{{.Name}}</td>
</tr>
<tr>
<th>Master:</th>
<td>{{if .IsMaster}}YES{{else}}NO{{end}}</td>
</tr>
<tr>
<th>Icecast:</th>
<td>{{if .IcecastUp}}OK{{else}}<span class="error">DOWN</span>{{end}}</td>
</tr>
<tr>
<th>Bandwidth:</th>
<td>{{.BandwidthUsage}}%</td>
</tr>
<tr>
<th></th>
<td></td>
</tr>
</table>
<h3>Transcoders</h3>
{{if .Transcoders}}
<table>
<tr>
<th>Source</th>
<th>Target</th>
<th>Mount</th>
<th>Format</th>
<th>Bitrate/Q</th>
</tr>
{{range .Transcoders}}
<tr>
<td>{{.SourceURL}}</td>
<td>{{.TargetIP}}:{{.TargetPort}}</td>
<td>{{.TargetMount}}</td>
<td>{{.Format}}</td>
<td>{{if gt .BitRate 0}}{{.BitRate}}{{else}}{{.Quality}}{{end}}</td>
</tr>
{{end}}
</table>
{{else}}
<p>No active transcoders on this node.</p>
{{end}}
</body>
</html>`
var (
debugTmpl = template.Must(template.New("node debug").Parse(debugText))
)
// ServeHTTP serves the debug console.
func (rc *RadioNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rc.transcodersMx.Lock()
var transcoders []*transcoder
for _, t := range rc.transcoders {
transcoders = append(transcoders, t)
}
rc.transcodersMx.Unlock()
ctx := struct {
Name string
IsMaster bool
IcecastUp bool
BandwidthUsage float64
Transcoders []*transcoder
}{
Name: rc.name,
IsMaster: rc.me.IsMaster(),
IcecastUp: rc.icecast.GetStatus().Up,
BandwidthUsage: rc.bw.GetUsage(),
Transcoders: transcoders,
}
err := debugTmpl.Execute(w, &ctx)
if err != nil {
fmt.Fprintln(w, "debug: error executing template: ", err.Error())
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment