Commit bc86eb31 authored by ale

refactor RadioNode for robustness

Increase the robustness of the config watcher by reloading the full
configuration on etcd errors (fixes an issue with the very common error
401, "index out of date").

Make sure that the Icecast config reload is only triggered if the
configuration has actually changed.

Add a way to stop the node properly, which ensures that we track all
the resources and that there are no deadlocks in the internal
communication channels.

MasterElection now closes the update channel when it's done.
parent 089bdf32
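
In outline, the new watch logic in the diff below behaves like this sketch (a condensed, hypothetical helper that folds together the load() and watcher() functions introduced by this commit; logging and the delete/default handling are omitted): on etcd error 401 ("index out of date") it falls back to a full Get() of the mount tree and resumes watching from the returned index, and it only signals the icecast updater when a mount value actually changed.

// Condensed sketch, not the literal code from this commit.
func (w *configWatcher) watchLoop(stop chan bool) {
	index := w.load(stop) // full Get(); returns the current etcd index
	for {
		resp, err := w.client.Watch(autoradio.MountPrefix, index+1, true, nil, stop)
		if err != nil {
			if err == etcd.ErrWatchStoppedByUser {
				return
			}
			if e, ok := err.(*etcd.EtcdError); ok && e.ErrorCode == 401 {
				// The watch index fell too far behind: reload everything.
				index = w.load(stop)
				continue
			}
			time.Sleep(200 * time.Millisecond)
			continue
		}
		index = resp.EtcdIndex
		if w.updateMount(resp.Node.Key, resp.Node.Value, index) {
			// Only reload icecast when the config actually changed.
			trigger(w.upch)
		}
	}
}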
@@ -45,6 +45,10 @@ type Mount struct {
 	Fallback string
 }
 
+func (m *Mount) Equal(other *Mount) bool {
+	return *m == *other
+}
+
 func (m *Mount) IsRelay() bool {
 	return m.RelayUrl != ""
 }
...
@@ -2,6 +2,7 @@ package masterelection
 import (
 	"log"
+	"os"
 	"sync"
 	"time"
@@ -38,32 +39,34 @@ func (s State) Equal(other State) bool {
 type MasterElection struct {
 	client autoradio.EtcdClient
-	stop   chan bool
 
 	Data string
 	Path string
 	TTL  uint64
 
-	LogPrefix string
+	Log *log.Logger
 
 	stateLock sync.Mutex
 	stateCh   chan State
 	state     State
 }
 
-func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection {
+// NewMasterElection creates a new MasterElection object that will
+// establish a lock on 'path'. It will send state transitions to
+// 'sch', if provided. If 'sch' is not nil, it will be closed when
+// Run() terminates.
+func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection {
 	if ttl < 1 {
 		ttl = 1
 	}
 	return &MasterElection{
 		client:  client,
 		Path:    path,
 		Data:    data,
 		TTL:     ttl,
 		state:   State{Role: ROLE_UNKNOWN},
 		stateCh: sch,
-		stop:    stop,
-		LogPrefix: "masterelection",
+		Log:     log.New(os.Stderr, "masterelection: ", 0),
 	}
 }
@@ -93,7 +96,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	var changed bool
 	m.stateLock.Lock()
 	if changed = !m.state.Equal(state); changed {
-		log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData)
+		m.Log.Printf("%s (%s) -> %s (%s)", m.state.Role.String(), m.state.MasterData, role.String(), masterData)
 		m.state = state
 	}
 	m.stateLock.Unlock()
@@ -102,7 +105,7 @@ func (m *MasterElection) setState(role Role, masterData string) {
 	}
 }
 
-func (m *MasterElection) runMaster(index uint64) {
+func (m *MasterElection) runMaster(index uint64, stop chan bool) {
 	m.setState(ROLE_MASTER, m.Data)
 
 	// If we renew the lease every TTL / N, we allow N renewal
@@ -122,28 +125,28 @@ func (m *MasterElection) runMaster(index uint64) {
 			// will be updated (and the lock renewed).
 			resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
 			if err != nil {
-				log.Printf("%s: error updating lock: %s", m.LogPrefix, err)
+				m.Log.Printf("error updating lock: %s", err)
 				// If we can't renew the lock for a
 				// TTL, we must assume we lost it.
 				if t.Sub(lastUpdate) > ttl {
-					log.Printf("%s: too many errors, lost lock", m.LogPrefix)
+					m.Log.Println("too many errors, lost lock")
 					return
 				}
 			}
 			index = resp.EtcdIndex
 			lastUpdate = t
-		case <-m.stop:
+		case <-stop:
 			// Facilitate a master re-election by dropping
 			// the lock rather than letting it expire.
-			log.Printf("%s: releasing masterelection lock", m.LogPrefix)
+			m.Log.Println("releasing masterelection lock")
 			m.client.Delete(m.Path, false)
 			return
 		}
 	}
 }
 
-func (m *MasterElection) runSlave(index uint64) {
+func (m *MasterElection) runSlave(index uint64, stop chan bool) {
 	// It would be best if we could simply retrieve the master
 	// lock file at the specified index. But we can only Get from
 	// HEAD, so in case of a quick master transition we might
@@ -158,10 +161,10 @@ func (m *MasterElection) runSlave(index uint64) {
 	}
 
 	for {
-		resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop)
+		resp, err = m.client.Watch(m.Path, index+1, false, nil, stop)
 		if err != nil {
 			if err != etcd.ErrWatchStoppedByUser {
-				log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err)
+				m.Log.Printf("slave Watch() error: %v", err)
 			}
 			return
 		}
@@ -169,7 +172,7 @@ func (m *MasterElection) runSlave(index uint64) {
 		// If the lock has expired, or it has been removed,
 		// try to acquire it again.
 		if resp.Action == "delete" || resp.Action == "expire" {
-			log.Printf("%s: lock is gone", m.LogPrefix)
+			m.Log.Println("lock is gone")
 			return
 		}
@@ -178,11 +181,15 @@ func (m *MasterElection) runSlave(index uint64) {
 	}
 }
 
-func (m *MasterElection) Run() {
+func (m *MasterElection) Run(stop chan bool) {
+	if m.stateCh != nil {
+		defer close(m.stateCh)
+	}
+
 	for {
 		// Quick non-blocking check for the stop channel.
 		select {
-		case <-m.stop:
+		case <-stop:
 			return
 		default:
 		}
@@ -196,16 +203,14 @@ func (m *MasterElection) Run() {
 		if err == nil {
 			// Howdy, we're the master now. Wait a while
 			// and renew our TTL.
-			log.Printf("%s: we are the master", m.LogPrefix)
-			m.runMaster(response.EtcdIndex)
+			m.runMaster(response.EtcdIndex, stop)
 		} else if etcdErr, ok := err.(*etcd.EtcdError); ok {
 			// We're not the master. Wait until the lock
 			// is deleted or expires.
-			log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr)
-			m.runSlave(etcdErr.Index)
+			m.runSlave(etcdErr.Index, stop)
 		} else {
 			// An error of some other sort! Retry.
-			log.Printf("%s: unexpected error: %v", m.LogPrefix, err)
+			m.Log.Printf("unexpected error: %v", err)
 		}
 	}
...
@@ -48,10 +48,9 @@ func TestMasterElection(t *testing.T) {
 			lockPath,
 			fmt.Sprintf("%d", i),
 			1,
-			nil,
-			stopCh)
-		m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1)
-		go m.Run()
+			nil)
+		go m.Run(stopCh)
 		nodes = append(nodes, m)
 		stop = append(stop, stopCh)
 	}
...
@@ -28,7 +28,7 @@ var (
 type Controller interface {
 	Update(*clusterConfig, bool, net.IP) error
 	GetStatus() *IcecastStatus
-	Run()
+	Run(chan bool)
 }
 
 // Icecast returns empty fields in our status handler, which we'll
@@ -63,11 +63,10 @@ type icecastController struct {
 	stop chan bool
 }
 
-func NewIcecastController(publicIp string, stop chan bool) *icecastController {
+func NewIcecastController(publicIp string) *icecastController {
 	return &icecastController{
 		config: newIcecastConfig(publicIp),
 		status: &IcecastStatus{},
-		stop:   make(chan bool, 1),
 	}
 }
@@ -138,7 +137,7 @@ func (ic *icecastController) GetStatus() *IcecastStatus {
 	return ic.status
 }
 
-func (ic *icecastController) statusUpdater() {
+func (ic *icecastController) Run(stop chan bool) {
 	t := time.NewTicker(3 * time.Second)
 	downStatus := &IcecastStatus{}
 	for {
@@ -152,7 +151,7 @@ func (ic *icecastController) statusUpdater() {
 				ic.status = downStatus
 				icecastOk.Set(0)
 			}
-		case <-ic.stop:
+		case <-stop:
 			return
 		}
 	}
@@ -206,7 +205,3 @@ func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e
 	return &status, nil
 }
 
-func (ic *icecastController) Run() {
-	ic.statusUpdater()
-}
@@ -14,7 +14,7 @@ func TestIcecastConfig(t *testing.T) {
 		Password: "pass",
 	}
 	c := newClusterConfig()
-	c.setMount(mount)
+	c.setMountIfChanged(mount)
 
 	// Test a relay config.
 	ice := newIcecastConfig("1.2.3.4")
...
@@ -9,7 +9,7 @@ func TestIcecast_TestParseStatusPage(t *testing.T) {
 	xml := `<?xml version="1.0"?>
 <status><mount name="/test.ogg"><listeners>3</listeners><bitrate/><quality/><video-quality/><frame-size/><frame-rate/></mount></status>`
-	ic := NewIcecastController("1.2.3.4", make(chan bool))
+	ic := NewIcecastController("1.2.3.4")
 	result, err := ic.parseStatusPage(strings.NewReader(xml))
 	if err != nil {
 		t.Fatal(err)
...
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"log"
 	"net"
+	"os"
 	"strings"
 	"sync"
 	"time"
@@ -66,11 +67,16 @@ func (c *clusterConfig) ListMounts() []*autoradio.Mount {
 	return result
 }
 
-// Update a mount (in-memory only).
-func (c *clusterConfig) setMount(m *autoradio.Mount) {
+// Update a mount (in-memory only). Returns true if the new value is
+// different than the old one.
+func (c *clusterConfig) setMountIfChanged(m *autoradio.Mount) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
+	if prev, ok := c.mounts[m.Name]; ok && prev.Equal(m) {
+		return false
+	}
 	c.mounts[m.Name] = m
+	return true
 }
 
 // Delete a mount (in-memory only).
@@ -86,152 +92,163 @@ type configWatcher struct {
 	client autoradio.EtcdClient
 	config *clusterConfig
 	upch   chan bool
-	stop   chan bool
-	index  uint64
 }
 
-func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
+func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool) *configWatcher {
 	return &configWatcher{
 		client: client,
 		config: config,
 		upch:   upch,
-		stop:   stop,
 	}
 }
 
-func (w *configWatcher) setIndex(index uint64) {
-	w.index = index
-	configIndex.Set(int64(index))
-}
-
-func (w *configWatcher) updateConfigWithResponse(index uint64, key, value string) {
+// Returns true if the mount configuration has been modified.
+func (w *configWatcher) updateMount(key, value string, index uint64) bool {
 	mountName := keyToMount(key)
-	log.Printf("updating mount %s [@%d]: %s", mountName, index, value)
 	var m autoradio.Mount
 	if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
-		log.Printf("corrupted data: %s: %s", value, err)
+		log.Printf("error updating mount %s [@%d]: corrupted data: %v", mountName, index, err)
 	} else {
-		w.config.setMount(&m)
+		if w.config.setMountIfChanged(&m) {
+			log.Printf("updated mount %s [@%d]: %s", keyToMount(key), index, value)
+			return true
+		}
 	}
+	return false
 }
 
-func (w *configWatcher) watcher(ch chan *etcd.Response) {
+func (w *configWatcher) watcher(index uint64, stop chan bool) {
 	for {
-		select {
-		case response, ok := <-ch:
-			if !ok {
-				return
-			}
-
-			// Update the 'last seen' index, so that if
-			// the Watcher dies, it knows where to start
-			// from and we do not have to download the
-			// full configuration again. Do this even for
-			// actions we don't care about, just in case.
-			w.setIndex(response.EtcdIndex)
-
-			if response.Action == "delete" {
-				mountName := keyToMount(response.Node.Key)
-				log.Printf("deleted mount %s", mountName)
-				w.config.delMount(mountName)
-			} else if response.Action == "set" || response.Action == "create" || response.Action == "update" {
-				w.updateConfigWithResponse(response.EtcdIndex, response.Node.Key, response.Node.Value)
-			} else {
-				continue
-			}
-
-			// Trigger an update.
-			trigger(w.upch)
-
-		case <-w.stop:
-			return
+		configIndex.Set(int64(index))
+		resp, err := w.client.Watch(autoradio.MountPrefix, index+1, true, nil, stop)
+		if err != nil {
+			if err == etcd.ErrWatchStoppedByUser {
+				return
+			}
+			if etcdErr, ok := err.(*etcd.EtcdError); ok {
+				// If the index passed to Watch falls
+				// behind the current log position by
+				// more than 1000 entries (which is
+				// bound to happen regularly in
+				// presence of a sustained write
+				// rate), etcd will return an error
+				// 401 ("index out of date"). The safe
+				// thing to do is just to run a full
+				// load again, and resume watching
+				// from the current EtcdIndex.
+				if etcdErr.ErrorCode == 401 {
+					index = w.load(stop)
+					continue
+				}
+				// Log other etcd errors.
+				log.Printf("config Watch(): %v", err)
+				// Follow the etcd index even in case
+				// of errors.
+				index = etcdErr.Index
+			} else {
+				// Generic error, just retry the call.
+				log.Printf("config Watch(): %v", err)
+			}
+			time.Sleep(200 * time.Millisecond)
+			continue
 		}
+
+		index = resp.EtcdIndex
+
+		switch resp.Action {
+		case "delete":
+			mountName := keyToMount(resp.Node.Key)
+			log.Printf("deleted mount %s", mountName)
+			w.config.delMount(mountName)
+		case "set", "create", "update":
+			if !w.updateMount(resp.Node.Key, resp.Node.Value, index) {
+				// Do not trigger an update if the
+				// update had no effect.
+				continue
+			}
+		default:
+			continue
+		}
+
+		// Trigger an update.
+		trigger(w.upch)
 	}
 }
 
-// Load full configuration from etcd. This will trigger the update channel.
-func (w *configWatcher) loadFullConfig() {
+// Load full configuration from etcd. It will wait until the
+// configuration can be read successfully, and it will trigger the
+// update channel after it's done.
+func (w *configWatcher) load(stop chan bool) uint64 {
+	var err error
+	var resp *etcd.Response
 	for {
-		response, err := w.client.Get(autoradio.MountPrefix, false, false)
-		if err == nil && response.Node != nil && response.Node.Dir {
-			// Directly update the configuration.
-			for _, n := range response.Node.Nodes {
-				w.updateConfigWithResponse(response.EtcdIndex, n.Key, n.Value)
-			}
-			log.Printf("got configuration at index %d", response.EtcdIndex)
-			w.setIndex(response.EtcdIndex)
+		if resp, err = w.client.Get(autoradio.MountPrefix, false, false); err != nil {
+			log.Printf("config load error: %v", err)
+		} else if resp.Node == nil || !resp.Node.Dir {
+			log.Printf("config load error: no configuration found")
+		} else {
 			break
 		}
-		log.Printf("Get error: %s", err)
 
 		// Wait 1 second, but watch the stop channel.
 		select {
 		case <-time.After(1 * time.Second):
-		case <-w.stop:
-			return
+		case <-stop:
			return 0
 		}
 	}
 
-	// Update the icecast daemon now that we have a full config.
-	trigger(w.upch)
+	index := resp.EtcdIndex
+	changed := false
+	for _, n := range resp.Node.Nodes {
+		if w.updateMount(n.Key, n.Value, index) {
+			changed = true
+		}
+	}
+
+	if changed {
+		// Update the icecast daemon now that we have a full config.
+		trigger(w.upch)
+	}
+	return index
 }
 
 // Start the configWatcher in the background. It will wait for
 // initialization to complete, so that when this function returns, the
 // in-memory configuration has already been fully synchronized.
-func (w *configWatcher) Start() {
-	// Run until the first successful Get().
-	log.Printf("retrieving initial config")
-	w.loadFullConfig()
-
-	// Main watch loop. Remember that etcd.Watch() will close the
-	// receiver channel when it returns, so we need to start a new
-	// syncer every time.
-	go func() {
-		for {
-			ch := make(chan *etcd.Response)
-			go w.watcher(ch)
-			curIndex := w.index + 1
-			log.Printf("starting watcher at index %d", curIndex)
-			_, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, ch, w.stop)
-			if err == etcd.ErrWatchStoppedByUser {
-				return
-			} else if err != nil {
-				// Log the error and start over.
-				log.Printf("Watch(): %s", err)
-				// If the error is code 401 ("index
-				// out of date"), start from the
-				// current index. TODO: consider
-				// calling loadFullConfig, or we might
-				// potentially lose some updates?
-				if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == 401 {
-					w.setIndex(etcdErr.Index)
-				}
-			}
-		}
-	}()
+func (w *configWatcher) Start(stop chan bool) func(chan bool) {
+	log.Printf("loading configuration")
+	index := w.load(stop)
+	return func(stop chan bool) {
+		w.watcher(index, stop)
+	}
 }
 
 // An active streaming node, managing the local icecast server.
 type RadioNode struct {
 	client autoradio.EtcdClient
 
 	config *clusterConfig
 
 	name      string
 	ips       []net.IP
 	me        *masterelection.MasterElection
 	watcher   *configWatcher
 	icecast   Controller
 	bw        *bwmonitor.BandwidthUsageMonitor
 	heartbeat uint64
-	upch      chan bool
-	stop      chan bool
+	reloadDelay time.Duration
+	upch        chan bool
+	stop        chan bool
+
+	wg sync.WaitGroup
+
+	Log *log.Logger
 }
 
 func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
 	config := newClusterConfig()
 
+	// Global 'stop' channel.
+	stopch := make(chan bool)
+
 	// Network updates trigger icecast reconfiguration. This
 	// channel is used as an event signal.
 	upch := make(chan bool, 1)
@@ -239,14 +256,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 	// MasterElection changes trigger an update.
 	mech := make(chan masterelection.State)
 	go func() {
-		for _ = range mech {
+		for range mech {
 			trigger(upch)
 		}
 	}()
 
-	// Global 'stop' channel.
-	stopch := make(chan bool)
-
 	// Location information advertised when this node is master.
 	minfo := &autoradio.MasterNodeInfo{
 		Name: name,
@@ -267,20 +281,21 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 			autoradio.MasterElectionPath,
 			string(minfodata),
 			5,
-			mech,
-			stopch),
-		watcher:   newConfigSyncer(client, config, upch, stopch),
-		icecast:   NewIcecastController(name, stopch),
+			mech),
+		watcher:     newConfigSyncer(client, config, upch),
+		icecast:     NewIcecastController(name),
+		reloadDelay: 1000 * time.Millisecond,
 		heartbeat: 2,
 		bw:        bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
 		upch:      upch,
 		stop:      stopch,
+		Log:       log.New(os.Stderr, "node: ", 0),
 	}
 }
 
 // The presence goroutine periodically updates our entry in the list
 // of nodes with the current node statistics.
-func (rc *RadioNode) presence() {
+func (rc *RadioNode) presence(stop chan bool) {
 	ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
 
 	// Register ourselves using the node name.
@@ -303,10 +318,10 @@ func (rc *RadioNode) presence() {
 			var buf bytes.Buffer
 			json.NewEncoder(&buf).Encode(&nodeStatus)
 			if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
-				log.Printf("presence: Set(): %s", err)
+				rc.Log.Printf("presence: Set(): %s", err)
 			}
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
@@ -323,51 +338,86 @@ func (rc *RadioNode) getMasterAddr() net.IP {
 	return info.IP[0]
 }
 
-// Run the node. This method does not return until someone calls Stop().
-func (rc *RadioNode) Run() {
-	// Bootstrap the config watcher. This ensures that we have a
-	// full configuration (thanks to the Get() call) before we
-	// start managing the icecast server.
-	rc.watcher.Start()
-
-	// Start the presence heartbeat.
-	go rc.presence()
-
-	// Start the masterelection runner.
-	go rc.me.Run()
-
-	// Wait an instant to give a chance to the other services to
-	// initialize.
+// Reload the icecast configuration when needed.
+func (rc *RadioNode) updater(stop chan bool) {
+	// Wait an instant to give a chance to the services to
+	// initialize properly.
 	time.Sleep(200 * time.Millisecond)
 
-	// Start the Icecast status checker.
-	go rc.icecast.Run()
-
-	log.Printf("starting icecast updater")
+	rc.Log.Printf("starting icecast updater")
 	for {
 		select {
 		case <-rc.upch:
 			icecastReloads.Incr()
-			log.Printf("reloading icecast config")
+			rc.Log.Printf("reloading icecast config")
 			if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
 				icecastReloadErrors.Incr()
-				log.Printf("Update(): %s", err)
+				rc.Log.Printf("Update(): %s", err)
 			}
-			// Do not reload icecast more often than once
-			// every two seconds.
-			time.Sleep(2 * time.Second)
+			// Limit the rate of icecast reloads.
+			if rc.reloadDelay > 0 {
+				time.Sleep(rc.reloadDelay)
+			}
-		case <-rc.stop:
+		case <-stop:
 			return
 		}
 	}
 }
 
+// Start the node. Returns immediately.
+func (rc *RadioNode) Start() {
+	// Bootstrap the config watcher first. This ensures that we
+	// have a full configuration (thanks to the Get() call) before
+	// we start managing the icecast server.
+	watcherFn := rc.watcher.Start(rc.stop)
+
+	// Start auxiliary services and event listeners on their own
+	// goroutines. Put them all in a WaitGroup so we can wait for
+	// their termination.
+	bgfuncs := []func(chan bool){
+		// Config watcher.
+		watcherFn,
+		// Presence heartbeat.
+		rc.presence,
+		// Icecast updater,
+		rc.updater,
+		// Master election runner.
+		rc.me.Run,
+		// Icecast status checker.
+		rc.icecast.Run,
+	}
+	for _, fn := range bgfuncs {
+		rc.wg.Add(1)
+		go func(fn func(stop chan bool)) {
+			fn(rc.stop)
+			rc.wg.Done()
+		}(fn)
+	}
+}
+
+// Wait until all processing associated with this node has terminated.
+func (rc *RadioNode) Wait() {
+	rc.wg.Wait()
+
+	// Let's leave this around for garbage collection, otherwise
+	// the in-memory etcd mock server might try to send values to
+	// it due to a Watch-trigger desynchronization...
+	//close(rc.upch)
+}
+
+// Run the node, waiting for termination.
+func (rc *RadioNode) Run() {
+	rc.Start()
+	rc.Wait()
+}
+
 // Stop everything.
 func (rc *RadioNode) Stop() {
 	close(rc.stop)
-
-	// We should use WaitGroups here. Instead, wait 2 seconds.
-	time.Sleep(2 * time.Second)
 }
@@ -2,6 +2,7 @@ package node
 import (
 	"fmt"
+	"log"
 	"net"
 	"testing"
 	"time"
@@ -14,15 +15,18 @@ type mockController struct {
 	mounts     []*autoradio.Mount
 	isMaster   bool
 	masterAddr net.IP
+	numUpdates int
 }
 
-func (m *mockController) Run() {
+func (m *mockController) Run(stop chan bool) {
+	<-stop
 }
 
 func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
 	m.mounts = conf.ListMounts()
 	m.isMaster = isMaster
 	m.masterAddr = masterAddr
+	m.numUpdates++
 	return nil
 }
@@ -41,13 +45,22 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
 			1000,
 			etcd)
 		node.icecast = &mockController{}
-		go node.Run()
+		node.reloadDelay = time.Duration(0)
+		node.Start()
+		node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1))
+		node.me.Log.SetPrefix(fmt.Sprintf("node%d: masterelection: ", i+1))
 		nodes = append(nodes, node)
 	}
 	time.Sleep(1100 * time.Millisecond)
 	return nodes
 }
 
+func waitTestNodes(nodes []*RadioNode) {
+	for _, n := range nodes {
+		n.Wait()
+	}
+}
+
 func loadTestData(etcd autoradio.EtcdClient) {
 	etcd.Set(autoradio.MountPrefix+"test.ogg",
 		`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
@@ -59,24 +72,30 @@ func TestRadioNode_MasterElection(t *testing.T) {
 	loadTestData(etcd)
 	nodes := startTestNodes(3, etcd)
 
-	countMasters := func(nodes []*RadioNode) int {
-		var masters int
-		for _, n := range nodes {
-			if n.me.IsMaster() {
-				masters++
-			}
-		}
-		return masters
-	}
-
-	// Shut down the nodes one by one, and verify that there is a
-	// single master among the remaining ones.
-	for i := 0; i < 3; i++ {
-		if nm := countMasters(nodes[i:len(nodes)]); nm != 1 {
-			t.Fatalf("@%d: masters=%d (expected 1)", i, nm)
-		}
-		nodes[i].Stop()
-		time.Sleep(10 * time.Millisecond)
+	// Force master transitions by shutting down the nodes one by
+	// one as the become the master, and verify that there is a
+	// single master among the remaining ones.
+	curNodes := nodes
+	for len(curNodes) > 0 {
+		var tmp []*RadioNode
+		masterIdx := -1
+		numMasters := 0
+		for i, n := range curNodes {
+			if n.me.IsMaster() {
+				numMasters++
+				masterIdx = i
+			} else {
+				tmp = append(tmp, n)
+			}
+		}
+		if numMasters != 1 {
+			t.Fatalf("masters=%d (expected 1): %#v", numMasters, curNodes)
+		}
+		curNodes[masterIdx].Stop()
+		curNodes[masterIdx].Wait()
+		curNodes = tmp
+		time.Sleep(20 * time.Millisecond)
 	}
 }
@@ -98,6 +117,21 @@ func TestRadioNode_ConfigChangePropagation(t *testing.T) {
 		if username != "source2" {
 			t.Errorf("change did not propagate to node %d", i+1)
 		}
-		nodes[i].Stop()
+	}
+
+	// Verify that the controller has received the updates. There
+	// should be two of them: the initial config load, and the
+	// test update above.
+	for i := 0; i < 3; i++ {
+		numUpdates := nodes[i].icecast.(*mockController).numUpdates
+		if numUpdates != 2 {
+			t.Errorf("node %d received %d updates (expected 2)", i+1, numUpdates)
+		}
+	}
+
+	log.Printf("cleanup")
+	for _, n := range nodes {
+		n.Stop()
+		n.Wait()
 	}
 }
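
For reference, the new RadioNode lifecycle split into Start()/Wait()/Stop() is meant to be driven roughly as in this hypothetical caller (the function and its arguments are placeholders, not code from the repository):

// runNode is a hypothetical caller sketching the intended shutdown
// sequence; it is not part of this commit.
func runNode(client autoradio.EtcdClient, shutdown <-chan struct{}) {
	node := NewRadioNode("node1", []net.IP{net.ParseIP("10.0.0.1")}, "eth0", 1000, client)
	node.Start() // non-blocking: spawns watcher, presence, updater, election and status goroutines
	<-shutdown   // wait for an external termination request
	node.Stop()  // close the shared stop channel; every goroutine sees it
	node.Wait()  // block until the WaitGroup has drained, so nothing leaks
}

Splitting Stop() from Wait() replaces the old fixed two-second sleep with a deterministic join on the WaitGroup.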