diff --git a/README.rst b/README.rst index 198973af6c12dbca6c4c43634002c1d88c85d646..bf963a166a11d93962ab719d4fd1146346ac0969 100644 --- a/README.rst +++ b/README.rst @@ -1,12 +1,13 @@ - +======== radio.ai ======== The RadioAI service aims to provide a reliable, fault-tolerant Icecast streaming network for audio and video. It provides all the necessary components to ensure that the traffic from the source to the clients -is uninterrupted, even in face of high load or server crashes. +is uninterrupted, even in face of high load or server crashes. All +this, if possible, without any operator intervention. It is a full-stack service, meaning that it includes its own DNS and HTTP servers, for full control of the request flow. @@ -137,6 +138,14 @@ servers as the nameservers for zone delegation, and the other nodes are free to have dynamic membership. +Instrumentation ++++++++++++++++ + +The ``redirectord`` daemon exports statistics on served queries (along +with data on its own memory usage), in JSON format at the +``/debug/vars`` URL. + + .. _etcd: https://github.com/coreos/etcd -.. _autoca: https://git.autistici.org/public/projects/ai/autoca +.. _autoca: https://git.autistici.org/ai/autoca diff --git a/api.go b/api.go index 5d8e9311488cced157ca3e1ac031ade7bb57dfdc..7b49833b3fd5651d38e62234d2333b98b139e0b2 100644 --- a/api.go +++ b/api.go @@ -43,16 +43,47 @@ func mountPath(mountName string) string { return MountPrefix + mountName[1:] } +// Status of a mount on an individual Icecast server. +type IcecastMountStatus struct { + Name string + Listeners int + BitRate int + Quality float64 + VideoQuality float64 + FrameSize string + FrameRate float64 +} + +// Status of a node. This is used to report load and stream status. +type NodeStatus struct { + // Public IP of this server. + IP string + + // Is the Icecast server up? + IcecastUp bool + + // List of + Mounts []IcecastMountStatus `xml:"mount"` +} + +func (ns *NodeStatus) NumListeners() int { + listeners := 0 + for _, m := range ns.Mounts { + listeners += m.Listeners + } + return listeners +} + // Cache the list of active nodes (the front-ends that need to // retrieve this information continuously, so we limit them to 2qps). type nodesCache struct { ttl time.Duration - nodes []string + nodes []*NodeStatus deadline time.Time lock sync.Mutex } -type getNodesFunc func() ([]string, error) +type getNodesFunc func() ([]*NodeStatus, error) func newNodesCache() *nodesCache { return &nodesCache{ @@ -64,14 +95,14 @@ func newNodesCache() *nodesCache { // expired and we get an error from 'fn', we will attempt to return // the previously cached value anyway, along with the error: the // caller can then pick the right failure behavior. -func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) { +func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) { nc.lock.Lock() defer nc.lock.Unlock() var err error now := time.Now() if now.After(nc.deadline) { - var nodes []string + var nodes []*NodeStatus if nodes, err = fn(); err == nil { nc.nodes = nodes nc.deadline = now.Add(nc.ttl) @@ -162,7 +193,7 @@ func (r *RadioAPI) GetMasterAddr() (string, error) { } // GetNodes returns the list of active cluster nodes. -func (r *RadioAPI) doGetNodes() ([]string, error) { +func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) { response, err := r.client.Get(NodePrefix, false) if err != nil { return nil, err @@ -170,17 +201,32 @@ func (r *RadioAPI) doGetNodes() ([]string, error) { if !response.Dir { return nil, ErrIsFile } - result := make([]string, 0, len(response.Kvs)) + result := make([]*NodeStatus, 0, len(response.Kvs)) for _, entry := range response.Kvs { - result = append(result, entry.Value) + var ns NodeStatus + if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil { + result = append(result, &ns) + } } return result, nil } -func (r *RadioAPI) GetNodes() ([]string, error) { +func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) { return r.activeNodesCache.Get(r.doGetNodes) } +func (r *RadioAPI) GetNodeIPs() ([]string, error) { + nodes, err := r.GetNodes() + if err != nil { + return nil, err + } + ips := make([]string, 0, len(nodes)) + for _, n := range(nodes) { + ips = append(ips, n.IP) + } + return ips, nil +} + // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index f63c71ef80c28edea550705e8dd47a82c61139e3..5e862cea2b106baad35d78cd67c78f41adce272a 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -18,6 +18,8 @@ var ( staticDir = flag.String("static-dir", "/usr/share/radioai/htdocs/static", "Static content directory") templateDir = flag.String("template-dir", "/usr/share/radioai/htdocs/templates", "HTML templates directory") + lbPolicy = flag.String("lb-policy", "weighted", "Load balancing policy (weighted, leastloaded)") + // Default DNS TTL (seconds). dnsTtl = 5 ) @@ -35,6 +37,6 @@ func main() { dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) - red := fe.NewHttpRedirector(api, *domain) + red := fe.NewHttpRedirector(api, *domain, *lbPolicy) red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir) } diff --git a/debian/radioai.xsl b/debian/radioai.xsl new file mode 100644 index 0000000000000000000000000000000000000000..a24302be283b37f03913b7273d56cbdc2ec826c0 --- /dev/null +++ b/debian/radioai.xsl @@ -0,0 +1,19 @@ +<xsl:stylesheet version="1.0" + xmlns:xsl="http://www.w3.org/1999/XSL/Transform"> + <xsl:output method="xml"></xsl:output> + <xsl:template match="/icestats"> +<status> + <xsl:for-each select="source"> + <mount> + <xsl:attribute name="name"><xsl:value-of select="@mount"></xsl:value-of></xsl:attribute> + <listeners><xsl:value-of select="listeners"></xsl:value-of></listeners> + <bitrate><xsl:value-of select="bitrate"></xsl:value-of></bitrate> + <quality><xsl:value-of select="quality"></xsl:value-of></quality> + <video-quality><xsl:value-of select="video_quality"></xsl:value-of></video-quality> + <frame-size><xsl:value-of select="frame_size"></xsl:value-of></frame-size> + <frame-rate><xsl:value-of select="frame_rate"></xsl:value-of></frame-rate> + </mount> + </xsl:for-each> +</status> + </xsl:template> +</xsl:stylesheet> diff --git a/debian/rules b/debian/rules index ba1c3e92abe0e623a8b5a62671bf1597832629b9..7b6a62bad15daed13cca8c43cfb68c39489a1f54 100755 --- a/debian/rules +++ b/debian/rules @@ -63,3 +63,7 @@ override_dh_install: chmod 0755 $(SVCDIR)/$$f/log/run ; \ done) + # Icecast2 status XSL template. + install -d -m 755 -o root -g root $(DESTDIR)/usr/share/icecast2/web + install -m 644 -o root -g root $(CURDIR)/debian/radioai.xsl \ + $(DESTDIR)/usr/share/icecast2/web/status-radioai.xsl diff --git a/fe/dns.go b/fe/dns.go index 09ea0b84a96f865ba565632474c9d346094e1a97..b2a827e309b2b5611f8d6a38d97c6630012dde85 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -1,6 +1,7 @@ package fe import ( + "expvar" "fmt" "log" "math/rand" @@ -8,8 +9,8 @@ import ( "strings" "time" - "github.com/miekg/dns" "git.autistici.org/ale/radioai" + "github.com/miekg/dns" ) var ( @@ -29,6 +30,8 @@ var ( // DNS server. type DnsRedirector struct { client *radioai.RadioAPI + queryStats *expvar.Map + targetStats *expvar.Map origin string originNumParts int publicIp string @@ -60,14 +63,20 @@ func NewDnsRedirector(client *radioai.RadioAPI, origin, publicIp string, ttl int Minttl: uint32(ttl), } - return &DnsRedirector{ + d := &DnsRedirector{ client: client, origin: origin, originNumParts: len(dns.SplitDomainName(origin)), publicIp: publicIp, ttl: ttl, soa: soa, + queryStats: new(expvar.Map).Init(), + targetStats: new(expvar.Map).Init(), } + statsMap := expvar.NewMap("dns") + statsMap.Set("queries", d.queryStats) + statsMap.Set("targets", d.targetStats) + return d } // Randomly shuffle a list of strings. @@ -157,7 +166,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { } // Serve all active nodes on every request. - ips, _ := d.client.GetNodes() + ips, _ := d.client.GetNodeIPs() if ips == nil || len(ips) == 0 { // In case of errors retrieving the list of @@ -179,6 +188,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { for _, ip := range ips { rec := d.recordForIp(query, ip) m.Answer = append(m.Answer, rec) + d.targetStats.Add(ip, 1) } responseMsg = fmt.Sprintf("%v", ips) @@ -188,6 +198,12 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { responseMsg = "NXDOMAIN" } + if responseMsg == "NXDOMAIN" { + d.queryStats.Add("errors", 1) + } else { + d.queryStats.Add("ok", 1) + } + log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg) ednsFromRequest(req, m) diff --git a/fe/http.go b/fe/http.go index 1159efc58148f6b736c56979da3051671ded8f84..863244c96aaee4d47280ab8ad732ff5e92665922 100644 --- a/fe/http.go +++ b/fe/http.go @@ -2,11 +2,11 @@ package fe import ( "bytes" + "expvar" "fmt" "html/template" "io" "log" - "math/rand" "net" "net/http" "path/filepath" @@ -21,29 +21,74 @@ import ( "github.com/PuerkitoBio/ghost/handlers" ) +type statsResponseWriter struct { + http.ResponseWriter + code int +} + +func (w *statsResponseWriter) WriteHeader(code int) { + w.code = code + w.ResponseWriter.WriteHeader(code) +} + +func statsHandler(h http.Handler, stats *expvar.Map) http.HandlerFunc { + // Add a sub-map to hold aggregate HTTP status stats. + statusMap := new(expvar.Map).Init() + stats.Set("status", statusMap) + return func(w http.ResponseWriter, r *http.Request) { + wrapw := &statsResponseWriter{w, 200} + h.ServeHTTP(wrapw, r) + statusMap.Add(strconv.Itoa(wrapw.code), 1) + } +} + // HTTP redirector. type HttpRedirector struct { - domain string - client *radioai.RadioAPI - template *template.Template + domain string + lb LoadBalancingPolicy + client *radioai.RadioAPI + template *template.Template + stats *expvar.Map + targetStats *expvar.Map } -func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector { +func NewHttpRedirector(client *radioai.RadioAPI, domain string, lbpolicy string) *HttpRedirector { + targetStats := new(expvar.Map).Init() + stats := expvar.NewMap("http") + stats.Set("targets", targetStats) return &HttpRedirector{ - client: client, - domain: domain, + client: client, + domain: domain, + lb: getNamedLoadBalancingPolicy(lbpolicy), + stats: stats, + targetStats: targetStats, } } -// Return an active node, chosen randomly (this is currently our load -// balancing policy, since there is no status information about the -// nodes yet). +// Return an active node, chosen according to the current load +// balancing policy. func (h *HttpRedirector) pickActiveNode() string { nodes, _ := h.client.GetNodes() - if nodes != nil && len(nodes) > 0 { - return nodes[rand.Intn(len(nodes))] + if nodes == nil { + return "" + } + + // Filter nodes where Icecast is reported to be up. + okNodes := make([]*radioai.NodeStatus, 0, len(nodes)) + for _, n := range nodes { + if n.IcecastUp { + okNodes = append(okNodes, n) + } + } + if len(okNodes) == 0 { + return "" + } + + result := h.lb.GetNode(okNodes) + if result == nil { + return "" } - return "" + return result.IP } // Parse the request and extract the mount path. @@ -73,6 +118,7 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { http.Error(w, "No active nodes", http.StatusServiceUnavailable) return } + h.targetStats.Add(relayAddr, 1) // Create the m3u response. m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name) @@ -88,14 +134,41 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { if err != nil { log.Printf("source: error retrieving mount for %+v: %s", r, err) http.Error(w, "Not Found", http.StatusNotFound) + h.stats.Add("source_404", 1) return } + h.stats.Add("source_connections", 1) + + // Handy function to report a source error (the ones we care + // the most about because they measure failures in the + // coordination infrastructure). + sendErr := func(err error) { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + h.stats.Add("source_errors", 1) + } + // Find the current master node. masterAddr, err := h.client.GetMasterAddr() if err != nil { log.Printf("source: no master: %s", err) - http.Error(w, err.Error(), http.StatusServiceUnavailable) + sendErr(err) + return + } + + // Create the upstream connection, and write the original + // request to it as-is (the URL path on the backend is the + // same, and the headers do not need to change). + upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) + if err != nil { + log.Printf("source: dial upstream: %v", err) + sendErr(err) + return + } + defer upstream.Close() + if err := r.Write(upstream); err != nil { + log.Printf("source: write upstream request: %v", err) + sendErr(err) return } @@ -115,7 +188,7 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { conn, _, err := w.(http.Hijacker).Hijack() if err != nil { log.Printf("source: hijack failed: %v", err) - http.Error(w, err.Error(), http.StatusServiceUnavailable) + sendErr(err) return } defer conn.Close() @@ -123,20 +196,6 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { log.Printf("source: could not reset deadline: %v", err) } - // Create the upstream connection, and write the original - // request to it as-is (the URL path on the backend is the - // same, and the headers do not need to change). - upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) - if err != nil { - log.Printf("source: dial upstream: %v", err) - return - } - defer upstream.Close() - if err := r.Write(upstream); err != nil { - log.Printf("source: write upstream request: %v", err) - return - } - // Start two copiers, one for the source data, one for the // replies. Wait until both are done. var wg sync.WaitGroup @@ -161,7 +220,7 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) mounts, _ := h.client.ListMounts() ctx := struct { Domain string - Nodes []string + Nodes []*radioai.NodeStatus Mounts []*radioai.Mount }{h.domain, nodes, mounts} @@ -172,6 +231,8 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) } w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) + w.Header().Set("Expires", "-1") + w.Header().Set("Cache-Control", "private, max-age=0") w.Write(buf.Bytes()) } @@ -206,10 +267,12 @@ func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { wraph := handlers.GZIPHandler(mux, nil) logopts := handlers.NewLogOptions(nil, handlers.Lshort) wraph = handlers.LogHandler(wraph, logopts) + wraph = statsHandler(wraph, h.stats) // Serve SOURCE requests bypassing the logging and gzip // handlers: since they wrap the ResponseWriter, we would be // unable to hijack the underlying connection for proxying. + // TODO: look into using WrapWriter. rooth := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "SOURCE" { h.serveSource(w, r) diff --git a/fe/loadbalancing.go b/fe/loadbalancing.go new file mode 100644 index 0000000000000000000000000000000000000000..4a1afe5f8bfd847c20e6d807d3f32d6fde7ebf12 --- /dev/null +++ b/fe/loadbalancing.go @@ -0,0 +1,60 @@ +package fe + +import ( + "log" + + "git.autistici.org/ale/radioai" + "github.com/jmcvetta/randutil" +) + +// A load balancing policy selects a single node from the pool of +// currently active ones. +type LoadBalancingPolicy interface { + GetNode([]*radioai.NodeStatus) *radioai.NodeStatus +} + +// Simple load balancing policy that always returns the nodes with the +// least amount of listeners. +type leastListenersPolicy struct{} + +func (llp leastListenersPolicy) GetNode(nodes []*radioai.NodeStatus) *radioai.NodeStatus { + minIdx := 0 + min := 1000000 + for i, n := range nodes { + if listeners := n.NumListeners(); listeners < min { + minIdx = i + min = listeners + } + } + return nodes[minIdx] +} + +// Simple load balancing policy that selects a node randomly, +// associating a weight to each node inversely proportional to the +// number of active listeners. +type weightedListenersPolicy struct{} + +func (wlp weightedListenersPolicy) GetNode(nodes []*radioai.NodeStatus) *radioai.NodeStatus { + choices := make([]randutil.Choice, 0, len(nodes)) + weightBase := 1000000 + for _, n := range nodes { + w := weightBase / (n.NumListeners() + 1) + choices = append(choices, randutil.Choice{w, n}) + } + result, err := randutil.WeightedChoice(choices) + if err != nil { + return nil + } + return result.Item.(*radioai.NodeStatus) +} + +func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy { + switch lbpolicy { + case "leastloaded", "ll": + return &leastListenersPolicy{} + case "weighted", "wl": + return &weightedListenersPolicy{} + } + log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy) + return nil +} diff --git a/fe/static/style.css b/fe/static/style.css index 6a2dbdaa75cdc874edb07a6e44947492749bb0b7..37cb70c7a9142e7b8977d44ae0f0729dd50ab533 100644 --- a/fe/static/style.css +++ b/fe/static/style.css @@ -71,4 +71,8 @@ body { overflow: hidden; z-index: -100; opacity: 0.3; +} + +.error { + color: red } \ No newline at end of file diff --git a/fe/templates/index.html b/fe/templates/index.html index 973fc678b0da52d51b8e26b9bae649fdb12654f3..7b7ea7ab2976845dd7dbc7373f9ae0a1e71b306d 100644 --- a/fe/templates/index.html +++ b/fe/templates/index.html @@ -35,7 +35,9 @@ <div class="col-lg-6"> <h4>Nodes</h4> {{range .Nodes}} - <p>{{.}}</p> + <p>{{.IP}} ({{.NumListeners}}) + {{if not .IcecastUp}}<span class="error">(IC_DOWN)</span>{{end}} + </p> {{end}} </div> </div> diff --git a/node/icecast.go b/node/icecast.go index 50b98a28c219588b52b165c664e936088c4038ee..88aa0c85594fc324be51efbf70ef895f9ddee2f2 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -1,24 +1,66 @@ package node import ( + "encoding/xml" "errors" + "io" + "log" + "net/http" "os" "os/exec" + "strconv" + "time" + + "git.autistici.org/ale/radioai" +) + +var ( + statusPageUrl = "http://localhost:8000/status-radioai.xsl" ) +// Icecast returns empty fields in our status handler, which we'll +// need to turn into integers (the xml unmarshaler will return an +// error in this specific case), so we use a separate type for +// decoding the status page output. This would be much simpler if I +// knew how to get the XSLT to put a default value in the output +// instead of an empty field... +type icecastMountStatusUnparsed struct { + Name string `xml:"name,attr"` + Listeners string `xml:"listeners"` + BitRate string `xml:"bitrate"` + Quality string `xml:"quality"` + VideoQuality string `xml:"video-quality"` + FrameSize string `xml:"frame-size"` + FrameRate string `xml:"frame-rate"` +} + +type icecastStatusUnparsed struct { + XMLName xml.Name `xml:"status"` + Mounts []icecastMountStatusUnparsed `xml:"mount"` +} + +type IcecastStatus struct { + Mounts []radioai.IcecastMountStatus + Up bool +} + type IcecastController struct { PublicIp string ConfigFile string InitScript string config *icecastConfig + status *IcecastStatus + stop chan bool } -func NewIcecastController(publicIp string) *IcecastController { +func NewIcecastController(publicIp string, stop chan bool) *IcecastController { return &IcecastController{ PublicIp: publicIp, ConfigFile: "/etc/icecast2/icecast.xml", InitScript: "/etc/init.d/icecast2", config: newIcecastConfig(publicIp), + status: &IcecastStatus{}, + stop: make(chan bool, 1), } } @@ -50,3 +92,78 @@ func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAd return ic.reload() } + +func (ic *IcecastController) GetStatus() *IcecastStatus { + return ic.status +} + +func (ic *IcecastController) statusUpdater() { + t := time.NewTicker(3 * time.Second) + downStatus := &IcecastStatus{} + for { + select { + case <-t.C: + if status, err := ic.fetchStatus(); err == nil { + ic.status = status + } else { + log.Printf("bad status from iceast: %v", err) + ic.status = downStatus + } + case <-ic.stop: + return + } + } +} + +func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) { + resp, err := http.Get(statusPageUrl) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return ic.parseStatusPage(resp.Body) +} + +func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) { + var ustatus icecastStatusUnparsed + if err := xml.NewDecoder(input).Decode(&ustatus); err != nil { + return nil, err + } + + // Quick converters from string that default to 0. + toi := func(s string) int { + if i, err := strconv.Atoi(s); err == nil { + return i + } + return 0 + } + tof := func(s string) float64 { + if f, err := strconv.ParseFloat(s, 64); err == nil { + return f + } + return 0 + } + + status := IcecastStatus{ + Up: true, + Mounts: make([]radioai.IcecastMountStatus, 0, len(ustatus.Mounts)), + } + for _, um := range ustatus.Mounts { + m := radioai.IcecastMountStatus{ + Name: um.Name, + Listeners: toi(um.Listeners), + BitRate: toi(um.BitRate), + Quality: tof(um.Quality), + VideoQuality: tof(um.VideoQuality), + FrameSize: um.FrameSize, + FrameRate: tof(um.FrameRate), + } + status.Mounts = append(status.Mounts, m) + } + + return &status, nil +} + +func (ic *IcecastController) Run() { + ic.statusUpdater() +} diff --git a/node/icecast_test.go b/node/icecast_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2e49694083bd0d7f347bec0fb20b83e76be28e6d --- /dev/null +++ b/node/icecast_test.go @@ -0,0 +1,26 @@ +package node + +import ( + "strings" + "testing" +) + +func TestIcecast_TestParseStatusPage(t *testing.T) { + xml := `<?xml version="1.0"?> +<status><mount name="/test.ogg"><listeners>3</listeners><bitrate/><quality/><video-quality/><frame-size/><frame-rate/></mount></status>` + + ic := NewIcecastController("1.2.3.4", make(chan bool)) + result, err := ic.parseStatusPage(strings.NewReader(xml)) + if err != nil { + t.Fatal(err) + } + if len(result.Mounts) != 1 { + t.Fatalf("Bad number of mounts: %+v", result) + } + if result.Mounts[0].Name != "/test.ogg" { + t.Fatalf("Bad mount name: %+v", result) + } + if result.Mounts[0].Listeners != 3 { + t.Fatalf("Bad # of listeners: %+v", result) + } +} diff --git a/node/node.go b/node/node.go index e4cd94886941030b04f757b362db782bedf864fc..390c8cf5f24d248adb9879458b2e31c0f5951930 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,7 @@ package node import ( + "bytes" "encoding/json" "log" "strings" @@ -225,7 +226,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { mech, stopch), watcher: NewConfigSyncer(client, config, upch, stopch), - icecast: NewIcecastController(ip), + icecast: NewIcecastController(ip, stopch), livenessTtl: 2, upch: upch, stop: stopch, @@ -240,9 +241,21 @@ func (rc *RadioNode) presence() { for { select { case <-ticker.C: - if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil { + // Build our NodeStatus. + icecastSt := rc.icecast.GetStatus() + nodeSt := radioai.NodeStatus{ + IP: rc.ip, + IcecastUp: icecastSt.Up, + Mounts: icecastSt.Mounts, + } + + // Update our node entry in the database. + var buf bytes.Buffer + json.NewEncoder(&buf).Encode(&nodeSt) + if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, buf.String(), rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } + case <-rc.stop: return } @@ -266,6 +279,9 @@ func (rc *RadioNode) Run() { // initialize. time.Sleep(200 * time.Millisecond) + // Start the Icecast status checker. + go rc.icecast.Run() + log.Printf("starting icecast updater") for { select {