diff --git a/api.go b/api.go index 5d8e9311488cced157ca3e1ac031ade7bb57dfdc..090a032088c9dc708a489a5b48f04f16797f8651 100644 --- a/api.go +++ b/api.go @@ -43,16 +43,47 @@ func mountPath(mountName string) string { return MountPrefix + mountName[1:] } +// Status of a mount on an individual Icecast server. +type IcecastMountStatus struct { + Name string `xml:"name,attr"` + Listeners int `xml:"listeners"` + BitRate int `xml:"bitrate"` + Quality float32 `xml:"quality"` + VideoQuality float32 `xml:"video-quality"` + FrameSize string `xml:"frame-size"` + FrameRate float32 `xml:"frame-rate"` +} + +// Status of a node. This is used to report load and stream status. +type NodeStatus struct { + // Public IP of this server. + IP string + + // Is the Icecast server up? + IcecastUp bool + + // List of + Mounts []IcecastMountStatus `xml:"mount"` +} + +func (ns *NodeStatus) NumListeners() int { + listeners := 0 + for _, m := range ns.Mounts { + listeners += m.Listeners + } + return listeners +} + // Cache the list of active nodes (the front-ends that need to // retrieve this information continuously, so we limit them to 2qps). type nodesCache struct { ttl time.Duration - nodes []string + nodes []*NodeStatus deadline time.Time lock sync.Mutex } -type getNodesFunc func() ([]string, error) +type getNodesFunc func() ([]*NodeStatus, error) func newNodesCache() *nodesCache { return &nodesCache{ @@ -64,14 +95,14 @@ func newNodesCache() *nodesCache { // expired and we get an error from 'fn', we will attempt to return // the previously cached value anyway, along with the error: the // caller can then pick the right failure behavior. -func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) { +func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) { nc.lock.Lock() defer nc.lock.Unlock() var err error now := time.Now() if now.After(nc.deadline) { - var nodes []string + var nodes []*NodeStatus if nodes, err = fn(); err == nil { nc.nodes = nodes nc.deadline = now.Add(nc.ttl) @@ -162,7 +193,7 @@ func (r *RadioAPI) GetMasterAddr() (string, error) { } // GetNodes returns the list of active cluster nodes. -func (r *RadioAPI) doGetNodes() ([]string, error) { +func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) { response, err := r.client.Get(NodePrefix, false) if err != nil { return nil, err @@ -170,17 +201,32 @@ func (r *RadioAPI) doGetNodes() ([]string, error) { if !response.Dir { return nil, ErrIsFile } - result := make([]string, 0, len(response.Kvs)) + result := make([]*NodeStatus, 0, len(response.Kvs)) for _, entry := range response.Kvs { - result = append(result, entry.Value) + var ns NodeStatus + if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil { + result = append(result, &ns) + } } return result, nil } -func (r *RadioAPI) GetNodes() ([]string, error) { +func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) { return r.activeNodesCache.Get(r.doGetNodes) } +func (r *RadioAPI) GetNodeIPs() ([]string, error) { + nodes, err := r.GetNodes() + if err != nil { + return nil, err + } + ips := make([]string, 0, len(nodes)) + for _, n := range(nodes) { + ips = append(ips, n.IP) + } + return ips, nil +} + // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) diff --git a/fe/dns.go b/fe/dns.go index 09ea0b84a96f865ba565632474c9d346094e1a97..309d3bbd9abfe866bcc3eee777208af0d581d8af 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -157,7 +157,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { } // Serve all active nodes on every request. - ips, _ := d.client.GetNodes() + ips, _ := d.client.GetNodeIPs() if ips == nil || len(ips) == 0 { // In case of errors retrieving the list of diff --git a/fe/http.go b/fe/http.go index ff0333bbfc98b6ab9e9a0432266351d9b32f2743..09ad8058a64ad07dafd5bff82b038c5c82dbb712 100644 --- a/fe/http.go +++ b/fe/http.go @@ -40,8 +40,20 @@ func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector // nodes yet). func (h *HttpRedirector) pickActiveNode() string { nodes, _ := h.client.GetNodes() - if nodes != nil && len(nodes) > 0 { - return nodes[rand.Intn(len(nodes))] + if nodes == nil { + return "" + } + + // Filter nodes where Icecast is reported to be up. + okNodes := make([]string, 0, len(nodes)) + for _, n := range nodes { + if n.IcecastUp { + okNodes = append(okNodes, n.IP) + } + } + + if len(okNodes) > 0 { + return okNodes[rand.Intn(len(okNodes))] } return "" } @@ -152,7 +164,7 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) mounts, _ := h.client.ListMounts() ctx := struct { Domain string - Nodes []string + Nodes []*radioai.NodeStatus Mounts []*radioai.Mount }{h.domain, nodes, mounts} diff --git a/fe/static/style.css b/fe/static/style.css index 6a2dbdaa75cdc874edb07a6e44947492749bb0b7..37cb70c7a9142e7b8977d44ae0f0729dd50ab533 100644 --- a/fe/static/style.css +++ b/fe/static/style.css @@ -71,4 +71,8 @@ body { overflow: hidden; z-index: -100; opacity: 0.3; +} + +.error { + color: red } \ No newline at end of file diff --git a/fe/templates/index.html b/fe/templates/index.html index 973fc678b0da52d51b8e26b9bae649fdb12654f3..7b7ea7ab2976845dd7dbc7373f9ae0a1e71b306d 100644 --- a/fe/templates/index.html +++ b/fe/templates/index.html @@ -35,7 +35,9 @@ <div class="col-lg-6"> <h4>Nodes</h4> {{range .Nodes}} - <p>{{.}}</p> + <p>{{.IP}} ({{.NumListeners}}) + {{if not .IcecastUp}}<span class="error">(IC_DOWN)</span>{{end}} + </p> {{end}} </div> </div> diff --git a/node/icecast.go b/node/icecast.go index 87c7df8ae4a6dc7484b3a6c9690ab285c96b1ea3..f973d8f306b7f8b360898a7e7c5ed3f815df5731 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -2,29 +2,24 @@ package node import ( "encoding/xml" + "errors" "io" "net/http" - "errors" "os" "os/exec" "time" + + "git.autistici.org/ale/radioai" ) var ( - statusPageUrl = "http://localhost:8000/radioai.xsl" + statusPageUrl = "http://localhost:8000/status-radioai.xsl" ) -type icecastStatusMount struct { - Title string `xml:"title"` - Name string `xml:"name"` - Url string `xml:"url"` - Listeners int `xml:"listeners"` - BitRate int `xml:"bitrate"` -} - type IcecastStatus struct { - XMLName xml.Name `xml:"config"` - Mounts []icecastStatusMount + XMLName xml.Name `xml:"status",json:"-"` + Mounts []radioai.IcecastMountStatus `xml:"mount"` + Up bool } type IcecastController struct { @@ -36,12 +31,13 @@ type IcecastController struct { stop chan bool } -func NewIcecastController(publicIp string) *IcecastController { +func NewIcecastController(publicIp string, stop chan bool) *IcecastController { return &IcecastController{ PublicIp: publicIp, ConfigFile: "/etc/icecast2/icecast.xml", InitScript: "/etc/init.d/icecast2", config: newIcecastConfig(publicIp), + status: &IcecastStatus{}, stop: make(chan bool, 1), } } @@ -75,13 +71,20 @@ func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAd return ic.reload() } +func (ic *IcecastController) GetStatus() *IcecastStatus { + return ic.status +} + func (ic *IcecastController) statusUpdater() { t := time.NewTicker(3 * time.Second) + downStatus := &IcecastStatus{} for { select { case <-t.C: if status, err := ic.fetchStatus(); err == nil { ic.status = status + } else { + ic.status = downStatus } case <-ic.stop: return @@ -103,14 +106,10 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e if err := xml.NewDecoder(input).Decode(&status); err != nil { return nil, err } - + status.Up = true return &status, nil } func (ic *IcecastController) Run() { go ic.statusUpdater() } - -func (ic *IcecastController) Stop() { - close(ic.stop) -} diff --git a/node/node.go b/node/node.go index e4cd94886941030b04f757b362db782bedf864fc..3a29336eff206072fabb07b995a254ac5183d479 100644 --- a/node/node.go +++ b/node/node.go @@ -1,6 +1,7 @@ package node import ( + "bytes" "encoding/json" "log" "strings" @@ -225,7 +226,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { mech, stopch), watcher: NewConfigSyncer(client, config, upch, stopch), - icecast: NewIcecastController(ip), + icecast: NewIcecastController(ip, stopch), livenessTtl: 2, upch: upch, stop: stopch, @@ -240,9 +241,21 @@ func (rc *RadioNode) presence() { for { select { case <-ticker.C: - if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil { + // Build our NodeStatus. + icecastSt := rc.icecast.GetStatus() + nodeSt := radioai.NodeStatus{ + IP: rc.ip, + IcecastUp: icecastSt.Up, + Mounts: icecastSt.Mounts, + } + + // Update our node entry in the database. + var buf bytes.Buffer + json.NewEncoder(&buf).Encode(&nodeSt) + if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, buf.String(), rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } + case <-rc.stop: return }