Commit dc5e2c0b authored by ale

minor fixes for readability

parent aa7a61cf
@@ -240,18 +240,6 @@ func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
func (r *RadioAPI) GetNodeIPs() ([]string, error) {
nodes, err := r.GetNodes()
if err != nil {
return nil, err
}
ips := make([]string, 0, len(nodes))
for _, n := range nodes {
ips = append(ips, n.IP)
}
return ips, nil
}
// GeneratePassword returns a new random password.
func GeneratePassword() string {
b := make([]byte, 6)
......
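GeneratePassword is only partially visible in this hunk; a minimal sketch of such a random-password helper, assuming crypto/rand as the byte source and URL-safe base64 as the encoding (both choices are assumptions, not taken from this commit):

// Sketch only: hypothetical stand-in for autoradio.GeneratePassword.
// Assumed imports: "crypto/rand", "encoding/base64".
func generatePasswordSketch() string {
	b := make([]byte, 6)
	if _, err := rand.Read(b); err != nil {
		panic(err) // no usable entropy source; not recoverable
	}
	return base64.URLEncoding.EncodeToString(b)
}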
@@ -130,6 +130,15 @@ func (d *DnsRedirector) getQuestionName(req *dns.Msg) string {
return strings.ToLower(strings.Join(ql, "."))
}
// Flatten IPs from the list of nodes.
func flattenIPs(nodes []*autoradio.NodeStatus) []string {
var ips []string
for _, n := range nodes {
ips = append(ips, n.IP)
}
return ips
}
func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m := new(dns.Msg)
@@ -160,10 +169,15 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
break
}
// Serve all active nodes on every request.
ips, _ := d.client.GetNodeIPs()
if ips == nil || len(ips) == 0 {
// Serve all active nodes on every request. We don't
// really care about errors from GetNodes as long as
// some nodes are returned (i.e. stale data from the
// cache is accepted).
var ips []string
nodes, _ := d.client.GetNodes()
if len(nodes) > 0 {
ips = flattenIPs(nodes)
} else {
// In case of errors retrieving the list of
// active nodes, fall back to serving our
// public IP (just to avoid returning an empty
......
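Further down (outside this hunk), serveDNS has to turn whichever ips it ended up with into DNS answers. A minimal sketch of that step with the miekg/dns package used here; the helper name and the 5-second TTL are assumptions, not taken from this commit:

// appendNodeRecords adds one A record per active node IP to the reply.
// Assumed imports: "net", "github.com/miekg/dns".
func appendNodeRecords(m *dns.Msg, name string, ips []string) {
	for _, ip := range ips {
		m.Answer = append(m.Answer, &dns.A{
			Hdr: dns.RR_Header{
				Name:   name,
				Rrtype: dns.TypeA,
				Class:  dns.ClassINET,
				Ttl:    5, // assumed low TTL so clients re-resolve quickly
			},
			A: net.ParseIP(ip),
		})
	}
}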
@@ -74,7 +74,7 @@ func (ic *IcecastController) reload() error {
}
// Kill sources connected to local streams.
func (ic *IcecastController) killSources(conf *ClusterConfig) error {
func (ic *IcecastController) killSources(conf *clusterConfig) error {
var anyErr error
client := &http.Client{}
for _, m := range conf.ListMounts() {
@@ -98,7 +98,7 @@ func (ic *IcecastController) killSources(conf *ClusterConfig) error {
}
// Update reloads the Icecast daemon with a new configuration.
func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAddr string) error {
func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr string) error {
if !isMaster && masterAddr == "" {
return errors.New("unknown system state")
}
......
@@ -15,7 +15,11 @@ import (
)
var (
//shoutHttpPort = 8001
// The per-node icecast client limit is set to a very high
// value in order to disable the enforcement at the icecast
// level (if everything goes well, the front-end traffic
// management code should know better).
// TODO: make it a flag anyway.
maxClients = 10000
icecastAdminPwFile = "/etc/icecast/.admin_pw"
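A possible shape for the TODO above, using the standard flag package (the flag name and help text are assumptions):

// Sketch only: maxClients as a command-line flag instead of a package variable.
var maxClients = flag.Int("icecast-max-clients", 10000,
	"per-node Icecast client limit (set very high to disable enforcement)")

Call sites would then dereference *maxClients, or flag.IntVar could keep the plain int variable.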
@@ -105,8 +109,9 @@ type iceMountConfig struct {
OnDisconnect string `xml:"on-disconnect,omitempty"`
}
// Configuration of the local Icecast daemon (meant for serialization
// to XML).
// Configuration of the local Icecast daemon. This is a write-only
// object, meant for serialization to XML. We keep around a single
// copy of it and just update Relays and Mounts every time.
type icecastConfig struct {
XMLName xml.Name
Limits iceLimitsConfig `xml:"limits"`
@@ -128,18 +133,17 @@ type icecastConfig struct {
// - It binds to the IcecastPort (defined in api.go) on all
// interfaces.
//
// - Local administration is practically disabled. A random admin
// password is created every time the daemon starts. Same goes for the
// global source password.
// - A random admin password is generated once on each node, and saved
// to a file for persistence. It is not really meant to be used by the
// operator.
//
// Some of the parameters should probably be command-line flags, so
// that it is possible to set them on a per-host basis.
// TODO: Some of the parameters should probably be command-line flags,
// so that it is possible to set them on a per-host basis.
//
func defaultDebianConfig(publicIp string) *icecastConfig {
// Set the icecast admin password once, on the first run, and
// save it on the filesystem. We don't use the global source
// password, but icecast is happier if it's set, so we just
// use a random password every time.
func newIcecastConfig(publicIp string) *icecastConfig {
// We don't use the global source password, but icecast is
// happier if it's set, so we just use a random password every
// time.
sourcePw := autoradio.GeneratePassword()
adminPw := getIcecastAdminPassword()
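getIcecastAdminPassword itself is outside this diff; a sketch of the generate-once-and-persist behaviour described in the comment above, with error handling simplified (assumed imports: "io/ioutil", "strings"):

// Sketch only: return the persisted admin password, creating it on first run.
func getIcecastAdminPasswordSketch() string {
	if data, err := ioutil.ReadFile(icecastAdminPwFile); err == nil {
		return strings.TrimSpace(string(data))
	}
	pw := autoradio.GeneratePassword()
	// Best-effort write: a failure only means a new password on the next start.
	ioutil.WriteFile(icecastAdminPwFile, []byte(pw), 0600)
	return pw
}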
@@ -178,15 +182,10 @@ func defaultDebianConfig(publicIp string) *icecastConfig {
Security: iceSecurityConfig{0},
Listen: []iceListenConfig{
{"0.0.0.0", autoradio.IcecastPort, 0},
//{"0.0.0.0", shoutHttpPort, 1},
},
}
}
func newIcecastConfig(publicIp string) *icecastConfig {
return defaultDebianConfig(publicIp)
}
// Encode the configuration to XML.
func (c *icecastConfig) Encode() ([]byte, error) {
var buf bytes.Buffer
@@ -218,7 +217,7 @@ func (c *icecastConfig) EncodeToFile(path string) error {
return err
}
func mountToConfig(m *autoradio.Mount) iceMountConfig {
func masterMountToIcecastConfig(m *autoradio.Mount) iceMountConfig {
mconfig := iceMountConfig{
Name: autoradio.MountNameToIcecastPath(m.Name),
Username: m.Username,
@@ -234,7 +233,7 @@ func mountToConfig(m *autoradio.Mount) iceMountConfig {
return mconfig
}
func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
func relayToIcecastConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
u, err := url.Parse(m.RelayUrl)
if err != nil {
// A failure here is almost invisible and not very
@@ -267,10 +266,11 @@ func relayToConfig(m *autoradio.Mount) (iceRelayConfig, bool) {
return rc, true
}
func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig {
func slaveMountToIcecastConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig {
path := autoradio.MountNameToIcecastPath(m.Name)
return iceRelayConfig{
Mount: autoradio.MountNameToIcecastPath(m.Name),
LocalMount: autoradio.MountNameToIcecastPath(m.Name),
Mount: path,
LocalMount: path,
Server: masterAddr,
Port: autoradio.IcecastPort,
Username: m.Username,
@@ -283,29 +283,23 @@ func mountToRelayConfig(masterAddr string, m *autoradio.Mount) iceRelayConfig {
// Update the configuration with the current list of mounts and
// masterelection state. This will clear the Mounts and Relays fields
// and set them to new values.
func (ic *icecastConfig) Update(config *ClusterConfig, isMaster bool, masterAddr string) {
ic.Mounts = nil
ic.Relays = nil
mounts := make([]iceMountConfig, 0)
relays := make([]iceRelayConfig, 0)
func (ic *icecastConfig) Update(config *clusterConfig, isMaster bool, masterAddr string) {
var mounts []iceMountConfig
var relays []iceRelayConfig
for _, m := range config.ListMounts() {
switch {
case m.IsRelay():
if rc, ok := relayToConfig(m); ok {
if rc, ok := relayToIcecastConfig(m); ok {
relays = append(relays, rc)
}
case isMaster:
mounts = append(mounts, mountToConfig(m))
mounts = append(mounts, masterMountToIcecastConfig(m))
default:
relays = append(relays, mountToRelayConfig(masterAddr, m))
relays = append(relays, slaveMountToIcecastConfig(masterAddr, m))
}
}
if len(mounts) > 0 {
ic.Mounts = mounts
}
if len(relays) > 0 {
ic.Relays = relays
}
ic.Mounts = mounts
ic.Relays = relays
}
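Taken together with the earlier hunks, the intended flow at the controller level is: refresh Mounts/Relays, serialize to XML, reload the daemon. A sketch under those assumptions; Update, EncodeToFile and reload appear in this diff, while the config field, configPath and the applyConfig helper are hypothetical:

// Hypothetical glue showing how icecastConfig.Update is meant to be used.
func (ic *IcecastController) applyConfig(conf *clusterConfig, isMaster bool, masterAddr string) error {
	ic.config.Update(conf, isMaster, masterAddr) // rebuild Mounts and Relays
	if err := ic.config.EncodeToFile(ic.configPath); err != nil {
		return err
	}
	return ic.reload() // tell the running icecast daemon to pick up the new file
}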
@@ -13,7 +13,7 @@ func TestIcecastConfig(t *testing.T) {
Username: "user",
Password: "pass",
}
c := NewClusterConfig()
c := newClusterConfig()
c.setMount(mount)
// Test a relay config.
......
@@ -28,34 +28,34 @@ func trigger(c chan bool) {
}
}
// Remove mountPrefix from the beginning of the path, but keep the
// leading slash.
// Converts an etcd key (a path) to the Icecast mount path. Removes
// MountPrefix from the beginning of the path, keeping the leading
// slash.
func keyToMount(key string) string {
return key[len(autoradio.MountPrefix)-1:]
}
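A worked example of the slicing above, assuming a hypothetical MountPrefix value of "/config/mounts/" (the real constant lives in the autoradio package and is not shown here):

// Assuming MountPrefix = "/config/mounts/": its length is 15, so key[14:]
// keeps the final slash as the leading slash of the mount name.
keyToMount("/config/mounts/radio.ogg") // == "/radio.ogg"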
// In-memory representation of the overall configuration (basically
// just a list of the known mounts).
type ClusterConfig struct {
type clusterConfig struct {
mounts map[string]*autoradio.Mount
lock sync.Mutex
}
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
func newClusterConfig() *clusterConfig {
return &clusterConfig{
mounts: make(map[string]*autoradio.Mount),
}
}
// TODO: remove?
func (c *ClusterConfig) GetMount(name string) *autoradio.Mount {
func (c *clusterConfig) GetMount(name string) *autoradio.Mount {
c.lock.Lock()
defer c.lock.Unlock()
return c.mounts[name]
}
// TODO: remove?
func (c *ClusterConfig) ListMounts() []*autoradio.Mount {
func (c *clusterConfig) ListMounts() []*autoradio.Mount {
c.lock.Lock()
defer c.lock.Unlock()
result := make([]*autoradio.Mount, 0, len(c.mounts))
@@ -66,32 +66,31 @@ func (c *ClusterConfig) ListMounts() []*autoradio.Mount {
}
// Update a mount (in-memory only).
func (c *ClusterConfig) setMount(m *autoradio.Mount) {
func (c *clusterConfig) setMount(m *autoradio.Mount) {
c.lock.Lock()
defer c.lock.Unlock()
c.mounts[m.Name] = m
}
// Delete a mount (in-memory only).
func (c *ClusterConfig) delMount(name string) {
func (c *clusterConfig) delMount(name string) {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.mounts, name)
}
// Keeps the in-memory service configuration in sync with the
// distributed database. An update channel is triggered whenever the
// data changes.
type ConfigSyncer struct {
// Keeps the in-memory service configuration in sync with the etcd
// database. An update channel is triggered whenever the data changes.
type configWatcher struct {
client *etcd.Client
config *ClusterConfig
config *clusterConfig
upch chan bool
stop chan bool
index uint64
}
func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer {
return &ConfigSyncer{
func newConfigSyncer(client *etcd.Client, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
return &configWatcher{
client: client,
config: config,
upch: upch,
@@ -99,12 +98,12 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool,
}
}
func (w *ConfigSyncer) setIndex(index uint64) {
func (w *configWatcher) setIndex(index uint64) {
w.index = index
configIndex.Set(int64(index))
}
func (w *ConfigSyncer) updateConfigWithResponse(index uint64, key, value string) {
func (w *configWatcher) updateConfigWithResponse(index uint64, key, value string) {
mountName := keyToMount(key)
log.Printf("updating mount %s [@%d]: %s", mountName, index, value)
var m autoradio.Mount
@@ -115,7 +114,7 @@ func (w *ConfigSyncer) updateConfigWithResponse(index uint64, key, value string)
}
}
func (w *ConfigSyncer) syncer(ch chan *etcd.Response) {
func (w *configWatcher) watcher(ch chan *etcd.Response) {
for {
select {
case response, ok := <-ch:
@@ -150,7 +149,7 @@ func (w *ConfigSyncer) syncer(ch chan *etcd.Response) {
}
// Load full configuration from etcd. This will trigger the update channel.
func (w *ConfigSyncer) loadFullConfig() {
func (w *configWatcher) loadFullConfig() {
for {
response, err := w.client.Get(autoradio.MountPrefix, false, false)
if err == nil && response.Node != nil && response.Node.Dir {
@@ -176,10 +175,10 @@ func (w *ConfigSyncer) loadFullConfig() {
trigger(w.upch)
}
// Start the ConfigSyncer in the background. It will wait for
// Start the configWatcher in the background. It will wait for
// initialization to complete, so that when this function returns, the
// in-memory configuration has already been fully synchronized.
func (w *ConfigSyncer) Start() {
func (w *configWatcher) Start() {
// Run until the first successful Get().
log.Printf("retrieving initial config")
w.loadFullConfig()
@@ -190,7 +189,7 @@ func (w *ConfigSyncer) Start() {
go func() {
for {
ch := make(chan *etcd.Response)
go w.syncer(ch)
go w.watcher(ch)
curIndex := w.index + 1
log.Printf("starting watcher at index %d", curIndex)
@@ -216,12 +215,12 @@
// An active streaming node, managing the local icecast server.
type RadioNode struct {
Config *ClusterConfig
Config *clusterConfig
ip string
client *etcd.Client
me *masterelection.MasterElection
watcher *ConfigSyncer
watcher *configWatcher
icecast *IcecastController
bw *bwmonitor.BandwidthUsageMonitor
livenessTtl uint64
@@ -230,7 +229,7 @@
}
func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
config := NewClusterConfig()
config := newClusterConfig()
// Network updates trigger icecast reconfiguration. This
// channel is used as an 'event', no more than one entry will
@@ -259,7 +258,7 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi
5,
mech,
stopch),
watcher: NewConfigSyncer(client, config, upch, stopch),
watcher: newConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(ip, stopch),
livenessTtl: 2,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
......
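The upch wired through NewRadioNode above is the 'event' channel the comment describes: at most one pending update, so senders must never block. The trigger helper named in an earlier hunk header is collapsed, but the usual shape of such a non-blocking notification is (a sketch, not necessarily the project's exact code):

// Sketch: queue an event only if none is pending, so bursts of updates
// collapse into a single notification for the consumer.
func triggerSketch(c chan bool) {
	select {
	case c <- true:
	default:
		// an update is already pending; dropping this one is fine
	}
}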