Commit 03a67c17 authored by ale

refactor masterelection to remove races; add tests

The master election code should be more robust now: this commit eliminates
the discrepancy between the role and the master info (previously it was
possible for the two to get out of sync).

This commit also includes an in-memory etcd mock, which implements
enough of the etcd interface to test the masterelection code.
parent db305e75
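For consumers of the package, the visible change in this diff is that the
election now reports a single State value (role plus master data) over one
channel, instead of a bare role constant, so the two can no longer be
observed out of sync. A minimal usage sketch based on the signatures below
(the masterelection import path, lock path and node data are assumptions
for illustration; the in-memory mock added by this commit stands in for a
real etcd client):

package main

import (
	"log"

	"git.autistici.org/ale/autoradio/masterelection"
	"git.autistici.org/ale/autoradio/util"
)

func main() {
	client := util.NewTestEtcdClient() // in-memory etcd mock from this commit
	stateCh := make(chan masterelection.State)
	stop := make(chan bool)

	m := masterelection.NewMasterElection(
		client, "/master/election/demo", "node1", 5, stateCh, stop)
	go m.Run()

	// Role and MasterData arrive together in a single State value.
	for s := range stateCh {
		log.Printf("role=%s master=%s", s.Role, s.MasterData)
	}
}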
@@ -2,6 +2,7 @@ package masterelection
import (
"log"
"sync"
"time"
"git.autistici.org/ale/autoradio"
@@ -9,91 +10,100 @@ import (
)
const (
STATE_SLAVE = iota
STATE_MASTER
ROLE_UNKNOWN = iota
ROLE_SLAVE
ROLE_MASTER
)
func stateToString(state int) string {
switch state {
case STATE_SLAVE:
type Role int
func (r Role) String() string {
switch r {
case ROLE_SLAVE:
return "slave"
case STATE_MASTER:
case ROLE_MASTER:
return "master"
}
return ""
return "unknown"
}
type State struct {
Role Role
MasterData string
}
func (s State) Equal(other State) bool {
return s.Role == other.Role && s.MasterData == other.MasterData
}
type MasterElection struct {
client autoradio.EtcdClient
stop chan bool
stopped bool
client autoradio.EtcdClient
stop chan bool
Data string
Path string
TTL uint64
State int
StateChange chan int
LogPrefix string
stateLock sync.Mutex
stateCh chan State
state State
}
func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
if ttl < 2 {
ttl = 2
func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State, stop chan bool) *MasterElection {
if ttl < 1 {
ttl = 1
}
return &MasterElection{
client: client,
Path: path,
Data: data,
TTL: ttl,
State: STATE_SLAVE,
StateChange: sch,
stop: stop,
client: client,
Path: path,
Data: data,
TTL: ttl,
state: State{Role: ROLE_UNKNOWN},
stateCh: sch,
stop: stop,
LogPrefix: "masterelection",
}
}
func (m *MasterElection) Valid() bool {
m.stateLock.Lock()
defer m.stateLock.Unlock()
return m.state.Role != ROLE_UNKNOWN
}
func (m *MasterElection) IsMaster() bool {
return m.State == STATE_MASTER
m.stateLock.Lock()
defer m.stateLock.Unlock()
return m.state.Role == ROLE_MASTER
}
func (m *MasterElection) GetMasterData() string {
response, err := m.client.Get(m.Path, false, false)
if err != nil || response.Node == nil {
return ""
}
return response.Node.Value
m.stateLock.Lock()
defer m.stateLock.Unlock()
return m.state.MasterData
}
func (m *MasterElection) setState(state int) {
log.Printf("masterelection: %s -> %s",
stateToString(m.State),
stateToString(state))
// Order is important here: set state before triggering the
// update channel so that the receiver sees the right value.
m.State = state
if m.StateChange != nil {
m.StateChange <- state
func (m *MasterElection) setState(role Role, masterData string) {
state := State{
Role: role,
MasterData: masterData,
}
}
func (m *MasterElection) stopper() {
<-m.stop
// Tell the Run() goroutine to exit as soon as it can. (We
// could have simply used the 'stop' channel there, but Watch()
// ignores the stop channel if invoked without a receiving
// channel, and we'd have to refactor Run() to use a temporary
// channel for each invocation.)
m.stopped = true
// Remove the lock file if we are the master.
if m.State == STATE_MASTER {
log.Printf("releasing masterelection lock")
m.client.Delete(m.Path, false)
var changed bool
m.stateLock.Lock()
if changed = !m.state.Equal(state); changed {
log.Printf("%s: %s (%s) -> %s (%s)", m.LogPrefix, m.state.Role.String(), m.state.MasterData, role.String(), masterData)
m.state = state
}
m.stateLock.Unlock()
if changed && m.stateCh != nil {
m.stateCh <- state
}
}
func (m *MasterElection) runMaster(index uint64) {
m.setState(STATE_MASTER)
m.setState(ROLE_MASTER, m.Data)
// If we renew the lease every TTL / N, we allow up to N renewal
// errors before we stop believing we are the master.
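// Illustration only (not part of this diff; the actual ticker setup
// sits in the elided lines below): with some renewal factor N, the
// renewal interval would look roughly like
//
//	tick := time.NewTicker(time.Duration(m.TTL) * time.Second / N)
//
// so that several consecutive CompareAndSwap failures can occur
// before a full TTL elapses and the lease is declared lost.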
@@ -105,59 +115,77 @@ func (m *MasterElection) runMaster(index uint64) {
select {
case t := <-tick.C:
// To verify that we actually are still the
// master (not just we believe we are), try
// a compare-and-swap operation to check that
// master (not just we believe we are), try a
// compare-and-swap operation to check that
// the stored master address is still our own,
// and no-one stole our lock. If it is still
// ours, the TTL is updated (and the lock renewed).
response, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
if err != nil {
log.Printf("error updating lock: %s", err)
log.Printf("%s: error updating lock: %s", m.LogPrefix, err)
// If we can't renew the lock for a
// TTL, we must assume we lost it.
if t.Sub(lastUpdate) > ttl {
log.Printf("too many errors, lost lock")
log.Printf("%s: too many errors, lost lock", m.LogPrefix)
return
}
	// resp is nil after a failed CompareAndSwap, so
	// skip the index update and retry on the next tick.
	break
}
index = response.EtcdIndex
index = resp.EtcdIndex
lastUpdate = t
case <-m.stop:
// Facilitate a master re-election by dropping
// the lock rather than letting it expire.
log.Printf("%s: releasing masterelection lock", m.LogPrefix)
m.client.Delete(m.Path, false)
return
}
}
}
func (m *MasterElection) runSlave(index uint64) {
m.setState(STATE_SLAVE)
// It would be best if we could simply retrieve the master
// lock file at the specified index. But we can only Get from
// HEAD, so in case of a quick master transition we might
// retrieve data at an index greater than 'index'.
//
// In this case, we skip updating state, since the Watch call
// will immediately return (with the change that caused the
// index to increase).
resp, err := m.client.Get(m.Path, false, false)
if err == nil && resp.Node.ModifiedIndex <= index {
m.setState(ROLE_SLAVE, resp.Node.Value)
}
for {
// Start a watch on the lock, waiting for its removal.
response, err := m.client.Watch(m.Path, index+1, false, nil, m.stop)
resp, err = m.client.Watch(m.Path, index+1, false, nil, m.stop)
if err != nil {
if err != etcd.ErrWatchStoppedByUser {
log.Printf("slave Watch() error: %+v", err)
log.Printf("%s: slave Watch() error: %v", m.LogPrefix, err)
}
return
}
if response.Action == "delete" || response.Action == "expire" {
// If the lock has expired, or it has been removed,
// try to acquire it again.
if resp.Action == "delete" || resp.Action == "expire" {
log.Printf("%s: lock is gone", m.LogPrefix)
return
}
index = response.EtcdIndex
m.setState(ROLE_SLAVE, resp.Node.Value)
index = resp.EtcdIndex
}
}
func (m *MasterElection) Run() {
go m.stopper()
// Start as a slave.
m.setState(STATE_SLAVE)
for !m.stopped {
for {
// Quick non-blocking check for the stop channel.
select {
case <-m.stop:
return
default:
}
// Try to acquire the lock. This call will only succeed
// if the lockfile does not exist (either because it
@@ -168,16 +196,16 @@ func (m *MasterElection) Run() {
if err == nil {
// Howdy, we're the master now. Wait a while
// and renew our TTL.
log.Printf("masterelection: we are the master")
log.Printf("%s: we are the master", m.LogPrefix)
m.runMaster(response.EtcdIndex)
} else if etcdErr, ok := err.(*etcd.EtcdError); ok {
// We're not the master. Wait until the lock
// is deleted or expires.
log.Printf("masterelection: running as slave (%v)", etcdErr)
log.Printf("%s: running as slave (%v)", m.LogPrefix, etcdErr)
m.runSlave(etcdErr.Index)
} else {
// An error of some other sort! Retry.
log.Printf("masterelection: unexpected error: %v", err)
log.Printf("%s: unexpected error: %v", m.LogPrefix, err)
}
}
package masterelection
import (
"fmt"
"math/rand"
"testing"
"time"
"git.autistici.org/ale/autoradio/util"
)
func init() {
rand.Seed(time.Now().Unix())
}
func countMasters(nodes []*MasterElection) int {
var masters int
for _, n := range nodes {
if n.IsMaster() {
masters++
}
}
return masters
}
func verifyMasterData(t *testing.T, nodes []*MasterElection) {
tmp := make(map[string]struct{})
for _, n := range nodes {
tmp[n.GetMasterData()] = struct{}{}
}
if len(tmp) != 1 {
t.Errorf("master data propagation error: >1 values: %v", tmp)
}
}
func TestMasterElection(t *testing.T) {
//etcd := util.NewTestEtcdClient()
etcd := util.NewTestEtcdClientWithLatency(20 * time.Millisecond)
lockPath := "/master/election/test"
n := 5
var nodes []*MasterElection
var stop []chan bool
for i := 0; i < n; i++ {
stopCh := make(chan bool)
m := NewMasterElection(
etcd,
lockPath,
fmt.Sprintf("%d", i),
1,
nil,
stopCh)
m.LogPrefix = fmt.Sprintf("node%d: masterelection", i+1)
go m.Run()
nodes = append(nodes, m)
stop = append(stop, stopCh)
}
for i := 0; i < n; i++ {
time.Sleep(100 * time.Millisecond)
if nm := countMasters(nodes[i:len(nodes)]); nm != 1 {
t.Errorf("@%d: masters=%d (expected <= 1)", i, nm)
}
verifyMasterData(t, nodes[i:len(nodes)])
close(stop[i])
}
}
@@ -25,6 +25,12 @@ var (
icecastOk = instrumentation.NewGauge("icecast.ok")
)
type Controller interface {
Update(*clusterConfig, bool, net.IP) error
GetStatus() *IcecastStatus
Run()
}
// Icecast returns empty fields in our status handler, which we'll
// need to turn into integers (the xml unmarshaler will return an
// error in this specific case), so we use a separate type for
@@ -51,14 +57,14 @@ type IcecastStatus struct {
Up bool
}
type IcecastController struct {
type icecastController struct {
config *icecastConfig
status *IcecastStatus
stop chan bool
}
func NewIcecastController(publicIp string, stop chan bool) *IcecastController {
return &IcecastController{
func NewIcecastController(publicIp string, stop chan bool) *icecastController {
return &icecastController{
config: newIcecastConfig(publicIp),
status: &IcecastStatus{},
stop: make(chan bool, 1),
@@ -67,7 +73,7 @@ func NewIcecastController(publicIp string, stop chan bool) *IcecastController {
// Reload the icecast daemon. Redirects output to our standard error
// for debugging purposes.
func (ic *IcecastController) reload() error {
func (ic *icecastController) reload() error {
cmd := exec.Command("/bin/sh", "-c", icecastReloadCmd)
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
@@ -75,7 +81,7 @@
}
// Kill sources connected to local streams.
func (ic *IcecastController) killSources(conf *clusterConfig) error {
func (ic *icecastController) killSources(conf *clusterConfig) error {
var anyErr error
client := &http.Client{}
for _, m := range conf.ListMounts() {
@@ -99,7 +105,7 @@ }
}
// Update reloads the Icecast daemon with a new configuration.
func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
func (ic *icecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
if !isMaster && masterAddr == nil {
return errors.New("unknown system state")
}
@@ -128,11 +134,11 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd
return ic.reload()
}
func (ic *IcecastController) GetStatus() *IcecastStatus {
func (ic *icecastController) GetStatus() *IcecastStatus {
return ic.status
}
func (ic *IcecastController) statusUpdater() {
func (ic *icecastController) statusUpdater() {
t := time.NewTicker(3 * time.Second)
downStatus := &IcecastStatus{}
for {
@@ -152,7 +158,7 @@ func (ic *IcecastController) statusUpdater() {
}
}
func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) {
func (ic *icecastController) fetchStatus() (*IcecastStatus, error) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", autoradio.IcecastPort, statusPage))
if err != nil {
return nil, err
@@ -161,7 +167,7 @@ func (ic *IcecastController) fetchStatus() (*IcecastStatus, error) {
return ic.parseStatusPage(resp.Body)
}
func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) {
func (ic *icecastController) parseStatusPage(input io.Reader) (*IcecastStatus, error) {
var ustatus icecastStatusUnparsed
if err := xml.NewDecoder(input).Decode(&ustatus); err != nil {
return nil, err
@@ -201,6 +207,6 @@ func (ic *IcecastController) parseStatusPage(input io.Reader) (*IcecastStatus, e
return &status, nil
}
func (ic *IcecastController) Run() {
func (ic *icecastController) Run() {
ic.statusUpdater()
}
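The "empty fields" workaround described in the comment above amounts to
decoding numeric fields as strings first and converting them by hand. A
generic illustration of the underlying problem (a standalone sketch, not
the project's actual types):

package main

import (
	"encoding/xml"
	"fmt"
	"strconv"
)

// Decoding an empty element into an int field makes encoding/xml
// return a strconv error, so the unparsed type uses strings and
// converts afterwards.
type unparsedStatus struct {
	Listeners string `xml:"listeners"`
}

func main() {
	var u unparsedStatus
	xml.Unmarshal([]byte("<icestats><listeners></listeners></icestats>"), &u)
	n, err := strconv.Atoi(u.Listeners)
	if err != nil {
		n = 0 // empty field: treat as zero
	}
	fmt.Println(n)
}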
@@ -223,7 +223,7 @@ type RadioNode struct {
client autoradio.EtcdClient
me *masterelection.MasterElection
watcher *configWatcher
icecast *IcecastController
icecast Controller
bw *bwmonitor.BandwidthUsageMonitor
heartbeat uint64
upch chan bool
@@ -239,7 +239,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
upch := make(chan bool, 1)
// MasterElection changes trigger an update.
mech := make(chan int)
mech := make(chan masterelection.State)
go func() {
for _ = range mech {
trigger(upch)
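The trigger helper used in the goroutine above is not shown in this hunk;
given that upch is created with a buffer of one, it is presumably a
non-blocking send along these lines (an assumption, not code from this
commit):

func trigger(ch chan bool) {
	// Non-blocking send: coalesce multiple pending notifications
	// into a single update.
	select {
	case ch <- true:
	default:
	}
}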
package node
import (
"fmt"
"net"
"testing"
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/util"
)
type mockController struct {
mounts []*autoradio.Mount
isMaster bool
masterAddr net.IP
}
func (m *mockController) Run() {
}
func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
m.mounts = conf.ListMounts()
m.isMaster = isMaster
m.masterAddr = masterAddr
return nil
}
func (m *mockController) GetStatus() *IcecastStatus {
return &IcecastStatus{Up: true}
}
func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
var nodes []*RadioNode
for i := 0; i < n; i++ {
node := NewRadioNode(
fmt.Sprintf("node%d", i+1),
[]net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))},
"eth0",
1000,
etcd)
node.icecast = &mockController{}
go node.Run()
nodes = append(nodes, node)
}
time.Sleep(1100 * time.Millisecond)
return nodes
}
func loadTestData(etcd autoradio.EtcdClient) {
etcd.Set(autoradio.MountPrefix+"/test.ogg",
`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
86400)
}
func countMasters(nodes []*RadioNode) int {
var masters int
for _, n := range nodes {
if n.me.IsMaster() {
masters++
}
}
return masters
}
func TestRadioNode_MasterElection(t *testing.T) {
etcd := util.NewTestEtcdClient()
loadTestData(etcd)
nodes := startTestNodes(3, etcd)
// Shut down the nodes one by one, and verify that there is a
// single master among the remaining ones.
for i := 0; i < 3; i++ {
if nm := countMasters(nodes[i:len(nodes)]); nm != 1 {
t.Fatalf("@%d: masters=%d (expected 1)", i, nm)
}
nodes[i].Stop()
time.Sleep(10 * time.Millisecond)
}
}
// Etcd client mock for testing purposes. It tries to follow the same
// semantics as the actual etcd, at least as far as they are used by
// autoradio. Instead of talking to a remote etcd server, it uses an
// in-memory representation of the data.
package util
import (
"errors"
"math/rand"
"strings"
"sync"
"time"
"git.autistici.org/ale/autoradio"
"github.com/coreos/go-etcd/etcd"
)
type datum struct {
value string
expire time.Time
}
type testEtcdServer struct {
lock sync.Mutex
latency time.Duration
data map[string]datum
watches map[string][]chan *etcd.Response
index uint64
}
func NewTestEtcdClient() autoradio.EtcdClient {
return &testEtcdServer{
data: make(map[string]datum),
watches: make(map[string][]chan *etcd.Response),
index: 1,
}
}
func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient {
return &testEtcdServer{
data: make(map[string]datum),
watches: make(map[string][]chan *etcd.Response),
index: 1,
latency: maxLatency,
}
}
func (s *testEtcdServer) delay() {
if s.latency > 0 {
time.Sleep(time.Duration(rand.Int63n(int64(s.latency))))
}
}
func (s *testEtcdServer) trigger(action, key string) *etcd.Response {
resp := &etcd.Response{
Action: action,
EtcdIndex: s.index,
Node: &etcd.Node{
Key: key,
},
}
if action != "delete" {
resp.Node.Value = s.data[key].value
}
for pfx, w := range s.watches {
if strings.HasPrefix(key, pfx) {
for _, ch := range w {
ch <- resp
}
}
}
s.index++
return resp
}
func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.data[key]; ok {
return nil, &etcd.EtcdError{Message: "already there", Index: s.index}
}
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
}
return s.trigger("create", key), nil
}
func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
if d, ok := s.data[key]; ok && d.value == oldvalue {
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
}
return s.trigger("update", key), nil
}
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
}
func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
delete(s.data, key)
return s.trigger("delete", key), nil
}
func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
resp := &etcd.Response{
EtcdIndex: s.index,
}
var nodes []*etcd.Node
keyDirPfx := key + "/"
for path, datum := range s.data {
if path == key || strings.HasPrefix(path, keyDirPfx) {
nodes = append(nodes, &etcd.Node{
Key: path,
Value: datum.value,
})
}
}
switch {
case len(nodes) == 0:
return nil, errors.New("not found")
case len(nodes) == 1 && nodes[0].Key == key:
resp.Node = nodes[0]
default:
resp.Node = &etcd.Node{
Key: key,
Dir: true,
Nodes: nodes,
}
}
return resp, nil
}
func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
}
return s.trigger("set", key), nil
}
func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
ch := respch
if ch == nil {
ch = make(chan *etcd.Response, 1)
}
s.lock.Lock()
s.watches[key] = append(s.watches[key], ch)
s.lock.Unlock()
var resp *etcd.Response
if respch != nil {
<-stop
} else {
select {
case resp = <-ch:
case <-stop:
}
}
// Delete the watch.
s.lock.Lock()
var watches []chan *etcd.Response
for _, w := range s.watches[key] {
if w != ch {
watches = append(watches, w)
}
}
s.watches[key] = watches
s.lock.Unlock()
close(ch)
if resp == nil {
return nil, etcd.ErrWatchStoppedByUser
}
return resp, nil
}
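The latency-enabled constructor is what the masterelection test above uses
to shake out ordering races. As a rough standalone sketch of the same idea
(a hypothetical test, assuming it lives in the masterelection package):

package masterelection

import (
	"testing"
	"time"

	"git.autistici.org/ale/autoradio/util"
)

func TestSingleNodeBecomesMaster(t *testing.T) {
	// Random per-call delays of up to 20ms exercise interleavings
	// that a zero-latency mock would never produce.
	client := util.NewTestEtcdClientWithLatency(20 * time.Millisecond)
	stop := make(chan bool)
	m := NewMasterElection(client, "/master/election/single", "node1", 1, nil, stop)
	go m.Run()

	// With no contention, the lone node should win the election.
	time.Sleep(500 * time.Millisecond)
	if !m.IsMaster() {
		t.Errorf("single node failed to become master")
	}
	close(stop)
}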