From dc5e2c0b093a273641107b596cc480939a2cc060 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sun, 30 Nov 2014 10:16:04 +0000 Subject: [PATCH] minor fixes for readability --- api.go | 12 ------- fe/dns.go | 22 +++++++++--- node/icecast.go | 4 +-- node/icecast_config.go | 68 +++++++++++++++++-------------------- node/icecast_config_test.go | 2 +- node/node.go | 55 +++++++++++++++--------------- 6 files changed, 79 insertions(+), 84 deletions(-) diff --git a/api.go b/api.go index 8a169251..4723534c 100644 --- a/api.go +++ b/api.go @@ -240,18 +240,6 @@ func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) { return r.activeNodesCache.Get(r.doGetNodes) } -func (r *RadioAPI) GetNodeIPs() ([]string, error) { - nodes, err := r.GetNodes() - if err != nil { - return nil, err - } - ips := make([]string, 0, len(nodes)) - for _, n := range nodes { - ips = append(ips, n.IP) - } - return ips, nil -} - // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) diff --git a/fe/dns.go b/fe/dns.go index 15ae66cd..05425950 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -130,6 +130,15 @@ func (d *DnsRedirector) getQuestionName(req *dns.Msg) string { return strings.ToLower(strings.Join(ql, ".")) } +// Flatten IPs from the list of nodes. +func flattenIPs(nodes []*autoradio.NodeStatus) []string { + var ips []string + for _, n := range nodes { + ips = append(ips, n.IP) + } + return ips +} + func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m := new(dns.Msg) @@ -160,10 +169,15 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { break } - // Serve all active nodes on every request. - ips, _ := d.client.GetNodeIPs() - - if ips == nil || len(ips) == 0 { + // Serve all active nodes on every request. We don't + // really care about errors from GetNodes as long as + // some nodes are returned (i.e. stale data from the + // cache is accepted). + var ips []string + nodes, _ := d.client.GetNodes() + if len(nodes) > 0 { + ips = flattenIPs(nodes) + } else { // In case of errors retrieving the list of // active nodes, fall back to serving our // public IP (just to avoid returning an empty diff --git a/node/icecast.go b/node/icecast.go index ac490f8e..f3a9c048 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -74,7 +74,7 @@ func (ic *IcecastController) reload() error { } // Kill sources connected to local streams. -func (ic *IcecastController) killSources(conf *ClusterConfig) error { +func (ic *IcecastController) killSources(conf *clusterConfig) error { var anyErr error client := &http.Client{} for _, m := range conf.ListMounts() { @@ -98,7 +98,7 @@ func (ic *IcecastController) killSources(conf *ClusterConfig) error { } // Update reloads the Icecast daemon with a new configuration. -func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAddr string) error { +func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr string) error { if !isMaster && masterAddr == "" { return errors.New("unknown system state") } diff --git a/node/icecast_config.go b/node/icecast_config.go index e74482ea..746a1dec 100644 --- a/node/icecast_config.go +++ b/node/icecast_config.go @@ -15,7 +15,11 @@ import ( ) var ( - //shoutHttpPort = 8001 + // The per-node icecast client limit is set to a very high + // value in order to disable the enforcement at the icecast + // level (if everything goes well, the front-end traffic + // management code should know better). + // TODO: make it a flag anyway. maxClients = 10000 icecastAdminPwFile = "/etc/icecast/.admin_pw" @@ -105,8 +109,9 @@ type iceMountConfig struct { OnDisconnect string `xml:"on-disconnect,omitempty"` } -// Configuration of the local Icecast daemon (meant for serialization -// to XML). +// Configuration of the local Icecast daemon. This is a write-only +// object, meant for serialization to XML. We keep around a single +// copy of it and just update Relays and Mounts every time. type icecastConfig struct { XMLName xml.Name Limits iceLimitsConfig `xml:"limits"` @@ -128,18 +133,17 @@ type icecastConfig struct { // - It binds to the IcecastPort (defined in api.go) on all // interfaces. // -// - Local administration is practically disabled. A random admin -// password is created every time the daemon starts. Same goes for the -// global source password. +// - A random admin password is generated once on each node, and saved +// to a file for persistence. It is not really meant to be used by the +// operator. // -// Some of the parameters should probably be command-line flags, so -// that it is possible to set them on a per-host basis. +// TODO: Some of the parameters should probably be command-line flags, +// so that it is possible to set them on a per-host basis. // -func defaultDebianConfig(publicIp string) *icecastConfig { - // Set the icecast admin password once, on the first run, and - // save it on the filesystem. We don't use the global source - // password, but icecast is happier if it's set, so we just - // use a random password every time. +func newIcecastConfig(publicIp string) *icecastConfig { + // We don't use the global source password, but icecast is + // happier if it's set, so we just use a random password every + // time. sourcePw := autoradio.GeneratePassword() adminPw := getIcecastAdminPassword() @@ -178,15 +182,10 @@ func defaultDebianConfig(publicIp string) *icecastConfig { Security: iceSecurityConfig{0}, Listen: []iceListenConfig{ {"0.0.0.0", autoradio.IcecastPort, 0}, - //{"0.0.0.0", shoutHttpPort, 1}, }, } } -func newIcecastConfig(publicIp string) *icecastConfig { - return defaultDebianConfig(publicIp) -} - // Encode the configuration to XML. func (c *icecastConfig) Encode() ([]byte, error) { var buf bytes.Buffer @@ -218,7 +217,7 @@ func (c *icecastConfig) EncodeToFile(path string) error { return err } -func mountToConfig(m *autoradio.Mount) iceMountConfig { +func masterMountToIcecastConfig(m *autoradio.Mount) iceMountConfig { mconfig := iceMountConfig{ Name: autoradio.MountNameToIcecastPath(m.Name), Username: m.Username, @@ -234,7 +233,7 @@ func mountToConfig(m *autoradio.Mount) iceMountConfig { return mconfig } -func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { +func relayToIcecastConfig(m *autoradio.Mount) (iceRelayConfig, bool) { u, err := url.Parse(m.RelayUrl) if err != nil { // A failure here is almost invisible and not very @@ -267,10 +266,11 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) { return rc, true } -func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig { +func slaveMountToIcecastConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig { + path := autoradio.MountNameToIcecastPath(m.Name) return iceRelayConfig{ - Mount: autoradio.MountNameToIcecastPath(m.Name), - LocalMount: autoradio.MountNameToIcecastPath(m.Name), + Mount: path, + LocalMount: path, Server: masterAddr, Port: autoradio.IcecastPort, Username: m.Username, @@ -283,29 +283,23 @@ func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig { // Update the configuration with the current list of mounts and // masterelection state. This will clear the Mounts and Relays fields // and set them to new values. -func (ic *icecastConfig) Update(config *ClusterConfig, isMaster bool, masterAddr string) { - ic.Mounts = nil - ic.Relays = nil - mounts := make([]iceMountConfig, 0) - relays := make([]iceRelayConfig, 0) +func (ic *icecastConfig) Update(config *clusterConfig, isMaster bool, masterAddr string) { + var mounts []iceMountConfig + var relays []iceRelayConfig for _, m := range config.ListMounts() { switch { case m.IsRelay(): - if rc, ok := relayToConfig(m); ok { + if rc, ok := relayToIcecastConfig(m); ok { relays = append(relays, rc) } case isMaster: - mounts = append(mounts, mountToConfig(m)) + mounts = append(mounts, masterMountToIcecastConfig(m)) default: - relays = append(relays, mountToRelayConfig(masterAddr, m)) + relays = append(relays, slaveMountToIcecastConfig(masterAddr, m)) } } - if len(mounts) > 0 { - ic.Mounts = mounts - } - if len(relays) > 0 { - ic.Relays = relays - } + ic.Mounts = mounts + ic.Relays = relays } diff --git a/node/icecast_config_test.go b/node/icecast_config_test.go index 08099a72..23ac6398 100644 --- a/node/icecast_config_test.go +++ b/node/icecast_config_test.go @@ -13,7 +13,7 @@ func TestIcecastConfig(t *testing.T) { Username: "user", Password: "pass", } - c := NewClusterConfig() + c := newClusterConfig() c.setMount(mount) // Test a relay config. diff --git a/node/node.go b/node/node.go index 33db652d..a4e9466d 100644 --- a/node/node.go +++ b/node/node.go @@ -28,34 +28,34 @@ func trigger(c chan bool) { } } -// Remove mountPrefix from the beginning of the path, but keep the -// leading slash. +// Converts an etcd key (a path) to the Icecast mount path. Removes +// mountPrefix from the beginning of the path, keeping the leading +// slash. func keyToMount(key string) string { return key[len(autoradio.MountPrefix)-1:] } // In-memory representation of the overall configuration (basically // just a list of the known mounts). -type ClusterConfig struct { +type clusterConfig struct { mounts map[string]*autoradio.Mount lock sync.Mutex } -func NewClusterConfig() *ClusterConfig { - return &ClusterConfig{ +func newClusterConfig() *clusterConfig { + return &clusterConfig{ mounts: make(map[string]*autoradio.Mount), } } // TODO: remove? -func (c *ClusterConfig) GetMount(name string) *autoradio.Mount { +func (c *clusterConfig) GetMount(name string) *autoradio.Mount { c.lock.Lock() defer c.lock.Unlock() return c.mounts[name] } -// TODO: remove? -func (c *ClusterConfig) ListMounts() []*autoradio.Mount { +func (c *clusterConfig) ListMounts() []*autoradio.Mount { c.lock.Lock() defer c.lock.Unlock() result := make([]*autoradio.Mount, 0, len(c.mounts)) @@ -66,32 +66,31 @@ func (c *ClusterConfig) ListMounts() []*autoradio.Mount { } // Update a mount (in-memory only). -func (c *ClusterConfig) setMount(m *autoradio.Mount) { +func (c *clusterConfig) setMount(m *autoradio.Mount) { c.lock.Lock() defer c.lock.Unlock() c.mounts[m.Name] = m } // Delete a mount (in-memory only). -func (c *ClusterConfig) delMount(name string) { +func (c *clusterConfig) delMount(name string) { c.lock.Lock() defer c.lock.Unlock() delete(c.mounts, name) } -// Keeps the in-memory service configuration in sync with the -// distributed database. An update channel is triggered whenever the -// data changes. -type ConfigSyncer struct { +// Keeps the in-memory service configuration in sync with the etcd +// database. An update channel is triggered whenever the data changes. +type configWatcher struct { client *etcd.Client - config *ClusterConfig + config *clusterConfig upch chan bool stop chan bool index uint64 } -func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer { - return &ConfigSyncer{ +func newConfigSyncer(client *etcd.Client, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher { + return &configWatcher{ client: client, config: config, upch: upch, @@ -99,12 +98,12 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, } } -func (w *ConfigSyncer) setIndex(index uint64) { +func (w *configWatcher) setIndex(index uint64) { w.index = index configIndex.Set(int64(index)) } -func (w *ConfigSyncer) updateConfigWithResponse(index uint64, key, value string) { +func (w *configWatcher) updateConfigWithResponse(index uint64, key, value string) { mountName := keyToMount(key) log.Printf("updating mount %s [@%d]: %s", mountName, index, value) var m autoradio.Mount @@ -115,7 +114,7 @@ func (w *ConfigSyncer) updateConfigWithResponse(index uint64, key, value string) } } -func (w *ConfigSyncer) syncer(ch chan *etcd.Response) { +func (w *configWatcher) watcher(ch chan *etcd.Response) { for { select { case response, ok := <-ch: @@ -150,7 +149,7 @@ func (w *ConfigSyncer) syncer(ch chan *etcd.Response) { } // Load full configuration from etcd. This will trigger the update channel. -func (w *ConfigSyncer) loadFullConfig() { +func (w *configWatcher) loadFullConfig() { for { response, err := w.client.Get(autoradio.MountPrefix, false, false) if err == nil && response.Node != nil && response.Node.Dir { @@ -176,10 +175,10 @@ func (w *ConfigSyncer) loadFullConfig() { trigger(w.upch) } -// Start the ConfigSyncer in the background. It will wait for +// Start the configWatcher in the background. It will wait for // initialization to complete, so that when this function returns, the // in-memory configuration has already been fully synchronized. -func (w *ConfigSyncer) Start() { +func (w *configWatcher) Start() { // Run until the first successful Get(). log.Printf("retrieving initial config") w.loadFullConfig() @@ -190,7 +189,7 @@ func (w *ConfigSyncer) Start() { go func() { for { ch := make(chan *etcd.Response) - go w.syncer(ch) + go w.watcher(ch) curIndex := w.index + 1 log.Printf("starting watcher at index %d", curIndex) @@ -216,12 +215,12 @@ func (w *ConfigSyncer) Start() { // An active streaming node, managing the local icecast server. type RadioNode struct { - Config *ClusterConfig + Config *clusterConfig ip string client *etcd.Client me *masterelection.MasterElection - watcher *ConfigSyncer + watcher *configWatcher icecast *IcecastController bw *bwmonitor.BandwidthUsageMonitor livenessTtl uint64 @@ -230,7 +229,7 @@ type RadioNode struct { } func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode { - config := NewClusterConfig() + config := newClusterConfig() // Network updates trigger icecast reconfiguration. This // channel is used as an 'event', no more than one entry will @@ -259,7 +258,7 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi 5, mech, stopch), - watcher: NewConfigSyncer(client, config, upch, stopch), + watcher: newConfigSyncer(client, config, upch, stopch), icecast: NewIcecastController(ip, stopch), livenessTtl: 2, bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), -- GitLab