From 03a67c17de8db7f5b0a408aa2c2192427ba97232 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Fri, 26 Dec 2014 13:16:42 +0000 Subject: [PATCH] refactor masterelection to remove races; add tests Master election code should be more robust now, this commit eliminates the discrepancy between role and master info (previously it was possible for these two to be non synchronized). This commit also includes an in-memory etcd mock, which implements enough of the etcd interface to test the masterelection code. --- masterelection/masterelection.go | 180 ++++++++++++++---------- masterelection/masterelection_test.go | 67 +++++++++ node/icecast.go | 28 ++-- node/node.go | 4 +- node/node_test.go | 81 +++++++++++ util/mock_etcd.go | 193 ++++++++++++++++++++++++++ 6 files changed, 464 insertions(+), 89 deletions(-) create mode 100644 masterelection/masterelection_test.go create mode 100644 node/node_test.go create mode 100644 util/mock_etcd.go diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go index 97c8d885..5eabdb2b 100644 --- a/masterelection/masterelection.go +++ b/masterelection/masterelection.go @@ -2,6 +2,7 @@ package masterelection import ( "log" + "sync" "time" "git.autistici.org/ale/autoradio" @@ -9,91 +10,100 @@ import ( ) const ( - STATE_SLAVE = iota - STATE_MASTER + ROLE_UNKNOWN = iota + ROLE_SLAVE + ROLE_MASTER ) -func stateToString(state int) string { - switch state { - case STATE_SLAVE: +type Role int + +func (r Role) String() string { + switch r { + case ROLE_SLAVE: return "slave" - case STATE_MASTER: + case ROLE_MASTER: return "master" } - return "" + return "unknown" +} + +type State struct { + Role Role + MasterData string +} + +func (s State) Equal(other State) bool { + return s.Role == other.Role && s.MasterData == other.MasterData } type MasterElection struct { - client autoradio.EtcdClient - stop chan bool - stopped bool + client autoradio.EtcdClient + stop chan bool Data string Path string TTL uint64 - State int - StateChange chan int + LogPrefix string + + stateLock sync.Mutex + stateCh chan State + state State } -func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan int, stop chan bool) *MasterElection { - if ttl < 2 { - ttl = 2 +func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection { + if ttl < 1 { + ttl = 1 } return &MasterElection{ - client: client, - Path: path, - Data: data, - TTL: ttl, - State: STATE_SLAVE, - StateChange: sch, - stop: stop, + client: client, + Path: path, + Data: data, + TTL: ttl, + state: State{Role: ROLE_UNKNOWN}, + stateCh: sch, + stop: stop, + LogPrefix: "masterelection", } } +func (m *MasterElection) Valid() bool { + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.Role != ROLE_UNKNOWN +} + func (m *MasterElection) IsMaster() bool { - return m.State == STATE_MASTER + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.Role == ROLE_MASTER } func (m *MasterElection) GetMasterData() string { - response, err := m.client.Get(m.Path, false, false) - if err != nil || response.Node == nil { - return "" - } - return response.Node.Value + m.stateLock.Lock() + defer m.stateLock.Unlock() + return m.state.MasterData } -func (m *MasterElection) setState(state int) { - log.Printf("masterelection: %s -> %s", - stateToString(m.State), - stateToString(state)) - // Order is important here: set state before triggering the - // update channel so that the receiver sees the right value. - m.State = state - if m.StateChange != nil { - m.StateChange <- state +func (m *MasterElection) setState(role Role, masterData string) { + state := State{ + Role: role, + MasterData: masterData, } -} - -func (m *MasterElection) stopper() { - <-m.stop - - // Tell the Run() goroutine to exit as soon as it can (we - // could have simply used the 'stop' channel there but Watch() - // ignores the stop channel if invoked without a receiving - // channel and we'd have to refactor Run() to use a temp - // channel for each invocation. - m.stopped = true - - // Remove the lock file if we are the master. - if m.State == STATE_MASTER { - log.Printf("releasing masterelection lock") - m.client.Delete(m.Path, false) + var changed bool + m.stateLock.Lock() + if changed = !m.state.Equal(state); changed { + log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData) + m.state = state + } + m.stateLock.Unlock() + if changed && m.stateCh != nil { + m.stateCh <- state } } func (m *MasterElection) runMaster(index uint64) { - m.setState(STATE_MASTER) + m.setState(ROLE_MASTER, m.Data) // If we renew the lease every TTL / N, we allow N renewal // errors before we stop believing being the master. @@ -105,59 +115,77 @@ func (m *MasterElection) runMaster(index uint64) { select { case t := <-tick.C: // To verify that we actually are still the - // master (not just we believe we are), try - // a compare-and-swap operation to check that + // master (not just we believe we are), try a + // compare-and-swap operation to check that // the stored master address is still our own, // and no-one stole our lock. If not, the TTL // will be updated (and the lock renewed). - response, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index) + resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index) if err != nil { - log.Printf("error updating lock: %s", err) + log.Printf("%s: error updating lock: %s", m.LogPrefix, err) // If we can't renew the lock for a // TTL, we must assume we lost it. if t.Sub(lastUpdate) > ttl { - log.Printf("too many errors, lost lock") + log.Printf("%s: too many errors, lost lock", m.LogPrefix) return } } - index = response.EtcdIndex + index = resp.EtcdIndex lastUpdate = t case <-m.stop: + // Facilitate a master re-election by dropping + // the lock rather than letting it expire. + log.Printf("%s: releasing masterelection lock", m.LogPrefix) + m.client.Delete(m.Path, false) return } } } func (m *MasterElection) runSlave(index uint64) { - m.setState(STATE_SLAVE) + // It would be best if we could simply retrieve the master + // lock file at the specified index. But we can only Get from + // HEAD, so in case of a quick master transition we might + // retrieve data at an index greater than 'index'. + // + // In this case, we skip updating state, since the Watch call + // will immediately return (with the change that caused the + // index to increase). + resp, err := m.client.Get(m.Path, false, false) + if err == nil && resp.Node.ModifiedIndex <= index { + m.setState(ROLE_SLAVE, resp.Node.Value) + } for { - // Start a watch on the lock, waiting for its removal. - response, err := m.client.Watch(m.Path, index+1, false, nil, m.stop) + resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop) if err != nil { if err != etcd.ErrWatchStoppedByUser { - log.Printf("slave Watch() error: %+v", err) + log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err) } return } - if response.Action == "delete" || response.Action == "expire" { + // If the lock has expired, or it has been removed, + // try to acquire it again. + if resp.Action == "delete" || resp.Action == "expire" { + log.Printf("%s: lock is gone", m.LogPrefix) return } - index = response.EtcdIndex + m.setState(ROLE_SLAVE, resp.Node.Value) + index = resp.EtcdIndex } - } func (m *MasterElection) Run() { - go m.stopper() - - // Start as a slave. - m.setState(STATE_SLAVE) - - for !m.stopped { + for { + // Quick non-blocking check for the stop channel. + select { + case <-m.stop: + return + default: + } // Try to acquire the lock. This call will only succeed // if the lockfile does not exist (either because it @@ -168,16 +196,16 @@ func (m *MasterElection) Run() { if err == nil { // Howdy, we're the master now. Wait a while // and renew our TTL. - log.Printf("masterelection: we are the master") + log.Printf("%s: we are the master", m.LogPrefix) m.runMaster(response.EtcdIndex) } else if etcdErr, ok := err.(*etcd.EtcdError); ok { // We're not the master. Wait until the lock // is deleted or expires. - log.Printf("masterelection: running as slave (%v)", etcdErr) + log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr) m.runSlave(etcdErr.Index) } else { // An error of some other sort! Retry. - log.Printf("masterelection: unexpected error: %v", err) + log.Printf("%s: unexpected error: %v", m.LogPrefix, err) } } diff --git a/masterelection/masterelection_test.go b/masterelection/masterelection_test.go new file mode 100644 index 00000000..ae495ca3 --- /dev/null +++ b/masterelection/masterelection_test.go @@ -0,0 +1,67 @@ +package masterelection + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "git.autistici.org/ale/autoradio/util" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func countMasters(nodes []*MasterElection) int { + var masters int + for _, n := range nodes { + if n.IsMaster() { + masters++ + } + } + return masters +} + +func verifyMasterData(t *testing.T, nodes []*MasterElection) { + tmp := make(map[string]struct{}) + for _, n := range nodes { + tmp[n.GetMasterData()] = struct{}{} + } + if len(tmp) != 1 { + t.Errorf("master data propagation error: >1 values: %v", tmp) + } +} + +func TestMasterElection(t *testing.T) { + //etcd := util.NewTestEtcdClient() + etcd := util.NewTestEtcdClientWithLatency(20 * time.Millisecond) + lockPath := "/master/election/test" + + n := 5 + var nodes []*MasterElection + var stop []chan bool + for i := 0; i < n; i++ { + stopCh := make(chan bool) + m := NewMasterElection( + etcd, + lockPath, + fmt.Sprintf("%d", i), + 1, + nil, + stopCh) + m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1) + go m.Run() + nodes = append(nodes, m) + stop = append(stop, stopCh) + } + + for i := 0; i < n; i++ { + time.Sleep(100 * time.Millisecond) + if nm := countMasters(nodes[i:len(nodes)]); nm != 1 { + t.Errorf("@%d: masters=%d (expected <= 1)", i, nm) + } + verifyMasterData(t, nodes[i:len(nodes)]) + close(stop[i]) + } +} diff --git a/node/icecast.go b/node/icecast.go index dc41c065..93248e79 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -25,6 +25,12 @@ var ( icecastOk = instrumentation.NewGauge("icecast.ok") ) +type Controller interface { + Update(*clusterConfig, bool, net.IP) error + GetStatus() *IcecastStatus + Run() +} + // Icecast returns empty fields in our status handler, which we'll // need to turn into integers (the xml unmarshaler will return an // error in this specific case), so we use a separate type for @@ -51,14 +57,14 @@ type IcecastStatus struct { Up bool } -type IcecastController struct { +type icecastController struct { config *icecastConfig status *IcecastStatus stop chan bool } -func NewIcecastController(publicIp string, stop chan bool) *IcecastController { - return &IcecastController{ +func NewIcecastController(publicIp string, stop chan bool) *icecastController { + return &icecastController{ config: newIcecastConfig(publicIp), status: &IcecastStatus{}, stop: make(chan bool, 1), @@ -67,7 +73,7 @@ func NewIcecastController(publicIp string, stop chan bool) *IcecastController { // Reload the icecast daemon. Redirects output to our standard error // for debugging purposes. -func (ic *IcecastController) reload() error { +func (ic *icecastController) reload() error { cmd := exec.Command("/bin/sh", "-c", icecastReloadCmd) cmd.Stdout = os.Stderr cmd.Stderr = os.Stderr @@ -75,7 +81,7 @@ func (ic *IcecastController) reload() error { } // Kill sources connected to local streams. -func (ic *IcecastController) killSources(conf *clusterConfig) error { +func (ic *icecastController) killSources(conf *clusterConfig) error { var anyErr error client := &http.Client{} for _, m := range conf.ListMounts() { @@ -99,7 +105,7 @@ func (ic *IcecastController) killSources(conf *clusterConfig) error { } // Update reloads the Icecast daemon with a new configuration. -func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { +func (ic *icecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { if !isMaster && masterAddr == nil { return errors.New("unknown system state") } @@ -128,11 +134,11 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd return ic.reload() } -func (ic *IcecastController) GetStatus() *IcecastStatus { +func (ic *icecastController) GetStatus() *IcecastStatus { return ic.status } -func (ic *IcecastController) statusUpdater() { +func (ic *icecastController) statusUpdater() { t := time.NewTicker(3 * time.Second) downStatus := &IcecastStatus{} for { @@ -152,7 +158,7 @@ func (ic *IcecastController) statusUpdater() { } } -func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) { +func (ic *icecastController) fetchStatus() (*IcecastStatus, error) { resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", autoradio.IcecastPort, statusPage)) if err != nil { return nil, err @@ -161,7 +167,7 @@ func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) { return ic.parseStatusPage(resp.Body) } -func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) { +func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) { var ustatus icecastStatusUnparsed if err := xml.NewDecoder(input).Decode(&ustatus); err != nil { return nil, err @@ -201,6 +207,6 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e return &status, nil } -func (ic *IcecastController) Run() { +func (ic *icecastController) Run() { ic.statusUpdater() } diff --git a/node/node.go b/node/node.go index e937b9d5..53ccc0d7 100644 --- a/node/node.go +++ b/node/node.go @@ -223,7 +223,7 @@ type RadioNode struct { client autoradio.EtcdClient me *masterelection.MasterElection watcher *configWatcher - icecast *IcecastController + icecast Controller bw *bwmonitor.BandwidthUsageMonitor heartbeat uint64 upch chan bool @@ -239,7 +239,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli upch := make(chan bool, 1) // MasterElection changes trigger an update. - mech := make(chan int) + mech := make(chan masterelection.State) go func() { for _ = range mech { trigger(upch) diff --git a/node/node_test.go b/node/node_test.go new file mode 100644 index 00000000..2d5485e1 --- /dev/null +++ b/node/node_test.go @@ -0,0 +1,81 @@ +package node + +import ( + "fmt" + "net" + "testing" + "time" + + "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/util" +) + +type mockController struct { + mounts []*autoradio.Mount + isMaster bool + masterAddr net.IP +} + +func (m *mockController) Run() { +} + +func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error { + m.mounts = conf.ListMounts() + m.isMaster = isMaster + m.masterAddr = masterAddr + return nil +} + +func (m *mockController) GetStatus() *IcecastStatus { + return &IcecastStatus{Up: true} +} + +func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode { + var nodes []*RadioNode + + for i := 0; i < n; i++ { + node := NewRadioNode( + fmt.Sprintf("node%d", i+1), + []net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))}, + "eth0", + 1000, + etcd) + node.icecast = &mockController{} + go node.Run() + nodes = append(nodes, node) + } + time.Sleep(1100 * time.Millisecond) + return nodes +} + +func loadTestData(etcd autoradio.EtcdClient) { + etcd.Set(autoradio.MountPrefix+"/test.ogg", + `{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`, + 86400) +} + +func countMasters(nodes []*RadioNode) int { + var masters int + for _, n := range nodes { + if n.me.IsMaster() { + masters++ + } + } + return masters +} + +func TestRadioNode_MasterElection(t *testing.T) { + etcd := util.NewTestEtcdClient() + loadTestData(etcd) + nodes := startTestNodes(3, etcd) + + // Shut down the nodes one by one, and verify that there is a + // single master among the remaining ones. + for i := 0; i < 3; i++ { + if nm := countMasters(nodes[i:len(nodes)]); nm != 1 { + t.Fatalf("@%d: masters=%d (expected 1)", i, nm) + } + nodes[i].Stop() + time.Sleep(10 * time.Millisecond) + } +} diff --git a/util/mock_etcd.go b/util/mock_etcd.go new file mode 100644 index 00000000..a7960e9d --- /dev/null +++ b/util/mock_etcd.go @@ -0,0 +1,193 @@ +// Etcd client mock for testing purposes. It tries to follow the same +// semantics as the actual etcd, at least as far as they are used by +// autoradio. Instead of talking to a remote etcd server, it uses an +// in-memory representation of the data. + +package util + +import ( + "errors" + "math/rand" + "strings" + "sync" + "time" + + "git.autistici.org/ale/autoradio" + "github.com/coreos/go-etcd/etcd" +) + +type datum struct { + value string + expire time.Time +} + +type testEtcdServer struct { + lock sync.Mutex + latency time.Duration + data map[string]datum + watches map[string][]chan *etcd.Response + index uint64 +} + +func NewTestEtcdClient() autoradio.EtcdClient { + return &testEtcdServer{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + } +} + +func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient { + return &testEtcdServer{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + latency: maxLatency, + } +} + +func (s *testEtcdServer) delay() { + if s.latency > 0 { + time.Sleep(time.Duration(rand.Int63n(int64(s.latency)))) + } +} + +func (s *testEtcdServer) trigger(action, key string) *etcd.Response { + resp := &etcd.Response{ + Action: action, + EtcdIndex: s.index, + Node: &etcd.Node{ + Key: key, + }, + } + if action != "delete" { + resp.Node.Value = s.data[key].value + } + for pfx, w := range s.watches { + if strings.HasPrefix(key, pfx) { + for _, ch := range w { + ch <- resp + } + } + } + s.index++ + return resp +} + +func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.data[key]; ok { + return nil, &etcd.EtcdError{Message: "already there", Index: s.index} + } + + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("create", key), nil +} + +func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + if d, ok := s.data[key]; ok && d.value == oldvalue { + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("update", key), nil + } + return nil, &etcd.EtcdError{Message: "failed", Index: s.index} +} + +func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + delete(s.data, key) + return s.trigger("delete", key), nil +} + +func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + resp := &etcd.Response{ + EtcdIndex: s.index, + } + var nodes []*etcd.Node + keyDirPfx := key + "/" + for path, datum := range s.data { + if path == key || strings.HasPrefix(path, keyDirPfx) { + nodes = append(nodes, &etcd.Node{ + Key: path, + Value: datum.value, + }) + } + } + switch { + case len(nodes) == 0: + return nil, errors.New("not found") + case len(nodes) == 1 && nodes[0].Key == key: + resp.Node = nodes[0] + default: + resp.Node = &etcd.Node{ + Key: key, + Dir: true, + Nodes: nodes, + } + } + return resp, nil +} + +func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + s.data[key] = datum{ + value: value, + expire: time.Now().Add(time.Duration(ttl) * time.Second), + } + return s.trigger("set", key), nil +} + +func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) { + ch := respch + if ch == nil { + ch = make(chan *etcd.Response, 1) + } + s.lock.Lock() + s.watches[key] = append(s.watches[key], ch) + s.lock.Unlock() + + var resp *etcd.Response + if respch != nil { + <-stop + } else { + select { + case resp = <-ch: + case <-stop: + } + } + + // Delete the watch. + s.lock.Lock() + var watches []chan *etcd.Response + for _, w := range s.watches[key] { + if w != ch { + watches = append(watches, w) + } + } + s.watches[key] = watches + s.lock.Unlock() + close(ch) + + if resp == nil { + return nil, etcd.ErrWatchStoppedByUser + } + return resp, nil +} -- GitLab