From bc86eb31923da36d2d7ff69fe2234783204487fa Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 27 Dec 2014 16:47:06 +0000
Subject: [PATCH] refactor RadioNode for robustness

Increase robustness of the config watcher by reloading the full
configuration on Etcd errors (fixes an issue with the very common error
401).

Make sure that the Icecast config reload is only triggered if the
configuration has actually changed.

Add a way to stop the node properly, which ensures that we are
tracking all the resources and that there are no deadlocks in the
internal communications.

MasterElection now closes the update channel when it's done.
---
 api.go                                |   4 +
 masterelection/masterelection.go      |  61 ++---
 masterelection/masterelection_test.go |   5 +-
 node/icecast.go                       |  13 +-
 node/icecast_config_test.go           |   2 +-
 node/icecast_test.go                  |   2 +-
 node/node.go                          | 332 +++++++++++++++-----------
 node/node_test.go                     |  68 ++++--
 8 files changed, 287 insertions(+), 200 deletions(-)

diff --git a/api.go b/api.go
index 2d46f946..30fd0f1f 100644
--- a/api.go
+++ b/api.go
@@ -45,6 +45,10 @@ type Mount struct {
 	Fallback string
 }
 
+func (m *Mount) Equal(other *Mount) bool {
+	return *m == *other
+}
+
 func (m *Mount) IsRelay() bool {
 	return m.RelayUrl != ""
 }
diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go
index 5eabdb2b..b8da1825 100644
--- a/masterelection/masterelection.go
+++ b/masterelection/masterelection.go
@@ -2,6 +2,7 @@ package masterelection
 
 import (
 	"log"
+	"os"
 	"sync"
 	"time"
 
@@ -38,32 +39,34 @@ func (s State) Equal(other State) bool {
 
 type MasterElection struct {
 	client autoradio.EtcdClient
-	stop   chan bool
 
 	Data string
 	Path string
 	TTL  uint64
 
-	LogPrefix string
+	Log *log.Logger
 
 	stateLock sync.Mutex
 	stateCh   chan State
 	state     State
 }
 
-func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection {
+// NewMasterElection creates a new MasterElection object that will
+// establish a lock on 'path'. It will send state transitions to
+// 'sch', if provided. If 'sch' is not nil, it will be closed when
+// Run() terminates.
+func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection {
 	if ttl < 1 {
 		ttl = 1
 	}
 	return &MasterElection{
-		client:    client,
-		Path:      path,
-		Data:      data,
-		TTL:       ttl,
-		state:     State{Role: ROLE_UNKNOWN},
-		stateCh:   sch,
-		stop:      stop,
-		LogPrefix: "masterelection",
+		client:  client,
+		Path:    path,
+		Data:    data,
+		TTL:     ttl,
+		state:   State{Role: ROLE_UNKNOWN},
+		stateCh: sch,
+		Log:     log.New(os.Stderr, "masterelection: ", 0),
 	}
 }
 
@@ -93,7 +96,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	var changed bool
 	m.stateLock.Lock()
 	if changed = !m.state.Equal(state); changed {
-		log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData)
+		m.Log.Printf("%s (%s) -> %s (%s)", m.state.Role.String(), m.state.MasterData, role.String(), masterData)
 		m.state = state
 	}
 	m.stateLock.Unlock()
@@ -102,7 +105,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	}
 }
 
-func (m *MasterElection) runMaster(index uint64) {
+func (m *MasterElection) runMaster(index uint64, stop chan bool) {
 	m.setState(ROLE_MASTER, m.Data)
 
 	// If we renew the lease every TTL / N, we allow N renewal
@@ -122,28 +125,28 @@ func (m *MasterElection) runMaster(index uint64) {
 			// will be updated (and the lock renewed).
 			resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
 			if err != nil {
-				log.Printf("%s: error updating lock: %s", m.LogPrefix, err)
+				m.Log.Printf("error updating lock: %s", err)
 
 				// If we can't renew the lock for a
 				// TTL, we must assume we lost it.
 				if t.Sub(lastUpdate) > ttl {
-					log.Printf("%s: too many errors, lost lock", m.LogPrefix)
+					m.Log.Println("too many errors, lost lock")
 					return
 				}
 			}
 			index = resp.EtcdIndex
 			lastUpdate = t
-		case <-m.stop:
+		case <-stop:
 			// Facilitate a master re-election by dropping
 			// the lock rather than letting it expire.
-			log.Printf("%s: releasing masterelection lock", m.LogPrefix)
+			m.Log.Println("releasing masterelection lock")
 			m.client.Delete(m.Path, false)
 			return
 		}
 	}
 }
 
-func (m *MasterElection) runSlave(index uint64) {
+func (m *MasterElection) runSlave(index uint64, stop chan bool) {
 	// It would be best if we could simply retrieve the master
 	// lock file at the specified index. But we can only Get from
 	// HEAD, so in case of a quick master transition we might
@@ -158,10 +161,10 @@ func (m *MasterElection) runSlave(index uint64) {
 	}
 
 	for {
-		resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop)
+		resp, err = m.client.Watch(m.Path, index+1, false, nil, stop)
 		if err != nil {
 			if err != etcd.ErrWatchStoppedByUser {
-				log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err)
+				m.Log.Printf("slave Watch() error: %v", err)
 			}
 			return
 		}
@@ -169,7 +172,7 @@ func (m *MasterElection) runSlave(index uint64) {
 		// If the lock has expired, or it has been removed,
 		// try to acquire it again.
 		if resp.Action == "delete" || resp.Action == "expire" {
-			log.Printf("%s: lock is gone", m.LogPrefix)
+			m.Log.Println("lock is gone")
 			return
 		}
 
@@ -178,11 +181,15 @@ func (m *MasterElection) runSlave(index uint64) {
 	}
 }
 
-func (m *MasterElection) Run() {
+func (m *MasterElection) Run(stop chan bool) {
+	if m.stateCh != nil {
+		defer close(m.stateCh)
+	}
+
 	for {
 		// Quick non-blocking check for the stop channel.
 		select {
-		case <-m.stop:
+		case <-stop:
 			return
 		default:
 		}
@@ -196,16 +203,14 @@ func (m *MasterElection) Run() {
 		if err == nil {
 			// Howdy, we're the master now. Wait a while
 			// and renew our TTL.
-			log.Printf("%s: we are the master", m.LogPrefix)
-			m.runMaster(response.EtcdIndex)
+			m.runMaster(response.EtcdIndex, stop)
 		} else if etcdErr, ok := err.(*etcd.EtcdError); ok {
 			// We're not the master. Wait until the lock
 			// is deleted or expires.
-			log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr)
-			m.runSlave(etcdErr.Index)
+			m.runSlave(etcdErr.Index, stop)
 		} else {
 			// An error of some other sort! Retry.
-			log.Printf("%s: unexpected error: %v", m.LogPrefix, err)
+			m.Log.Printf("unexpected error: %v", err)
 		}
 
 	}
diff --git a/masterelection/masterelection_test.go b/masterelection/masterelection_test.go
index ae495ca3..a98247f4 100644
--- a/masterelection/masterelection_test.go
+++ b/masterelection/masterelection_test.go
@@ -48,10 +48,9 @@ func TestMasterElection(t *testing.T) {
 			lockPath,
 			fmt.Sprintf("%d", i),
 			1,
-			nil,
-			stopCh)
+			nil)
 		m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1)
-		go m.Run()
+		go m.Run(stopCh)
 		nodes = append(nodes, m)
 		stop = append(stop, stopCh)
 	}
diff --git a/node/icecast.go b/node/icecast.go
index 93248e79..f123d3bc 100644
--- a/node/icecast.go
+++ b/node/icecast.go
@@ -28,7 +28,7 @@ var (
 type Controller interface {
 	Update(*clusterConfig, bool, net.IP) error
 	GetStatus() *IcecastStatus
-	Run()
+	Run(chan bool)
 }
 
 // Icecast returns empty fields in our status handler, which we'll
@@ -63,11 +63,10 @@ type icecastController struct {
 	stop   chan bool
 }
 
-func NewIcecastController(publicIp string, stop chan bool) *icecastController {
+func NewIcecastController(publicIp string) *icecastController {
 	return &icecastController{
 		config: newIcecastConfig(publicIp),
 		status: &IcecastStatus{},
-		stop:   make(chan bool, 1),
 	}
 }
 
@@ -138,7 +137,7 @@ func (ic *icecastController) GetStatus() *IcecastStatus {
 	return ic.status
 }
 
-func (ic *icecastController) statusUpdater() {
+func (ic *icecastController) Run(stop chan bool) {
 	t := time.NewTicker(3 * time.Second)
 	downStatus := &IcecastStatus{}
 	for {
@@ -152,7 +151,7 @@ func (ic *icecastController) statusUpdater() {
 				ic.status = downStatus
 				icecastOk.Set(0)
 			}
-		case <-ic.stop:
+		case <-stop:
 			return
 		}
 	}
@@ -206,7 +205,3 @@ func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e
 
 	return &status, nil
 }
-
-func (ic *icecastController) Run() {
-	ic.statusUpdater()
-}
diff --git a/node/icecast_config_test.go b/node/icecast_config_test.go
index 23ac6398..ede5529c 100644
--- a/node/icecast_config_test.go
+++ b/node/icecast_config_test.go
@@ -14,7 +14,7 @@ func TestIcecastConfig(t *testing.T) {
 		Password: "pass",
 	}
 	c := newClusterConfig()
-	c.setMount(mount)
+	c.setMountIfChanged(mount)
 
 	// Test a relay config.
 	ice := newIcecastConfig("1.2.3.4")
diff --git a/node/icecast_test.go b/node/icecast_test.go
index 2e496940..3bdc3c62 100644
--- a/node/icecast_test.go
+++ b/node/icecast_test.go
@@ -9,7 +9,7 @@ func TestIcecast_TestParseStatusPage(t *testing.T) {
 	xml := `<?xml version="1.0"?>
 <status><mount name="/test.ogg"><listeners>3</listeners><bitrate/><quality/><video-quality/><frame-size/><frame-rate/></mount></status>`
 
-	ic := NewIcecastController("1.2.3.4", make(chan bool))
+	ic := NewIcecastController("1.2.3.4")
 	result, err := ic.parseStatusPage(strings.NewReader(xml))
 	if err != nil {
 		t.Fatal(err)
diff --git a/node/node.go b/node/node.go
index c9c2bb8f..21fbcb02 100644
--- a/node/node.go
+++ b/node/node.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"log"
 	"net"
+	"os"
 	"strings"
 	"sync"
 	"time"
@@ -66,11 +67,16 @@ func (c *clusterConfig) ListMounts() []*autoradio.Mount {
 	return result
 }
 
-// Update a mount (in-memory only).
-func (c *clusterConfig) setMount(m *autoradio.Mount) {
+// Update a mount (in-memory only). Returns true if the new value is
+// different than the old one.
+func (c *clusterConfig) setMountIfChanged(m *autoradio.Mount) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
+	if prev, ok := c.mounts[m.Name]; ok && prev.Equal(m) {
+		return false
+	}
 	c.mounts[m.Name] = m
+	return true
 }
 
 // Delete a mount (in-memory only).
@@ -86,152 +92,163 @@ type configWatcher struct {
 	client autoradio.EtcdClient
 	config *clusterConfig
 	upch   chan bool
-	stop   chan bool
-	index  uint64
 }
 
-func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
+func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool) *configWatcher {
 	return &configWatcher{
 		client: client,
 		config: config,
 		upch:   upch,
-		stop:   stop,
 	}
 }
 
-func (w *configWatcher) setIndex(index uint64) {
-	w.index = index
-	configIndex.Set(int64(index))
-}
-
-func (w *configWatcher) updateConfigWithResponse(index uint64, key, value string) {
+// Returns true if the mount configuration has been modified.
+func (w *configWatcher) updateMount(key, value string, index uint64) bool {
 	mountName := keyToMount(key)
-	log.Printf("updating mount %s [@%d]: %s", mountName, index, value)
 	var m autoradio.Mount
 	if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
-		log.Printf("corrupted data: %s: %s", value, err)
+		log.Printf("error updating mount %s [@%d]: corrupted data: %v", mountName, index, err)
 	} else {
-		w.config.setMount(&m)
+		if w.config.setMountIfChanged(&m) {
+			log.Printf("updated mount %s [@%d]: %s", keyToMount(key), index, value)
+			return true
+		}
 	}
+	return false
 }
 
-func (w *configWatcher) watcher(ch chan *etcd.Response) {
+func (w *configWatcher) watcher(index uint64, stop chan bool) {
 	for {
-		select {
-		case response, ok := <-ch:
-			if !ok {
+		configIndex.Set(int64(index))
+		resp, err := w.client.Watch(autoradio.MountPrefix, index+1, true, nil, stop)
+		if err != nil {
+			if err == etcd.ErrWatchStoppedByUser {
 				return
 			}
-
-			// Update the 'last seen' index, so that if
-			// the Watcher dies, it knows where to start
-			// from and we do not have to download the
-			// full configuration again. Do this even for
-			// actions we don't care about, just in case.
-			w.setIndex(response.EtcdIndex)
-
-			if response.Action == "delete" {
-				mountName := keyToMount(response.Node.Key)
-				log.Printf("deleted mount %s", mountName)
-				w.config.delMount(mountName)
-			} else if response.Action == "set" || response.Action == "create" || response.Action == "update" {
-				w.updateConfigWithResponse(response.EtcdIndex, response.Node.Key, response.Node.Value)
+			if etcdErr, ok := err.(*etcd.EtcdError); ok {
+				// If the index passed to Watch falls
+				// behind the current log position by
+				// more than 1000 entries (which is
+				// bound to happen regularly in
+				// presence of a sustained write
+				// rate), etcd will return an error
+				// 401 ("index out of date"). The safe
+				// thing to do is just to run a full
+				// load again, and resume watching
+				// from the current EtcdIndex.
+				if etcdErr.ErrorCode == 401 {
+					index = w.load(stop)
+					continue
+				}
+				// Log other etcd errors.
+				log.Printf("config Watch(): %v", err)
+				// Follow the etcd index even in case
+				// of errors.
+				index = etcdErr.Index
 			} else {
-				continue
+				// Generic error, just retry the call.
+				log.Printf("config Watch(): %v", err)
 			}
+			time.Sleep(200 * time.Millisecond)
+			continue
+		}
 
-			// Trigger an update.
-			trigger(w.upch)
-
-		case <-w.stop:
-			return
+		index = resp.EtcdIndex
+
+		switch resp.Action {
+		case "delete":
+			mountName := keyToMount(resp.Node.Key)
+			log.Printf("deleted mount %s", mountName)
+			w.config.delMount(mountName)
+		case "set", "create", "update":
+			if !w.updateMount(resp.Node.Key, resp.Node.Value, index) {
+				// Do not trigger an update if the
+				// update had no effect.
+				continue
+			}
+		default:
+			continue
 		}
+
+		// Trigger an update.
+		trigger(w.upch)
 	}
 }
 
-// Load full configuration from etcd. This will trigger the update channel.
-func (w *configWatcher) loadFullConfig() {
+// Load full configuration from etcd. It will wait until the
+// configuration can be read successfully, and it will trigger the
+// update channel after it's done.
+func (w *configWatcher) load(stop chan bool) uint64 {
+	var err error
+	var resp *etcd.Response
 	for {
-		response, err := w.client.Get(autoradio.MountPrefix, false, false)
-		if err == nil && response.Node != nil && response.Node.Dir {
-			// Directly update the configuration.
-			for _, n := range response.Node.Nodes {
-				w.updateConfigWithResponse(response.EtcdIndex, n.Key, n.Value)
-			}
-			log.Printf("got configuration at index %d", response.EtcdIndex)
-			w.setIndex(response.EtcdIndex)
+		if resp, err = w.client.Get(autoradio.MountPrefix, false, false); err != nil {
+			log.Printf("config load error: %v", err)
+		} else if resp.Node == nil || !resp.Node.Dir {
+			log.Printf("config load error: no configuration found")
+		} else {
 			break
 		}
-		log.Printf("Get error: %s", err)
 
 		// Wait 1 second, but watch the stop channel.
 		select {
 		case <-time.After(1 * time.Second):
-		case <-w.stop:
-			return
+		case <-stop:
+			return 0
+		}
+	}
+
+	index := resp.EtcdIndex
+	changed := false
+	for _, n := range resp.Node.Nodes {
+		if w.updateMount(n.Key, n.Value, index) {
+			changed = true
 		}
 	}
 
-	// Update the icecast daemon now that we have a full config.
-	trigger(w.upch)
+	if changed {
+		// Update the icecast daemon now that we have a full config.
+		trigger(w.upch)
+	}
+
+	return index
 }
 
 // Start the configWatcher in the background. It will wait for
 // initialization to complete, so that when this function returns, the
 // in-memory configuration has already been fully synchronized.
-func (w *configWatcher) Start() {
-	// Run until the first successful Get().
-	log.Printf("retrieving initial config")
-	w.loadFullConfig()
-
-	// Main watch loop. Remember that etcd.Watch() will close the
-	// receiver channel when it returns, so we need to start a new
-	// syncer every time.
-	go func() {
-		for {
-			ch := make(chan *etcd.Response)
-			go w.watcher(ch)
-
-			curIndex := w.index + 1
-			log.Printf("starting watcher at index %d", curIndex)
-			_, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, ch, w.stop)
-			if err == etcd.ErrWatchStoppedByUser {
-				return
-			} else if err != nil {
-				// Log the error and start over.
-				log.Printf("Watch(): %s", err)
-
-				// If the error is code 401 ("index
-				// out of date"), start from the
-				// current index. TODO: consider
-				// calling loadFullConfig, or we might
-				// potentially lose some updates?
-				if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == 401 {
-					w.setIndex(etcdErr.Index)
-				}
-			}
-		}
-	}()
+func (w *configWatcher) Start(stop chan bool) func(chan bool) {
+	log.Printf("loading configuration")
+	index := w.load(stop)
+	return func(stop chan bool) {
+		w.watcher(index, stop)
+	}
 }
 
 // An active streaming node, managing the local icecast server.
 type RadioNode struct {
-	client    autoradio.EtcdClient
-	config    *clusterConfig
-	name      string
-	ips       []net.IP
-	me        *masterelection.MasterElection
-	watcher   *configWatcher
-	icecast   Controller
-	bw        *bwmonitor.BandwidthUsageMonitor
-	heartbeat uint64
-	upch      chan bool
-	stop      chan bool
+	client      autoradio.EtcdClient
+	config      *clusterConfig
+	name        string
+	ips         []net.IP
+	me          *masterelection.MasterElection
+	watcher     *configWatcher
+	icecast     Controller
+	bw          *bwmonitor.BandwidthUsageMonitor
+	heartbeat   uint64
+	reloadDelay time.Duration
+	upch        chan bool
+	stop        chan bool
+	wg          sync.WaitGroup
+	Log         *log.Logger
 }
 
 func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
 	config := newClusterConfig()
 
+	// Global 'stop' channel.
+	stopch := make(chan bool)
+
 	// Network updates trigger icecast reconfiguration. This
 	// channel is used as an event signal.
 	upch := make(chan bool, 1)
@@ -239,14 +256,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 	// MasterElection changes trigger an update.
 	mech := make(chan masterelection.State)
 	go func() {
-		for _ = range mech {
+		for range mech {
 			trigger(upch)
 		}
 	}()
 
-	// Global 'stop' channel.
-	stopch := make(chan bool)
-
 	// Location information advertised when this node is master.
 	minfo := &autoradio.MasterNodeInfo{
 		Name: name,
@@ -267,20 +281,21 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 			autoradio.MasterElectionPath,
 			string(minfodata),
 			5,
-			mech,
-			stopch),
-		watcher:   newConfigSyncer(client, config, upch, stopch),
-		icecast:   NewIcecastController(name, stopch),
-		heartbeat: 2,
-		bw:        bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
-		upch:      upch,
-		stop:      stopch,
+			mech),
+		watcher:     newConfigSyncer(client, config, upch),
+		icecast:     NewIcecastController(name),
+		reloadDelay: 1000 * time.Millisecond,
+		heartbeat:   2,
+		bw:          bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
+		upch:        upch,
+		stop:        stopch,
+		Log:         log.New(os.Stderr, "node: ", 0),
 	}
 }
 
 // The presence goroutine periodically updates our entry in the list
 // of nodes with the current node statistics.
-func (rc *RadioNode) presence() {
+func (rc *RadioNode) presence(stop chan bool) {
 	ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
 
 	// Register ourselves using the node name.
@@ -303,10 +318,10 @@ func (rc *RadioNode) presence() {
 			var buf bytes.Buffer
 			json.NewEncoder(&buf).Encode(&nodeStatus)
 			if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
-				log.Printf("presence: Set(): %s", err)
+				rc.Log.Printf("presence: Set(): %s", err)
 			}
 
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
@@ -323,51 +338,86 @@ func (rc *RadioNode) getMasterAddr() net.IP {
 	return info.IP[0]
 }
 
-// Run the node. This method does not return until someone calls Stop().
-func (rc *RadioNode) Run() {
-	// Bootstrap the config watcher. This ensures that we have a
-	// full configuration (thanks to the Get() call) before we
-	// start managing the icecast server.
-	rc.watcher.Start()
-
-	// Start the presence heartbeat.
-	go rc.presence()
-
-	// Start the masterelection runner.
-	go rc.me.Run()
-
-	// Wait an instant to give a chance to the other services to
-	// initialize.
+// Reload the icecast configuration when needed.
+func (rc *RadioNode) updater(stop chan bool) {
+	// Wait an instant to give a chance to the services to
+	// initialize properly.
 	time.Sleep(200 * time.Millisecond)
 
-	// Start the Icecast status checker.
-	go rc.icecast.Run()
-
-	log.Printf("starting icecast updater")
+	rc.Log.Printf("starting icecast updater")
 	for {
 		select {
 		case <-rc.upch:
 			icecastReloads.Incr()
-			log.Printf("reloading icecast config")
+			rc.Log.Printf("reloading icecast config")
 			if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
 				icecastReloadErrors.Incr()
-				log.Printf("Update(): %s", err)
+				rc.Log.Printf("Update(): %s", err)
 			}
 
-			// Do not reload icecast more often than once
-			// every two seconds.
-			time.Sleep(2 * time.Second)
+			// Limit the rate of icecast reloads.
+			if rc.reloadDelay > 0 {
+				time.Sleep(rc.reloadDelay)
+			}
 
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
 }
 
+// Start the node. Returns immediately.
+func (rc *RadioNode) Start() {
+	// Bootstrap the config watcher first. This ensures that we
+	// have a full configuration (thanks to the Get() call) before
+	// we start managing the icecast server.
+	watcherFn := rc.watcher.Start(rc.stop)
+
+	// Start auxiliary services and event listeners on their own
+	// goroutines. Put them all in a WaitGroup so we can wait for
+	// their termination.
+	bgfuncs := []func(chan bool){
+		// Config watcher.
+		watcherFn,
+
+		// Presence heartbeat.
+		rc.presence,
+
+		// Icecast updater.
+		rc.updater,
+
+		// Master election runner.
+		rc.me.Run,
+
+		// Icecast status checker.
+		rc.icecast.Run,
+	}
+
+	for _, fn := range bgfuncs {
+		rc.wg.Add(1)
+		go func(fn func(stop chan bool)) {
+			fn(rc.stop)
+			rc.wg.Done()
+		}(fn)
+	}
+}
+
+// Wait until all processing associated with this node has terminated.
+func (rc *RadioNode) Wait() {
+	rc.wg.Wait()
+	// Let's leave this around for garbage collection, otherwise
+	// the in-memory etcd mock server might try to send values to
+	// it due to a Watch-trigger desynchronization...
+	//close(rc.upch)
+}
+
+// Run the node, waiting for termination.
+func (rc *RadioNode) Run() {
+	rc.Start()
+	rc.Wait()
+}
+
 // Stop everything.
 func (rc *RadioNode) Stop() {
 	close(rc.stop)
-
-	// We should use WaitGroups here. Instead, wait 2 seconds.
-	time.Sleep(2 * time.Second)
 }
diff --git a/node/node_test.go b/node/node_test.go
index f71bc602..b7b67fd3 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -2,6 +2,7 @@ package node
 
 import (
 	"fmt"
+	"log"
 	"net"
 	"testing"
 	"time"
@@ -14,15 +15,18 @@ type mockController struct {
 	mounts     []*autoradio.Mount
 	isMaster   bool
 	masterAddr net.IP
+	numUpdates int
 }
 
-func (m *mockController) Run() {
+func (m *mockController) Run(stop chan bool) {
+	<-stop
 }
 
 func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
 	m.mounts = conf.ListMounts()
 	m.isMaster = isMaster
 	m.masterAddr = masterAddr
+	m.numUpdates++
 	return nil
 }
 
@@ -41,13 +45,22 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
 			1000,
 			etcd)
 		node.icecast = &mockController{}
-		go node.Run()
+		node.reloadDelay = time.Duration(0)
+		node.Start()
+		node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1))
+		node.me.Log.SetPrefix(fmt.Sprintf("node%d: masterelection: ", i+1))
 		nodes = append(nodes, node)
 	}
 	time.Sleep(1100 * time.Millisecond)
 	return nodes
 }
 
+func waitTestNodes(nodes []*RadioNode) {
+	for _, n := range nodes {
+		n.Wait()
+	}
+}
+
 func loadTestData(etcd autoradio.EtcdClient) {
 	etcd.Set(autoradio.MountPrefix+"test.ogg",
 		`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
@@ -59,24 +72,30 @@ func TestRadioNode_MasterElection(t *testing.T) {
 	loadTestData(etcd)
 	nodes := startTestNodes(3, etcd)
 
-	countMasters := func(nodes []*RadioNode) int {
-		var masters int
-		for _, n := range nodes {
+	// Force master transitions by shutting down the nodes one by
+	// one as they become the master, and verify that there is a
+	// single master among the remaining ones.
+	curNodes := nodes
+	for len(curNodes) > 0 {
+		var tmp []*RadioNode
+		masterIdx := -1
+		numMasters := 0
+		for i, n := range curNodes {
 			if n.me.IsMaster() {
-				masters++
+				numMasters++
+				masterIdx = i
+			} else {
+				tmp = append(tmp, n)
 			}
 		}
-		return masters
-	}
-
-	// Shut down the nodes one by one, and verify that there is a
-	// single master among the remaining ones.
-	for i := 0; i < 3; i++ {
-		if nm := countMasters(nodes[i:len(nodes)]); nm != 1 {
-			t.Fatalf("@%d: masters=%d (expected 1)", i, nm)
+		if numMasters != 1 {
+			t.Fatalf("masters=%d (expected 1): %#v", numMasters, curNodes)
 		}
-		nodes[i].Stop()
-		time.Sleep(10 * time.Millisecond)
+		curNodes[masterIdx].Stop()
+		curNodes[masterIdx].Wait()
+		curNodes = tmp
+
+		time.Sleep(20 * time.Millisecond)
 	}
 }
 
@@ -98,6 +117,21 @@ func TestRadioNode_ConfigChangePropagation(t *testing.T) {
 		if username != "source2" {
 			t.Errorf("change did not propagate to node %d", i+1)
 		}
-		nodes[i].Stop()
+	}
+
+	// Verify that the controller has received the updates. There
+	// should be two of them: the initial config load, and the
+	// test update above.
+	for i := 0; i < 3; i++ {
+		numUpdates := nodes[i].icecast.(*mockController).numUpdates
+		if numUpdates != 2 {
+			t.Errorf("node %d received %d updates (expected 2)", i+1, numUpdates)
+		}
+	}
+
+	log.Printf("cleanup")
+	for _, n := range nodes {
+		n.Stop()
+		n.Wait()
 	}
 }
-- 
GitLab