diff --git a/api.go b/api.go
index 2d46f946946df140d02763348c0d5eb015f9e3da..30fd0f1f8d4b430776ccf3ed33b700af610155dd 100644
--- a/api.go
+++ b/api.go
@@ -45,6 +45,11 @@ type Mount struct {
 	Fallback string
 }
 
+// Equal reports whether two Mounts have the same configuration.
+func (m *Mount) Equal(other *Mount) bool {
+	return *m == *other
+}
+
 func (m *Mount) IsRelay() bool {
 	return m.RelayUrl != ""
 }
diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go
index 5eabdb2b53887ef6ddd52a39e03bdd87885d4ad2..b8da182512ef115fb37a38186bf16dec4ba85511 100644
--- a/masterelection/masterelection.go
+++ b/masterelection/masterelection.go
@@ -2,6 +2,7 @@ package masterelection
 
 import (
 	"log"
+	"os"
 	"sync"
 	"time"
 
@@ -38,32 +39,34 @@ func (s State) Equal(other State) bool {
 type MasterElection struct {
 	client autoradio.EtcdClient
 
-	stop chan bool
-
 	Data string
 	Path string
 	TTL  uint64
 
-	LogPrefix string
+	Log *log.Logger
 
 	stateLock sync.Mutex
 	stateCh   chan State
 	state     State
 }
 
-func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection {
+// NewMasterElection creates a new MasterElection object that will
+// establish a lock on 'path'. State transitions are sent to 'sch',
+// if not nil; in that case, the channel will be closed when Run()
+// terminates.
+func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection {
 	if ttl < 1 {
 		ttl = 1
 	}
 	return &MasterElection{
-		client:    client,
-		Path:      path,
-		Data:      data,
-		TTL:       ttl,
-		state:     State{Role: ROLE_UNKNOWN},
-		stateCh:   sch,
-		stop:      stop,
-		LogPrefix: "masterelection",
+		client:  client,
+		Path:    path,
+		Data:    data,
+		TTL:     ttl,
+		state:   State{Role: ROLE_UNKNOWN},
+		stateCh: sch,
+		Log:     log.New(os.Stderr, "masterelection: ", 0),
 	}
 }
 
@@ -93,7 +96,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	var changed bool
 	m.stateLock.Lock()
 	if changed = !m.state.Equal(state); changed {
-		log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData)
+		m.Log.Printf("%s (%s) -> %s (%s)", m.state.Role.String(), m.state.MasterData, role.String(), masterData)
 		m.state = state
 	}
 	m.stateLock.Unlock()
@@ -102,7 +105,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	}
 }
 
-func (m *MasterElection) runMaster(index uint64) {
+func (m *MasterElection) runMaster(index uint64, stop chan bool) {
 	m.setState(ROLE_MASTER, m.Data)
 
 	// If we renew the lease every TTL / N, we allow N renewal
@@ -122,28 +125,31 @@
 			// will be updated (and the lock renewed).
 			resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
 			if err != nil {
-				log.Printf("%s: error updating lock: %s", m.LogPrefix, err)
+				m.Log.Printf("error updating lock: %s", err)
 				// If we can't renew the lock for a
 				// TTL, we must assume we lost it.
 				if t.Sub(lastUpdate) > ttl {
-					log.Printf("%s: too many errors, lost lock", m.LogPrefix)
+					m.Log.Println("too many errors, lost lock")
 					return
 				}
+				// No response to take an index from;
+				// skip the bookkeeping below.
+				continue
 			}
 			index = resp.EtcdIndex
 			lastUpdate = t
 
-		case <-m.stop:
+		case <-stop:
 			// Facilitate a master re-election by dropping
 			// the lock rather than letting it expire.
- log.Printf("%s: releasing masterelection lock", m.LogPrefix) + m.Log.Println("releasing masterelection lock") m.client.Delete(m.Path, false) return } } } -func (m *MasterElection) runSlave(index uint64) { +func (m *MasterElection) runSlave(index uint64, stop chan bool) { // It would be best if we could simply retrieve the master // lock file at the specified index. But we can only Get from // HEAD, so in case of a quick master transition we might @@ -158,10 +161,10 @@ func (m *MasterElection) runSlave(index uint64) { } for { - resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop) + resp, err = m.client.Watch(m.Path, index+1, false, nil, stop) if err != nil { if err != etcd.ErrWatchStoppedByUser { - log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err) + m.Log.Printf("slave Watch() error: %v", err) } return } @@ -169,7 +172,7 @@ func (m *MasterElection) runSlave(index uint64) { // If the lock has expired, or it has been removed, // try to acquire it again. if resp.Action == "delete" || resp.Action == "expire" { - log.Printf("%s: lock is gone", m.LogPrefix) + m.Log.Println("lock is gone") return } @@ -178,11 +181,15 @@ func (m *MasterElection) runSlave(index uint64) { } } -func (m *MasterElection) Run() { +func (m *MasterElection) Run(stop chan bool) { + if m.stateCh != nil { + defer close(m.stateCh) + } + for { // Quick non-blocking check for the stop channel. select { - case <-m.stop: + case <-stop: return default: } @@ -196,16 +203,14 @@ func (m *MasterElection) Run() { if err == nil { // Howdy, we're the master now. Wait a while // and renew our TTL. - log.Printf("%s: we are the master", m.LogPrefix) - m.runMaster(response.EtcdIndex) + m.runMaster(response.EtcdIndex, stop) } else if etcdErr, ok := err.(*etcd.EtcdError); ok { // We're not the master. Wait until the lock // is deleted or expires. - log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr) - m.runSlave(etcdErr.Index) + m.runSlave(etcdErr.Index, stop) } else { // An error of some other sort! Retry. 
- log.Printf("%s: unexpected error: %v", m.LogPrefix, err) + m.Log.Printf("unexpected error: %v", err) } } diff --git a/masterelection/masterelection_test.go b/masterelection/masterelection_test.go index ae495ca3509fdcd0d7a07296747042f16fef9459..a98247f412c2373d7120b74b1bd0b6464c141bf5 100644 --- a/masterelection/masterelection_test.go +++ b/masterelection/masterelection_test.go @@ -48,10 +48,9 @@ func TestMasterElection(t *testing.T) { lockPath, fmt.Sprintf("%d", i), 1, - nil, - stopCh) + nil) m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1) - go m.Run() + go m.Run(stopCh) nodes = append(nodes, m) stop = append(stop, stopCh) } diff --git a/node/icecast.go b/node/icecast.go index 93248e79bbf86b4c883ae89f743bbd0b34353554..f123d3bc099f6316db549340e1a2df5486aba128 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -28,7 +28,7 @@ var ( type Controller interface { Update(*clusterConfig, bool, net.IP) error GetStatus() *IcecastStatus - Run() + Run(chan bool) } // Icecast returns empty fields in our status handler, which we'll @@ -63,11 +63,10 @@ type icecastController struct { stop chan bool } -func NewIcecastController(publicIp string, stop chan bool) *icecastController { +func NewIcecastController(publicIp string) *icecastController { return &icecastController{ config: newIcecastConfig(publicIp), status: &IcecastStatus{}, - stop: make(chan bool, 1), } } @@ -138,7 +137,7 @@ func (ic *icecastController) GetStatus() *IcecastStatus { return ic.status } -func (ic *icecastController) statusUpdater() { +func (ic *icecastController) Run(stop chan bool) { t := time.NewTicker(3 * time.Second) downStatus := &IcecastStatus{} for { @@ -152,7 +151,7 @@ func (ic *icecastController) statusUpdater() { ic.status = downStatus icecastOk.Set(0) } - case <-ic.stop: + case <-stop: return } } @@ -206,7 +205,3 @@ func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e return &status, nil } - -func (ic *icecastController) Run() { - ic.statusUpdater() -} diff --git a/node/icecast_config_test.go b/node/icecast_config_test.go index 23ac639872d2411b0c24da4ddaeae55cb73bc6a0..ede5529cdda7d9d951cb37e7c20870d548dd2562 100644 --- a/node/icecast_config_test.go +++ b/node/icecast_config_test.go @@ -14,7 +14,7 @@ func TestIcecastConfig(t *testing.T) { Password: "pass", } c := newClusterConfig() - c.setMount(mount) + c.setMountIfChanged(mount) // Test a relay config. ice := newIcecastConfig("1.2.3.4") diff --git a/node/icecast_test.go b/node/icecast_test.go index 2e49694083bd0d7f347bec0fb20b83e76be28e6d..3bdc3c62b18aa460cccf1fecd78b09ce6c7c2c77 100644 --- a/node/icecast_test.go +++ b/node/icecast_test.go @@ -9,7 +9,7 @@ func TestIcecast_TestParseStatusPage(t *testing.T) { xml := `<?xml version="1.0"?> <status><mount name="/test.ogg"><listeners>3</listeners><bitrate/><quality/><video-quality/><frame-size/><frame-rate/></mount></status>` - ic := NewIcecastController("1.2.3.4", make(chan bool)) + ic := NewIcecastController("1.2.3.4") result, err := ic.parseStatusPage(strings.NewReader(xml)) if err != nil { t.Fatal(err) diff --git a/node/node.go b/node/node.go index c9c2bb8f414b45aef728744813c865717f8e14d3..21fbcb02179ec86567f4831de5e7ad1e8335d4ad 100644 --- a/node/node.go +++ b/node/node.go @@ -5,6 +5,7 @@ import ( "encoding/json" "log" "net" + "os" "strings" "sync" "time" @@ -66,11 +67,16 @@ func (c *clusterConfig) ListMounts() []*autoradio.Mount { return result } -// Update a mount (in-memory only). -func (c *clusterConfig) setMount(m *autoradio.Mount) { +// Update a mount (in-memory only). 
+func (c *clusterConfig) setMountIfChanged(m *autoradio.Mount) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
+	if prev, ok := c.mounts[m.Name]; ok && prev.Equal(m) {
+		return false
+	}
 	c.mounts[m.Name] = m
+	return true
 }
 
 // Delete a mount (in-memory only).
@@ -86,152 +92,164 @@ type configWatcher struct {
 	client autoradio.EtcdClient
 	config *clusterConfig
 	upch   chan bool
-	stop   chan bool
-	index  uint64
 }
 
-func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
+func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool) *configWatcher {
 	return &configWatcher{
 		client: client,
 		config: config,
 		upch:   upch,
-		stop:   stop,
 	}
 }
 
-func (w *configWatcher) setIndex(index uint64) {
-	w.index = index
-	configIndex.Set(int64(index))
-}
-
-func (w *configWatcher) updateConfigWithResponse(index uint64, key, value string) {
+// Returns true if the mount configuration has been modified.
+func (w *configWatcher) updateMount(key, value string, index uint64) bool {
 	mountName := keyToMount(key)
-	log.Printf("updating mount %s [@%d]: %s", mountName, index, value)
 	var m autoradio.Mount
 	if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
-		log.Printf("corrupted data: %s: %s", value, err)
+		log.Printf("error updating mount %s [@%d]: corrupted data: %v", mountName, index, err)
 	} else {
-		w.config.setMount(&m)
+		if w.config.setMountIfChanged(&m) {
+			log.Printf("updated mount %s [@%d]: %s", mountName, index, value)
+			return true
+		}
 	}
+	return false
 }
 
-func (w *configWatcher) watcher(ch chan *etcd.Response) {
+func (w *configWatcher) watcher(index uint64, stop chan bool) {
 	for {
-		select {
-		case response, ok := <-ch:
-			if !ok {
+		configIndex.Set(int64(index))
+		resp, err := w.client.Watch(autoradio.MountPrefix, index+1, true, nil, stop)
+		if err != nil {
+			if err == etcd.ErrWatchStoppedByUser {
 				return
 			}
-
-			// Update the 'last seen' index, so that if
-			// the Watcher dies, it knows where to start
-			// from and we do not have to download the
-			// full configuration again. Do this even for
-			// actions we don't care about, just in case.
-			w.setIndex(response.EtcdIndex)
-
-			if response.Action == "delete" {
-				mountName := keyToMount(response.Node.Key)
-				log.Printf("deleted mount %s", mountName)
-				w.config.delMount(mountName)
-			} else if response.Action == "set" || response.Action == "create" || response.Action == "update" {
-				w.updateConfigWithResponse(response.EtcdIndex, response.Node.Key, response.Node.Value)
+			if etcdErr, ok := err.(*etcd.EtcdError); ok {
+				// If the index passed to Watch falls
+				// behind the current log position by
+				// more than 1000 entries (which is
+				// bound to happen regularly in
+				// presence of a sustained write
+				// rate), etcd will return an error
+				// 401 ("index out of date"). The safe
+				// thing to do is just to run a full
+				// load again, and resume watching
+				// from the current EtcdIndex.
+				if etcdErr.ErrorCode == 401 {
+					index = w.load(stop)
+					continue
+				}
+				// Log other etcd errors.
+				log.Printf("config Watch(): %v", err)
+				// Follow the etcd index even in case
+				// of errors.
+				index = etcdErr.Index
 			} else {
-				continue
+				// Generic error, just retry the call.
+				log.Printf("config Watch(): %v", err)
 			}
+			time.Sleep(200 * time.Millisecond)
+			continue
+		}
 
-			// Trigger an update.
-			trigger(w.upch)
-
-		case <-w.stop:
-			return
+		index = resp.EtcdIndex
+
+		switch resp.Action {
+		case "delete":
+			mountName := keyToMount(resp.Node.Key)
+			log.Printf("deleted mount %s", mountName)
+			w.config.delMount(mountName)
+		case "set", "create", "update":
+			if !w.updateMount(resp.Node.Key, resp.Node.Value, index) {
+				// Do not trigger an update if the
+				// update had no effect.
+				continue
+			}
+		default:
+			continue
 		}
+
+		// Trigger an update.
+		trigger(w.upch)
 	}
 }
 
-// Load full configuration from etcd. This will trigger the update channel.
-func (w *configWatcher) loadFullConfig() {
+// Load full configuration from etcd. It waits until the
+// configuration can be read successfully, and triggers the update
+// channel if anything has changed.
+func (w *configWatcher) load(stop chan bool) uint64 {
+	var err error
+	var resp *etcd.Response
 	for {
-		response, err := w.client.Get(autoradio.MountPrefix, false, false)
-		if err == nil && response.Node != nil && response.Node.Dir {
-			// Directly update the configuration.
-			for _, n := range response.Node.Nodes {
-				w.updateConfigWithResponse(response.EtcdIndex, n.Key, n.Value)
-			}
-			log.Printf("got configuration at index %d", response.EtcdIndex)
-			w.setIndex(response.EtcdIndex)
+		if resp, err = w.client.Get(autoradio.MountPrefix, false, false); err != nil {
+			log.Printf("config load error: %v", err)
+		} else if resp.Node == nil || !resp.Node.Dir {
+			log.Printf("config load error: no configuration found")
+		} else {
 			break
 		}
-		log.Printf("Get error: %s", err)
 
 		// Wait 1 second, but watch the stop channel.
 		select {
 		case <-time.After(1 * time.Second):
-		case <-w.stop:
-			return
+		case <-stop:
+			return 0
+		}
+	}
+
+	index := resp.EtcdIndex
+	changed := false
+	for _, n := range resp.Node.Nodes {
+		if w.updateMount(n.Key, n.Value, index) {
+			changed = true
 		}
 	}
 
-	// Update the icecast daemon now that we have a full config.
-	trigger(w.upch)
+	if changed {
+		// Update the icecast daemon now that we have a full config.
+		trigger(w.upch)
+	}
+
+	return index
 }
 
-// Start the configWatcher in the background. It will wait for
-// initialization to complete, so that when this function returns, the
-// in-memory configuration has already been fully synchronized.
-func (w *configWatcher) Start() {
-	// Run until the first successful Get().
-	log.Printf("retrieving initial config")
-	w.loadFullConfig()
-
-	// Main watch loop. Remember that etcd.Watch() will close the
-	// receiver channel when it returns, so we need to start a new
-	// syncer every time.
-	go func() {
-		for {
-			ch := make(chan *etcd.Response)
-			go w.watcher(ch)
-
-			curIndex := w.index + 1
-			log.Printf("starting watcher at index %d", curIndex)
-			_, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, ch, w.stop)
-			if err == etcd.ErrWatchStoppedByUser {
-				return
-			} else if err != nil {
-				// Log the error and start over.
-				log.Printf("Watch(): %s", err)
-
-				// If the error is code 401 ("index
-				// out of date"), start from the
-				// current index. TODO: consider
-				// calling loadFullConfig, or we might
-				// potentially lose some updates?
-				if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == 401 {
-					w.setIndex(etcdErr.Index)
-				}
-			}
-		}
-	}()
+// Start loads the initial configuration, blocking until it has been
+// read successfully, so that the in-memory configuration is fully
+// synchronized when it returns. The returned function is the watcher
+// to be run in the background.
+func (w *configWatcher) Start(stop chan bool) func(chan bool) {
+	log.Printf("loading configuration")
+	index := w.load(stop)
+	return func(stop chan bool) {
+		w.watcher(index, stop)
+	}
 }
 
 // An active streaming node, managing the local icecast server.
 type RadioNode struct {
-	client    autoradio.EtcdClient
-	config    *clusterConfig
-	name      string
-	ips       []net.IP
-	me        *masterelection.MasterElection
-	watcher   *configWatcher
-	icecast   Controller
-	bw        *bwmonitor.BandwidthUsageMonitor
-	heartbeat uint64
-	upch      chan bool
-	stop      chan bool
+	client      autoradio.EtcdClient
+	config      *clusterConfig
+	name        string
+	ips         []net.IP
+	me          *masterelection.MasterElection
+	watcher     *configWatcher
+	icecast     Controller
+	bw          *bwmonitor.BandwidthUsageMonitor
+	heartbeat   uint64
+	reloadDelay time.Duration
+	upch        chan bool
+	stop        chan bool
+	wg          sync.WaitGroup
+	Log         *log.Logger
 }
 
 func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
 	config := newClusterConfig()
 
+	// Global 'stop' channel.
+	stopch := make(chan bool)
+
 	// Network updates trigger icecast reconfiguration. This
 	// channel is used as an event signal.
 	upch := make(chan bool, 1)
@@ -239,14 +256,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 	// MasterElection changes trigger an update.
 	mech := make(chan masterelection.State)
 	go func() {
-		for _ = range mech {
+		for range mech {
 			trigger(upch)
 		}
 	}()
 
-	// Global 'stop' channel.
-	stopch := make(chan bool)
-
 	// Location information advertised when this node is master.
 	minfo := &autoradio.MasterNodeInfo{
 		Name: name,
@@ -267,20 +281,21 @@
 			autoradio.MasterElectionPath,
 			string(minfodata),
 			5,
-			mech,
-			stopch),
-		watcher:   newConfigSyncer(client, config, upch, stopch),
-		icecast:   NewIcecastController(name, stopch),
-		heartbeat: 2,
-		bw:        bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
-		upch:      upch,
-		stop:      stopch,
+			mech),
+		watcher:     newConfigSyncer(client, config, upch),
+		icecast:     NewIcecastController(name),
+		reloadDelay: 1000 * time.Millisecond,
+		heartbeat:   2,
+		bw:          bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
+		upch:        upch,
+		stop:        stopch,
+		Log:         log.New(os.Stderr, "node: ", 0),
 	}
 }
 
 // The presence goroutine periodically updates our entry in the list
 // of nodes with the current node statistics.
-func (rc *RadioNode) presence() {
+func (rc *RadioNode) presence(stop chan bool) {
 	ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
 
 	// Register ourselves using the node name.
@@ -303,10 +318,10 @@
 			var buf bytes.Buffer
 			json.NewEncoder(&buf).Encode(&nodeStatus)
 			if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
-				log.Printf("presence: Set(): %s", err)
+				rc.Log.Printf("presence: Set(): %s", err)
 			}
 
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
@@ -323,51 +338,86 @@
 	return info.IP[0]
 }
 
-// Run the node. This method does not return until someone calls Stop().
-func (rc *RadioNode) Run() {
-	// Bootstrap the config watcher. This ensures that we have a
-	// full configuration (thanks to the Get() call) before we
-	// start managing the icecast server.
-	rc.watcher.Start()
-
-	// Start the presence heartbeat.
-	go rc.presence()
-
-	// Start the masterelection runner.
-	go rc.me.Run()
-
-	// Wait an instant to give a chance to the other services to
-	// initialize.
+// Reload the icecast configuration when needed.
+func (rc *RadioNode) updater(stop chan bool) {
+	// Wait an instant to give a chance to the services to
+	// initialize properly.
 	time.Sleep(200 * time.Millisecond)
 
-	// Start the Icecast status checker.
-	go rc.icecast.Run()
-
-	log.Printf("starting icecast updater")
+	rc.Log.Printf("starting icecast updater")
 	for {
 		select {
 		case <-rc.upch:
 			icecastReloads.Incr()
-			log.Printf("reloading icecast config")
+			rc.Log.Printf("reloading icecast config")
 			if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
 				icecastReloadErrors.Incr()
-				log.Printf("Update(): %s", err)
+				rc.Log.Printf("Update(): %s", err)
 			}
 
-			// Do not reload icecast more often than once
-			// every two seconds.
-			time.Sleep(2 * time.Second)
+			// Limit the rate of icecast reloads.
+			if rc.reloadDelay > 0 {
+				time.Sleep(rc.reloadDelay)
+			}
 
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
 }
 
+// Start the node. Returns immediately.
+func (rc *RadioNode) Start() {
+	// Bootstrap the config watcher first. This ensures that we
+	// have a full configuration (thanks to the Get() call) before
+	// we start managing the icecast server.
+	watcherFn := rc.watcher.Start(rc.stop)
+
+	// Start auxiliary services and event listeners, each in its
+	// own goroutine. Put them all in a WaitGroup so we can wait
+	// for their termination.
+	bgfuncs := []func(chan bool){
+		// Config watcher.
+		watcherFn,
+
+		// Presence heartbeat.
+		rc.presence,
+
+		// Icecast updater.
+		rc.updater,
+
+		// Master election runner.
+		rc.me.Run,
+
+		// Icecast status checker.
+		rc.icecast.Run,
+	}
+
+	for _, fn := range bgfuncs {
+		rc.wg.Add(1)
+		go func(fn func(stop chan bool)) {
+			fn(rc.stop)
+			rc.wg.Done()
+		}(fn)
+	}
+}
+
+// Wait until all processing associated with this node has terminated.
+func (rc *RadioNode) Wait() {
+	rc.wg.Wait()
+	// Deliberately leave rc.upch open (the GC will reclaim it):
+	// the in-memory etcd mock server might still try to send
+	// values to it due to a Watch-trigger desynchronization.
+	//close(rc.upch)
+}
+
+// Run the node, waiting for termination.
+func (rc *RadioNode) Run() {
+	rc.Start()
+	rc.Wait()
+}
+
 // Stop everything.
 func (rc *RadioNode) Stop() {
 	close(rc.stop)
-
-	// We should use WaitGroups here. Instead, wait 2 seconds.
-	time.Sleep(2 * time.Second)
 }
diff --git a/node/node_test.go b/node/node_test.go
index f71bc60234833ad976be8327e67ac834ea5798bc..b7b67fd3e56b1c50bb384e8fdeede63c48abd3d9 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -2,6 +2,7 @@ package node
 
 import (
 	"fmt"
+	"log"
 	"net"
 	"testing"
 	"time"
@@ -14,15 +15,18 @@ type mockController struct {
 	mounts     []*autoradio.Mount
 	isMaster   bool
 	masterAddr net.IP
+	numUpdates int
 }
 
-func (m *mockController) Run() {
+func (m *mockController) Run(stop chan bool) {
+	<-stop
 }
 
 func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
 	m.mounts = conf.ListMounts()
 	m.isMaster = isMaster
 	m.masterAddr = masterAddr
+	m.numUpdates++
 	return nil
 }
 
@@ -41,13 +45,22 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
 			1000,
 			etcd)
 		node.icecast = &mockController{}
-		go node.Run()
+		node.reloadDelay = time.Duration(0)
+		node.Start()
+		node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1))
+		node.me.Log.SetPrefix(fmt.Sprintf("node%d: masterelection: ", i+1))
 		nodes = append(nodes, node)
 	}
 	time.Sleep(1100 * time.Millisecond)
 	return nodes
 }
 
+func waitTestNodes(nodes []*RadioNode) {
+	for _, n := range nodes {
+		n.Wait()
+	}
+}
+
 func loadTestData(etcd autoradio.EtcdClient) {
 	etcd.Set(autoradio.MountPrefix+"test.ogg",
 		`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
@@ -59,24 +72,30 @@ func TestRadioNode_MasterElection(t *testing.T) {
 	loadTestData(etcd)
 	nodes := startTestNodes(3, etcd)
 
-	countMasters := func(nodes []*RadioNode) int {
-		var masters int
-		for _, n := range nodes {
+	// Force master transitions by shutting down the nodes one by
+	// one as they become the master, and verify that there is a
+	// single master among the remaining ones.
+	curNodes := nodes
+	for len(curNodes) > 0 {
+		var tmp []*RadioNode
+		masterIdx := -1
+		numMasters := 0
+		for i, n := range curNodes {
 			if n.me.IsMaster() {
-				masters++
+				numMasters++
+				masterIdx = i
+			} else {
+				tmp = append(tmp, n)
 			}
 		}
-		return masters
-	}
-
-	// Shut down the nodes one by one, and verify that there is a
-	// single master among the remaining ones.
-	for i := 0; i < 3; i++ {
-		if nm := countMasters(nodes[i:len(nodes)]); nm != 1 {
-			t.Fatalf("@%d: masters=%d (expected 1)", i, nm)
+		if numMasters != 1 {
+			t.Fatalf("masters=%d (expected 1): %#v", numMasters, curNodes)
 		}
-		nodes[i].Stop()
-		time.Sleep(10 * time.Millisecond)
+		curNodes[masterIdx].Stop()
+		curNodes[masterIdx].Wait()
+		curNodes = tmp
+
+		time.Sleep(20 * time.Millisecond)
 	}
 }
 
@@ -98,6 +117,21 @@
 		if username != "source2" {
 			t.Errorf("change did not propagate to node %d", i+1)
 		}
-		nodes[i].Stop()
+	}
+
+	// Verify that the controller has received the updates. There
+	// should be two of them: the initial config load, and the
+	// test update above.
+	for i := 0; i < 3; i++ {
+		numUpdates := nodes[i].icecast.(*mockController).numUpdates
+		if numUpdates != 2 {
+			t.Errorf("node %d received %d updates (expected 2)", i+1, numUpdates)
+		}
+	}
+
+	log.Printf("cleanup")
+	for _, n := range nodes {
+		n.Stop()
+		n.Wait()
 	}
 }
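
A few notes on the patterns above, each with a minimal sketch. The sketches are illustrative: names, timings and stub functions are invented for the example and are not part of the patch.

First, the lifecycle scheme that node.go converges on: every background loop takes an explicit 'stop chan bool' argument instead of reading a struct field, Start() fans the loops out under a sync.WaitGroup, Stop() closes the shared channel, and Wait() blocks until every goroutine has drained. This is what lets the tests call Stop() followed by Wait() instead of sleeping for two seconds. A self-contained sketch of the same shape (Service and loop are made-up names):

    package main

    import (
        "log"
        "sync"
        "time"
    )

    type Service struct {
        stop chan bool
        wg   sync.WaitGroup
    }

    func NewService() *Service {
        return &Service{stop: make(chan bool)}
    }

    // loop stands in for presence(), updater(), etc.: a ticker-driven
    // worker that exits when the stop channel is closed.
    func (s *Service) loop(name string, stop chan bool) {
        t := time.NewTicker(100 * time.Millisecond)
        defer t.Stop()
        for {
            select {
            case <-t.C:
                log.Printf("%s: tick", name)
            case <-stop:
                log.Printf("%s: stopping", name)
                return
            }
        }
    }

    // Start fans out the background loops, tracking them in a WaitGroup.
    func (s *Service) Start() {
        for _, name := range []string{"presence", "updater"} {
            s.wg.Add(1)
            go func(name string) {
                defer s.wg.Done()
                s.loop(name, s.stop)
            }(name)
        }
    }

    // Stop asks every loop to exit; Wait blocks until they all have.
    func (s *Service) Stop() { close(s.stop) }
    func (s *Service) Wait() { s.wg.Wait() }

    func main() {
        s := NewService()
        s.Start()
        time.Sleep(300 * time.Millisecond)
        s.Stop()
        s.Wait() // deterministic shutdown, no arbitrary sleeps
    }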
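Second, the master lease renewal in runMaster() boils down to a ticker at TTL/N plus a deadline check, where a failed CompareAndSwap must skip the index bookkeeping, since there is no response to read an index from. A distilled version, with the etcd call replaced by a stub cas() that fails every fourth attempt (failure pattern and timings are made up for the demo):

    package main

    import (
        "errors"
        "log"
        "time"
    )

    type response struct{ index uint64 }

    // cas stands in for client.CompareAndSwap; here it fails every
    // fourth call to exercise the error path.
    func cas(attempt int) (*response, error) {
        if attempt%4 == 3 {
            return nil, errors.New("etcd unreachable")
        }
        return &response{index: uint64(attempt)}, nil
    }

    // renewLoop renews a lease every ttl/3, tolerating transient CAS
    // failures for up to one full TTL before giving up the lock.
    func renewLoop(ttl time.Duration, stop chan bool) {
        tick := time.NewTicker(ttl / 3)
        defer tick.Stop()
        lastUpdate := time.Now()
        attempt := 0
        for {
            select {
            case t := <-tick.C:
                attempt++
                resp, err := cas(attempt)
                if err != nil {
                    log.Printf("error updating lock: %v", err)
                    if t.Sub(lastUpdate) > ttl {
                        log.Println("too many errors, lost lock")
                        return
                    }
                    // No response to take an index from: skip
                    // the bookkeeping below.
                    continue
                }
                lastUpdate = t
                log.Printf("lease renewed at index %d", resp.index)
            case <-stop:
                return
            }
        }
    }

    func main() {
        stop := make(chan bool)
        go renewLoop(time.Second, stop)
        time.Sleep(3 * time.Second)
        close(stop)
    }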
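Third, the configWatcher's recovery strategy: on a 401 ("index out of date") error the watcher reloads the full configuration and resumes from the index of the fresh snapshot; on other etcd errors it follows the server's index; on anything else it backs off briefly and retries. The skeleton below uses a hand-rolled client interface standing in for go-etcd; Response, EtcdError and the 401 code mirror the diff, the rest is assumed for the example:

    package main

    import (
        "errors"
        "log"
        "time"
    )

    type Response struct{ EtcdIndex uint64 }

    type EtcdError struct {
        ErrorCode int
        Index     uint64
    }

    func (e *EtcdError) Error() string { return "etcd error" }

    var errWatchStopped = errors.New("watch stopped by user")

    type client interface {
        // Watch blocks until an event past waitIndex arrives, or
        // the stop channel is closed.
        Watch(key string, waitIndex uint64, stop chan bool) (*Response, error)
        // Load re-reads the full tree and returns the index it saw.
        Load(key string) (uint64, error)
    }

    func watchLoop(c client, key string, index uint64, stop chan bool) {
        for {
            resp, err := c.Watch(key, index+1, stop)
            if err != nil {
                if err == errWatchStopped {
                    return
                }
                if etcdErr, ok := err.(*EtcdError); ok {
                    if etcdErr.ErrorCode == 401 {
                        // The watch index fell too far behind
                        // the log: reload everything and resume
                        // from the fresh snapshot's index.
                        if idx, lerr := c.Load(key); lerr == nil {
                            index = idx
                            continue
                        }
                    } else {
                        // Keep following the server's idea of
                        // the index.
                        index = etcdErr.Index
                    }
                }
                log.Printf("watch error: %v", err)
                time.Sleep(200 * time.Millisecond)
                continue
            }
            index = resp.EtcdIndex
            // ...decode and apply the event here...
        }
    }

    func main() {}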
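Finally, the reload suppression: setMountIfChanged() reports whether a write actually changed anything, and only then is the update channel signaled. The trigger() helper itself is not shown in this diff, so the sketch assumes the usual non-blocking send on a one-slot buffered channel, which is what makes repeated signals coalesce into a single pending reload:

    package main

    import "fmt"

    type Mount struct {
        Name     string
        Username string
        Password string
    }

    type config struct {
        mounts map[string]*Mount
    }

    // setMountIfChanged stores m and reports whether it differed from
    // the stored value, so callers can skip redundant update triggers.
    func (c *config) setMountIfChanged(m *Mount) bool {
        if prev, ok := c.mounts[m.Name]; ok && *prev == *m {
            return false
        }
        c.mounts[m.Name] = m
        return true
    }

    // trigger performs a non-blocking send: with a buffer of one,
    // repeated signals coalesce into a single pending update.
    func trigger(ch chan bool) {
        select {
        case ch <- true:
        default:
        }
    }

    func main() {
        c := &config{mounts: make(map[string]*Mount)}
        upch := make(chan bool, 1)

        m := &Mount{Name: "/test.ogg", Username: "source1"}
        for i := 0; i < 3; i++ {
            if c.setMountIfChanged(m) {
                trigger(upch)
            }
        }
        fmt.Println("pending updates:", len(upch)) // 1: only the first call changed anything
    }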