Skip to content
Snippets Groups Projects
Commit 18fe4211 authored by ale's avatar ale
Browse files

report node status using the presence protocol

parent 4f7c34a0
No related branches found
No related tags found
No related merge requests found
......@@ -43,16 +43,47 @@ func mountPath(mountName string) string {
return MountPrefix + mountName[1:]
}
// Status of a mount on an individual Icecast server.
type IcecastMountStatus struct {
Name string `xml:"name,attr"`
Listeners int `xml:"listeners"`
BitRate int `xml:"bitrate"`
Quality float32 `xml:"quality"`
VideoQuality float32 `xml:"video-quality"`
FrameSize string `xml:"frame-size"`
FrameRate float32 `xml:"frame-rate"`
}
// Status of a node. This is used to report load and stream status.
type NodeStatus struct {
// Public IP of this server.
IP string
// Is the Icecast server up?
IcecastUp bool
// List of
Mounts []IcecastMountStatus `xml:"mount"`
}
func (ns *NodeStatus) NumListeners() int {
listeners := 0
for _, m := range ns.Mounts {
listeners += m.Listeners
}
return listeners
}
// Cache the list of active nodes (the front-ends that need to
// retrieve this information continuously, so we limit them to 2qps).
type nodesCache struct {
ttl time.Duration
nodes []string
nodes []*NodeStatus
deadline time.Time
lock sync.Mutex
}
type getNodesFunc func() ([]string, error)
type getNodesFunc func() ([]*NodeStatus, error)
func newNodesCache() *nodesCache {
return &nodesCache{
......@@ -64,14 +95,14 @@ func newNodesCache() *nodesCache {
// expired and we get an error from 'fn', we will attempt to return
// the previously cached value anyway, along with the error: the
// caller can then pick the right failure behavior.
func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) {
func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
nc.lock.Lock()
defer nc.lock.Unlock()
var err error
now := time.Now()
if now.After(nc.deadline) {
var nodes []string
var nodes []*NodeStatus
if nodes, err = fn(); err == nil {
nc.nodes = nodes
nc.deadline = now.Add(nc.ttl)
......@@ -162,7 +193,7 @@ func (r *RadioAPI) GetMasterAddr() (string, error) {
}
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) doGetNodes() ([]string, error) {
func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) {
response, err := r.client.Get(NodePrefix, false)
if err != nil {
return nil, err
......@@ -170,17 +201,32 @@ func (r *RadioAPI) doGetNodes() ([]string, error) {
if !response.Dir {
return nil, ErrIsFile
}
result := make([]string, 0, len(response.Kvs))
result := make([]*NodeStatus, 0, len(response.Kvs))
for _, entry := range response.Kvs {
result = append(result, entry.Value)
var ns NodeStatus
if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil {
result = append(result, &ns)
}
}
return result, nil
}
func (r *RadioAPI) GetNodes() ([]string, error) {
func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
func (r *RadioAPI) GetNodeIPs() ([]string, error) {
nodes, err := r.GetNodes()
if err != nil {
return nil, err
}
ips := make([]string, 0, len(nodes))
for _, n := range(nodes) {
ips = append(ips, n.IP)
}
return ips, nil
}
// GeneratePassword returns a new random password.
func GeneratePassword() string {
b := make([]byte, 6)
......
......@@ -157,7 +157,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
}
// Serve all active nodes on every request.
ips, _ := d.client.GetNodes()
ips, _ := d.client.GetNodeIPs()
if ips == nil || len(ips) == 0 {
// In case of errors retrieving the list of
......
......@@ -40,8 +40,20 @@ func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector
// nodes yet).
func (h *HttpRedirector) pickActiveNode() string {
nodes, _ := h.client.GetNodes()
if nodes != nil && len(nodes) > 0 {
return nodes[rand.Intn(len(nodes))]
if nodes == nil {
return ""
}
// Filter nodes where Icecast is reported to be up.
okNodes := make([]string, 0, len(nodes))
for _, n := range nodes {
if n.IcecastUp {
okNodes = append(okNodes, n.IP)
}
}
if len(okNodes) > 0 {
return okNodes[rand.Intn(len(okNodes))]
}
return ""
}
......@@ -152,7 +164,7 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request)
mounts, _ := h.client.ListMounts()
ctx := struct {
Domain string
Nodes []string
Nodes []*radioai.NodeStatus
Mounts []*radioai.Mount
}{h.domain, nodes, mounts}
......
......@@ -71,4 +71,8 @@ body {
overflow: hidden;
z-index: -100;
opacity: 0.3;
}
.error {
color: red
}
\ No newline at end of file
......@@ -35,7 +35,9 @@
<div class="col-lg-6">
<h4>Nodes</h4>
{{range .Nodes}}
<p>{{.}}</p>
<p>{{.IP}} ({{.NumListeners}})
{{if not .IcecastUp}}<span class="error">(IC_DOWN)</span>{{end}}
</p>
{{end}}
</div>
</div>
......
......@@ -2,29 +2,24 @@ package node
import (
"encoding/xml"
"errors"
"io"
"net/http"
"errors"
"os"
"os/exec"
"time"
"git.autistici.org/ale/radioai"
)
var (
statusPageUrl = "http://localhost:8000/radioai.xsl"
statusPageUrl = "http://localhost:8000/status-radioai.xsl"
)
type icecastStatusMount struct {
Title string `xml:"title"`
Name string `xml:"name"`
Url string `xml:"url"`
Listeners int `xml:"listeners"`
BitRate int `xml:"bitrate"`
}
type IcecastStatus struct {
XMLName xml.Name `xml:"config"`
Mounts []icecastStatusMount
XMLName xml.Name `xml:"status",json:"-"`
Mounts []radioai.IcecastMountStatus `xml:"mount"`
Up bool
}
type IcecastController struct {
......@@ -36,12 +31,13 @@ type IcecastController struct {
stop chan bool
}
func NewIcecastController(publicIp string) *IcecastController {
func NewIcecastController(publicIp string, stop chan bool) *IcecastController {
return &IcecastController{
PublicIp: publicIp,
ConfigFile: "/etc/icecast2/icecast.xml",
InitScript: "/etc/init.d/icecast2",
config: newIcecastConfig(publicIp),
status: &IcecastStatus{},
stop: make(chan bool, 1),
}
}
......@@ -75,13 +71,20 @@ func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAd
return ic.reload()
}
func (ic *IcecastController) GetStatus() *IcecastStatus {
return ic.status
}
func (ic *IcecastController) statusUpdater() {
t := time.NewTicker(3 * time.Second)
downStatus := &IcecastStatus{}
for {
select {
case <-t.C:
if status, err := ic.fetchStatus(); err == nil {
ic.status = status
} else {
ic.status = downStatus
}
case <-ic.stop:
return
......@@ -103,14 +106,10 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e
if err := xml.NewDecoder(input).Decode(&status); err != nil {
return nil, err
}
status.Up = true
return &status, nil
}
func (ic *IcecastController) Run() {
go ic.statusUpdater()
}
func (ic *IcecastController) Stop() {
close(ic.stop)
}
package node
import (
"bytes"
"encoding/json"
"log"
"strings"
......@@ -225,7 +226,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
mech,
stopch),
watcher: NewConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(ip),
icecast: NewIcecastController(ip, stopch),
livenessTtl: 2,
upch: upch,
stop: stopch,
......@@ -240,9 +241,21 @@ func (rc *RadioNode) presence() {
for {
select {
case <-ticker.C:
if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil {
// Build our NodeStatus.
icecastSt := rc.icecast.GetStatus()
nodeSt := radioai.NodeStatus{
IP: rc.ip,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
}
// Update our node entry in the database.
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&nodeSt)
if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, buf.String(), rc.livenessTtl); err != nil {
log.Printf("presence: Set(): %s", err)
}
case <-rc.stop:
return
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment