diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go index 97c8d8851037ed1ae433af9d4386f1b3be45e679..5eabdb2b53887ef6ddd52a39e03bdd87885d4ad2 100644 --- a/masterelection/masterelection.go +++ b/masterelection/masterelection.go @@ -2,6 +2,7 @@ package masterelection import ( "log" + "sync" "time" "git.autistici.org/ale/autoradio" @@ -9,91 +10,100 @@ import ( ) const ( - STATE_SLAVE = iota - STATE_MASTER + ROLE_UNKNOWN = iota + ROLE_SLAVE + ROLE_MASTER ) -func stateToString(state int) string { - switch state { - case STATE_SLAVE: +type Role int + +func (r Role) String() string { + switch r { + case ROLE_SLAVE: return "slave" - case STATE_MASTER: + case ROLE_MASTER: return "master" } - return "" + return "unknown" +} + +type State struct { + Role Role + MasterData string +} + +func (s State) Equal(other State) bool { + return s.Role == other.Role && s.MasterData == other.MasterData } type MasterElection struct { - client autoradio.EtcdClient - stop chan bool - stopped bool + client autoradio.EtcdClient + stop chan bool Data string Path string TTL uint64 - State int - StateChange chan int + LogPrefix string + + stateLock sync.Mutex + stateCh chan State + state State } -func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan int, stop chan bool) *MasterElection { - if ttl < 2 { - ttl = 2 +func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection { + if ttl < 1 { + ttl = 1 } return &MasterElection{ - client: client, - Path: path, - Data: data, - TTL: ttl, - State: STATE_SLAVE, - StateChange: sch, - stop: stop, + client: client, + Path: path, + Data: data, + TTL: ttl, + state: State{Role: ROLE_UNKNOWN}, + stateCh: sch, + stop: stop, + LogPrefix: "masterelection", } } +func (m *MasterElection) Valid() bool { + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.Role != ROLE_UNKNOWN +} + func (m *MasterElection) IsMaster() bool { - return m.State == STATE_MASTER + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.Role == ROLE_MASTER } func (m *MasterElection) GetMasterData() string { - response, err := m.client.Get(m.Path, false, false) - if err != nil || response.Node == nil { - return "" - } - return response.Node.Value + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.MasterData } -func (m *MasterElection) setState(state int) { - log.Printf("masterelection: %s -> %s", - stateToString(m.State), - stateToString(state)) - // Order is important here: set state before triggering the - // update channel so that the receiver sees the right value. - m.State = state - if m.StateChange != nil { - m.StateChange <- state +func (m *MasterElection) setState(role Role, masterData string) { + state := State{ + Role: role, + MasterData: masterData, } -} - -func (m *MasterElection) stopper() { - <-m.stop - - // Tell the Run() goroutine to exit as soon as it can (we - // could have simply used the 'stop' channel there but Watch() - // ignores the stop channel if invoked without a receiving - // channel and we'd have to refactor Run() to use a temp - // channel for each invocation. - m.stopped = true - - // Remove the lock file if we are the master. - if m.State == STATE_MASTER { - log.Printf("releasing masterelection lock") - m.client.Delete(m.Path, false) + var changed bool + m.stateLock.Lock() + if changed = !m.state.Equal(state); changed { + log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData) + m.state = state + } + m.stateLock.Unlock() + if changed && m.stateCh != nil { + m.stateCh <- state } } func (m *MasterElection) runMaster(index uint64) { - m.setState(STATE_MASTER) + m.setState(ROLE_MASTER, m.Data) // If we renew the lease every TTL / N, we allow N renewal // errors before we stop believing being the master. @@ -105,59 +115,77 @@ func (m *MasterElection) runMaster(index uint64) { select { case t := <-tick.C: // To verify that we actually are still the - // master (not just we believe we are), try - // a compare-and-swap operation to check that + // master (not just we believe we are), try a + // compare-and-swap operation to check that // the stored master address is still our own, // and no-one stole our lock. If not, the TTL // will be updated (and the lock renewed). - response, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index) + resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index) if err != nil { - log.Printf("error updating lock: %s", err) + log.Printf("%s: error updating lock: %s", m.LogPrefix, err) // If we can't renew the lock for a // TTL, we must assume we lost it. if t.Sub(lastUpdate) > ttl { - log.Printf("too many errors, lost lock") + log.Printf("%s: too many errors, lost lock", m.LogPrefix) return } } - index = response.EtcdIndex + index = resp.EtcdIndex lastUpdate = t case <-m.stop: + // Facilitate a master re-election by dropping + // the lock rather than letting it expire. + log.Printf("%s: releasing masterelection lock", m.LogPrefix) + m.client.Delete(m.Path, false) return } } } func (m *MasterElection) runSlave(index uint64) { - m.setState(STATE_SLAVE) + // It would be best if we could simply retrieve the master + // lock file at the specified index. But we can only Get from + // HEAD, so in case of a quick master transition we might + // retrieve data at an index greater than 'index'. + // + // In this case, we skip updating state, since the Watch call + // will immediately return (with the change that caused the + // index to increase). + resp, err := m.client.Get(m.Path, false, false) + if err == nil && resp.Node.ModifiedIndex <= index { + m.setState(ROLE_SLAVE, resp.Node.Value) + } for { - // Start a watch on the lock, waiting for its removal. - response, err := m.client.Watch(m.Path, index+1, false, nil, m.stop) + resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop) if err != nil { if err != etcd.ErrWatchStoppedByUser { - log.Printf("slave Watch() error: %+v", err) + log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err) } return } - if response.Action == "delete" || response.Action == "expire" { + // If the lock has expired, or it has been removed, + // try to acquire it again. + if resp.Action == "delete" || resp.Action == "expire" { + log.Printf("%s: lock is gone", m.LogPrefix) return } - index = response.EtcdIndex + m.setState(ROLE_SLAVE, resp.Node.Value) + index = resp.EtcdIndex } - } func (m *MasterElection) Run() { - go m.stopper() - - // Start as a slave. - m.setState(STATE_SLAVE) - - for !m.stopped { + for { + // Quick non-blocking check for the stop channel. + select { + case <-m.stop: + return + default: + } // Try to acquire the lock. This call will only succeed // if the lockfile does not exist (either because it @@ -168,16 +196,16 @@ func (m *MasterElection) Run() { if err == nil { // Howdy, we're the master now. Wait a while // and renew our TTL. - log.Printf("masterelection: we are the master") + log.Printf("%s: we are the master", m.LogPrefix) m.runMaster(response.EtcdIndex) } else if etcdErr, ok := err.(*etcd.EtcdError); ok { // We're not the master. Wait until the lock // is deleted or expires. - log.Printf("masterelection: running as slave (%v)", etcdErr) + log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr) m.runSlave(etcdErr.Index) } else { // An error of some other sort! Retry. - log.Printf("masterelection: unexpected error: %v", err) + log.Printf("%s: unexpected error: %v", m.LogPrefix, err) } } diff --git a/masterelection/masterelection_test.go b/masterelection/masterelection_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ae495ca3509fdcd0d7a07296747042f16fef9459 --- /dev/null +++ b/masterelection/masterelection_test.go @@ -0,0 +1,67 @@ +package masterelection + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "git.autistici.org/ale/autoradio/util" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func countMasters(nodes []*MasterElection) int { + var masters int + for _, n := range nodes { + if n.IsMaster() { + masters++ + } + } + return masters +} + +func verifyMasterData(t *testing.T, nodes []*MasterElection) { + tmp := make(map[string]struct{}) + for _, n := range nodes { + tmp[n.GetMasterData()] = struct{}{} + } + if len(tmp) != 1 { + t.Errorf("master data propagation error: >1 values: %v", tmp) + } +} + +func TestMasterElection(t *testing.T) { + //etcd := util.NewTestEtcdClient() + etcd := util.NewTestEtcdClientWithLatency(20 * time.Millisecond) + lockPath := "/master/election/test" + + n := 5 + var nodes []*MasterElection + var stop []chan bool + for i := 0; i < n; i++ { + stopCh := make(chan bool) + m := NewMasterElection( + etcd, + lockPath, + fmt.Sprintf("%d", i), + 1, + nil, + stopCh) + m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1) + go m.Run() + nodes = append(nodes, m) + stop = append(stop, stopCh) + } + + for i := 0; i < n; i++ { + time.Sleep(100 * time.Millisecond) + if nm := countMasters(nodes[i:len(nodes)]); nm != 1 { + t.Errorf("@%d: masters=%d (expected <= 1)", i, nm) + } + verifyMasterData(t, nodes[i:len(nodes)]) + close(stop[i]) + } +} diff --git a/node/icecast.go b/node/icecast.go index dc41c065a6f4409701457dfa20879354dbb6bb89..93248e79bbf86b4c883ae89f743bbd0b34353554 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -25,6 +25,12 @@ var ( icecastOk = instrumentation.NewGauge("icecast.ok") ) +type Controller interface { + Update(*clusterConfig, bool, net.IP) error + GetStatus() *IcecastStatus + Run() +} + // Icecast returns empty fields in our status handler, which we'll // need to turn into integers (the xml unmarshaler will return an // error in this specific case), so we use a separate type for @@ -51,14 +57,14 @@ type IcecastStatus struct { Up bool } -type IcecastController struct { +type icecastController struct { config *icecastConfig status *IcecastStatus stop chan bool } -func NewIcecastController(publicIp string, stop chan bool) *IcecastController { - return &IcecastController{ +func NewIcecastController(publicIp string, stop chan bool) *icecastController { + return &icecastController{ config: newIcecastConfig(publicIp), status: &IcecastStatus{}, stop: make(chan bool, 1), @@ -67,7 +73,7 @@ func NewIcecastController(publicIp string, stop chan bool) *IcecastController { // Reload the icecast daemon. Redirects output to our standard error // for debugging purposes. -func (ic *IcecastController) reload() error { +func (ic *icecastController) reload() error { cmd := exec.Command("/bin/sh", "-c", icecastReloadCmd) cmd.Stdout = os.Stderr cmd.Stderr = os.Stderr @@ -75,7 +81,7 @@ func (ic *IcecastController) reload() error { } // Kill sources connected to local streams. -func (ic *IcecastController) killSources(conf *clusterConfig) error { +func (ic *icecastController) killSources(conf *clusterConfig) error { var anyErr error client := &http.Client{} for _, m := range conf.ListMounts() { @@ -99,7 +105,7 @@ func (ic *IcecastController) killSources(conf *clusterConfig) error { } // Update reloads the Icecast daemon with a new configuration. -func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { +func (ic *icecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { if !isMaster && masterAddr == nil { return errors.New("unknown system state") } @@ -128,11 +134,11 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd return ic.reload() } -func (ic *IcecastController) GetStatus() *IcecastStatus { +func (ic *icecastController) GetStatus() *IcecastStatus { return ic.status } -func (ic *IcecastController) statusUpdater() { +func (ic *icecastController) statusUpdater() { t := time.NewTicker(3 * time.Second) downStatus := &IcecastStatus{} for { @@ -152,7 +158,7 @@ func (ic *IcecastController) statusUpdater() { } } -func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) { +func (ic *icecastController) fetchStatus() (*IcecastStatus, error) { resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", autoradio.IcecastPort, statusPage)) if err != nil { return nil, err @@ -161,7 +167,7 @@ func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) { return ic.parseStatusPage(resp.Body) } -func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) { +func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) { var ustatus icecastStatusUnparsed if err := xml.NewDecoder(input).Decode(&ustatus); err != nil { return nil, err @@ -201,6 +207,6 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e return &status, nil } -func (ic *IcecastController) Run() { +func (ic *icecastController) Run() { ic.statusUpdater() } diff --git a/node/node.go b/node/node.go index e937b9d53f158d26dcdbb0e78c3f177f90ef7213..53ccc0d754c2a3be7993fa31d0353d49541086fa 100644 --- a/node/node.go +++ b/node/node.go @@ -223,7 +223,7 @@ type RadioNode struct { client autoradio.EtcdClient me *masterelection.MasterElection watcher *configWatcher - icecast *IcecastController + icecast Controller bw *bwmonitor.BandwidthUsageMonitor heartbeat uint64 upch chan bool @@ -239,7 +239,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli upch := make(chan bool, 1) // MasterElection changes trigger an update. - mech := make(chan int) + mech := make(chan masterelection.State) go func() { for _ = range mech { trigger(upch) diff --git a/node/node_test.go b/node/node_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2d5485e1d06eb7d15426cf66241bb6c7efb496b3 --- /dev/null +++ b/node/node_test.go @@ -0,0 +1,81 @@ +package node + +import ( + "fmt" + "net" + "testing" + "time" + + "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/util" +) + +type mockController struct { + mounts []*autoradio.Mount + isMaster bool + masterAddr net.IP +} + +func (m *mockController) Run() { +} + +func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { + m.mounts = conf.ListMounts() + m.isMaster = isMaster + m.masterAddr = masterAddr + return nil +} + +func (m *mockController) GetStatus() *IcecastStatus { + return &IcecastStatus{Up: true} +} + +func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode { + var nodes []*RadioNode + + for i := 0; i < n; i++ { + node := NewRadioNode( + fmt.Sprintf("node%d", i+1), + []net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))}, + "eth0", + 1000, + etcd) + node.icecast = &mockController{} + go node.Run() + nodes = append(nodes, node) + } + time.Sleep(1100 * time.Millisecond) + return nodes +} + +func loadTestData(etcd autoradio.EtcdClient) { + etcd.Set(autoradio.MountPrefix+"/test.ogg", + `{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`, + 86400) +} + +func countMasters(nodes []*RadioNode) int { + var masters int + for _, n := range nodes { + if n.me.IsMaster() { + masters++ + } + } + return masters +} + +func TestRadioNode_MasterElection(t *testing.T) { + etcd := util.NewTestEtcdClient() + loadTestData(etcd) + nodes := startTestNodes(3, etcd) + + // Shut down the nodes one by one, and verify that there is a + // single master among the remaining ones. + for i := 0; i < 3; i++ { + if nm := countMasters(nodes[i:len(nodes)]); nm != 1 { + t.Fatalf("@%d: masters=%d (expected 1)", i, nm) + } + nodes[i].Stop() + time.Sleep(10 * time.Millisecond) + } +} diff --git a/util/mock_etcd.go b/util/mock_etcd.go new file mode 100644 index 0000000000000000000000000000000000000000..a7960e9d40fc4835e6323590f7d9fc8ab982c968 --- /dev/null +++ b/util/mock_etcd.go @@ -0,0 +1,193 @@ +// Etcd client mock for testing purposes. It tries to follow the same +// semantics as the actual etcd, at least as far as they are used by +// autoradio. Instead of talking to a remote etcd server, it uses an +// in-memory representation of the data. + +package util + +import ( + "errors" + "math/rand" + "strings" + "sync" + "time" + + "git.autistici.org/ale/autoradio" + "github.com/coreos/go-etcd/etcd" +) + +type datum struct { + value string + expire time.Time +} + +type testEtcdServer struct { + lock sync.Mutex + latency time.Duration + data map[string]datum + watches map[string][]chan *etcd.Response + index uint64 +} + +func NewTestEtcdClient() autoradio.EtcdClient { + return &testEtcdServer{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + } +} + +func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient { + return &testEtcdServer{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + latency: maxLatency, + } +} + +func (s *testEtcdServer) delay() { + if s.latency > 0 { + time.Sleep(time.Duration(rand.Int63n(int64(s.latency)))) + } +} + +func (s *testEtcdServer) trigger(action, key string) *etcd.Response { + resp := &etcd.Response{ + Action: action, + EtcdIndex: s.index, + Node: &etcd.Node{ + Key: key, + }, + } + if action != "delete" { + resp.Node.Value = s.data[key].value + } + for pfx, w := range s.watches { + if strings.HasPrefix(key, pfx) { + for _, ch := range w { + ch <- resp + } + } + } + s.index++ + return resp +} + +func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.data[key]; ok { + return nil, &etcd.EtcdError{Message: "already there", Index: s.index} + } + + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("create", key), nil +} + +func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + if d, ok := s.data[key]; ok && d.value == oldvalue { + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("update", key), nil + } + return nil, &etcd.EtcdError{Message: "failed", Index: s.index} +} + +func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + delete(s.data, key) + return s.trigger("delete", key), nil +} + +func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + resp := &etcd.Response{ + EtcdIndex: s.index, + } + var nodes []*etcd.Node + keyDirPfx := key + "/" + for path, datum := range s.data { + if path == key || strings.HasPrefix(path, keyDirPfx) { + nodes = append(nodes, &etcd.Node{ + Key: path, + Value: datum.value, + }) + } + } + switch { + case len(nodes) == 0: + return nil, errors.New("not found") + case len(nodes) == 1 && nodes[0].Key == key: + resp.Node = nodes[0] + default: + resp.Node = &etcd.Node{ + Key: key, + Dir: true, + Nodes: nodes, + } + } + return resp, nil +} + +func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("set", key), nil +} + +func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) { + ch := respch + if ch == nil { + ch = make(chan *etcd.Response, 1) + } + s.lock.Lock() + s.watches[key] = append(s.watches[key], ch) + s.lock.Unlock() + + var resp *etcd.Response + if respch != nil { + <-stop + } else { + select { + case resp = <-ch: + case <-stop: + } + } + + // Delete the watch. + s.lock.Lock() + var watches []chan *etcd.Response + for _, w := range s.watches[key] { + if w != ch { + watches = append(watches, w) + } + } + s.watches[key] = watches + s.lock.Unlock() + close(ch) + + if resp == nil { + return nil, etcd.ErrWatchStoppedByUser + } + return resp, nil +}