From 4e9d3e98b4a18c9d619d257d41fe14a54f5cf654 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Mon, 27 Jul 2015 13:37:55 +0100 Subject: [PATCH] use a common etcd component library for coordination tasks --- api.go | 117 +++---- coordination/etcdtest/fake_etcd.go | 275 +++++++++++++++ .../masterelection}/masterelection.go | 89 +++-- .../masterelection}/masterelection_test.go | 4 +- coordination/presence/cache.go | 86 +++++ coordination/presence/presence.go | 127 +++++++ coordination/watcher/cache.go | 106 ++++++ coordination/watcher/sync.go | 59 ++++ coordination/watcher/watcher.go | 145 ++++++++ coordination/watcher/watcher_test.go | 49 +++ etcd_client.go | 3 + fe/http_test.go | 4 +- node/icecast_config_test.go | 12 +- node/node.go | 331 ++++++------------ node/node_test.go | 35 +- util/mock_etcd.go | 199 ----------- 16 files changed, 1101 insertions(+), 540 deletions(-) create mode 100644 coordination/etcdtest/fake_etcd.go rename {masterelection => coordination/masterelection}/masterelection.go (63%) rename {masterelection => coordination/masterelection}/masterelection_test.go (93%) create mode 100644 coordination/presence/cache.go create mode 100644 coordination/presence/presence.go create mode 100644 coordination/watcher/cache.go create mode 100644 coordination/watcher/sync.go create mode 100644 coordination/watcher/watcher.go create mode 100644 coordination/watcher/watcher_test.go delete mode 100644 util/mock_etcd.go diff --git a/api.go b/api.go index 6dd8c9ad..a97ba294 100644 --- a/api.go +++ b/api.go @@ -9,8 +9,9 @@ import ( "fmt" "net" "strings" - "sync" "time" + + "git.autistici.org/ale/autoradio/coordination/presence" ) const ( @@ -32,15 +33,10 @@ const ( IcecastMountPrefix = "/_stream" ) -var ( - // IcecastPort is the port that the Icecast server will listen - // on. Since we fully manage the system-wide Icecast instance, - // there's not much point in making this configurable. - IcecastPort = 8000 - - ErrIsDirectory = errors.New("key is a directory") - ErrIsFile = errors.New("key is a file") -) +// IcecastPort is the port that the Icecast server will listen +// on. Since we fully manage the system-wide Icecast instance, +// there's not much point in making this configurable. +var IcecastPort = 8000 // EncodingParams used to re-encode a stream. type EncodingParams struct { @@ -253,52 +249,44 @@ func (ns *NodeStatus) NumListeners() int { return listeners } -// Cache the list of active nodes (the front-ends need to retrieve -// this information continuously, so we limit them to 2qps). -type nodesCache struct { - ttl time.Duration - nodes []*NodeStatus - deadline time.Time - lock sync.Mutex -} - -type getNodesFunc func() ([]*NodeStatus, error) - -func newNodesCache() *nodesCache { - return &nodesCache{ - ttl: 500 * time.Millisecond, - } -} - -// Get returns the cached value of 'fn', if valid. If the value is -// expired and we get an error from 'fn', we will attempt to return -// the previously cached value anyway, along with the error: the -// caller can then pick the right failure behavior. -func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) { - nc.lock.Lock() - defer nc.lock.Unlock() - - var err error - now := time.Now() - if now.After(nc.deadline) { - var nodes []*NodeStatus - if nodes, err = fn(); err == nil { - nc.nodes = nodes - nc.deadline = now.Add(nc.ttl) - } - } - return nc.nodes, err -} - // Client is the actual API to the streaming cluster's database. type Client struct { - client EtcdClient - activeNodesCache *nodesCache + client EtcdClient + presenceCache *presence.Cache } // NewClient creates and returns a new Client. func NewClient(client EtcdClient) *Client { - return &Client{client, newNodesCache()} + // The Client keeps a cache of node presence information, + // since it is likely that it will be accessed quite often (in + // the case of redirectord, on every request). + return &Client{ + client: client, + presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, func(data []string) interface{} { + // Convert a list of JSON-encoded NodeStatus + // objects into a lisce of *NodeStatus + // pointers. Since decoding can be a bit + // expensive if performed on every query, we + // only perform it when the data is updated. + tmp := make([]*NodeStatus, 0, len(data)) + for _, nodeData := range data { + var ns NodeStatus + if err := json.NewDecoder(strings.NewReader(nodeData)).Decode(&ns); err == nil { + tmp = append(tmp, &ns) + } + } + return tmp + }), + } +} + +// GetNodes returns the list of active cluster nodes. +func (r *Client) GetNodes() ([]*NodeStatus, error) { + data, err := r.presenceCache.Data() + if err != nil { + return nil, err + } + return data.([]*NodeStatus), nil } // GetMount returns data on a specific mountpoint (returns nil if not @@ -309,7 +297,7 @@ func (r *Client) GetMount(mountName string) (*Mount, error) { return nil, err } if response.Node.Dir { - return nil, ErrIsDirectory + return nil, errors.New("key is a directory") } var m Mount @@ -343,7 +331,7 @@ func (r *Client) ListMounts() ([]*Mount, error) { return nil, err } if !response.Node.Dir { - return nil, ErrIsFile + return nil, errors.New("key is a file") } result := make([]*Mount, 0, len(response.Node.Nodes)) @@ -375,7 +363,7 @@ func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) { return nil, err } if response.Node.Dir { - return nil, ErrIsDirectory + return nil, errors.New("key is a directory") } var m MasterNodeInfo if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { @@ -384,29 +372,6 @@ func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) { return &m, nil } -func (r *Client) doGetNodes() ([]*NodeStatus, error) { - response, err := r.client.Get(NodePrefix, false, false) - if err != nil || response.Node == nil { - return nil, err - } - if !response.Node.Dir { - return nil, ErrIsFile - } - result := make([]*NodeStatus, 0, len(response.Node.Nodes)) - for _, entry := range response.Node.Nodes { - var ns NodeStatus - if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil { - result = append(result, &ns) - } - } - return result, nil -} - -// GetNodes returns the list of active cluster nodes. -func (r *Client) GetNodes() ([]*NodeStatus, error) { - return r.activeNodesCache.Get(r.doGetNodes) -} - // GeneratePassword returns a new random password. func GeneratePassword() string { b := make([]byte, 6) diff --git a/coordination/etcdtest/fake_etcd.go b/coordination/etcdtest/fake_etcd.go new file mode 100644 index 00000000..b82eca0c --- /dev/null +++ b/coordination/etcdtest/fake_etcd.go @@ -0,0 +1,275 @@ +// Etcd client fake for testing purposes. It tries to follow the same +// semantics as the actual etcd (highligh on "tries to"), with a +// simple in-memory implementation. +// +package etcdtest + +import ( + "fmt" + "math/rand" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +type datum struct { + value string + expire time.Time + index uint64 +} + +type FakeEtcdClient struct { + latency time.Duration + lock sync.Mutex + data map[string]datum + wlock sync.Mutex + watches map[string][]chan *etcd.Response + index uint64 +} + +func NewClient() *FakeEtcdClient { + return &FakeEtcdClient{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + } +} + +func NewClientWithLatency(maxLatency time.Duration) *FakeEtcdClient { + return &FakeEtcdClient{ + data: make(map[string]datum), + watches: make(map[string][]chan *etcd.Response), + index: 1, + latency: maxLatency, + } +} + +func (s *FakeEtcdClient) delay() { + if s.latency > 0 { + time.Sleep(time.Duration(rand.Int63n(int64(s.latency)))) + } +} + +func (s *FakeEtcdClient) trigger(action, key string) *etcd.Response { + // Create the etcd Response, containing a single Node. + resp := &etcd.Response{ + Action: action, + EtcdIndex: s.index, + Node: &etcd.Node{ + Key: key, + }, + } + if action != "delete" { + resp.Node.Value = s.data[key].value + } + s.index++ + + // Send notifications to the watchers. To avoid difficult race + // conditions, we close the channel after having sent a response. + var toclear []string + for pfx, w := range s.watches { + if strings.HasPrefix(key, pfx) { + for _, ch := range w { + ch <- resp + close(ch) + } + toclear = append(toclear, pfx) + } + } + for _, pfx := range toclear { + delete(s.watches, pfx) + } + + //s.wlock.Unlock() + s.lock.Unlock() + + return resp +} + +func ttlToTime(ttl uint64) time.Time { + if ttl == 0 { + return time.Time{} + } + return time.Now().Add(time.Duration(ttl) * time.Second) +} + +func (s *FakeEtcdClient) AddChild(key, value string, ttl uint64) (*etcd.Response, error) { + uniqueId := fmt.Sprintf("%x-%x", time.Now().Unix(), rand.Int63()) + return s.Create(filepath.Join(key, uniqueId), value, ttl) +} + +func (s *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + if _, ok := s.data[key]; ok { + s.lock.Unlock() + return nil, &etcd.EtcdError{Message: "already there", Index: s.index} + } + + s.data[key] = datum{ + value: value, + expire: ttlToTime(ttl), + index: s.index, + } + return s.trigger("create", key), nil +} + +func (s *FakeEtcdClient) CompareAndDelete(key, oldvalue string, index uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + d, ok := s.data[key] + if ok && ((oldvalue == "" || d.value == oldvalue) || + (index == 0 || d.index <= index)) { + delete(s.data, key) + return s.trigger("delete", key), nil + } + s.lock.Unlock() + return nil, &etcd.EtcdError{Message: "failed", Index: s.index} +} + +func (s *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + d, ok := s.data[key] + if ok && ((oldvalue == "" || d.value == oldvalue) || + (index == 0 || d.index <= index)) { + s.data[key] = datum{ + value: value, + expire: ttlToTime(ttl), + index: s.index, + } + return s.trigger("update", key), nil + } + s.lock.Unlock() + return nil, &etcd.EtcdError{Message: "failed", Index: s.index} +} + +func (s *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + delete(s.data, key) + return s.trigger("delete", key), nil +} + +func (s *FakeEtcdClient) Get(key string, recursive, boh bool) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + defer s.lock.Unlock() + now := time.Now() + resp := &etcd.Response{ + EtcdIndex: s.index, + } + var nodes []*etcd.Node + key = strings.TrimSuffix(key, "/") + keyDirPfx := key + "/" + for path, datum := range s.data { + if (path == key || strings.HasPrefix(path, keyDirPfx)) && + (datum.expire.IsZero() || datum.expire.After(now)) { + nodes = append(nodes, &etcd.Node{ + Key: path, + Value: datum.value, + }) + } + } + switch { + case len(nodes) == 0: + return nil, &etcd.EtcdError{Message: "not found", Index: s.index} + case len(nodes) == 1 && nodes[0].Key == key: + resp.Node = nodes[0] + default: + resp.Node = &etcd.Node{ + Key: key, + Dir: true, + Nodes: nodes, + } + } + return resp, nil +} + +func (s *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { + s.delay() + s.lock.Lock() + s.data[key] = datum{ + value: value, + expire: ttlToTime(ttl), + index: s.index, + } + return s.trigger("set", key), nil +} + +func (s *FakeEtcdClient) SetDir(key string, ttl uint64) (*etcd.Response, error) { + // TODO. There are no directories. + return &etcd.Response{}, nil +} + +func (s *FakeEtcdClient) Update(key, value string, ttl uint64) (*etcd.Response, error) { + s.lock.Lock() + if _, ok := s.data[key]; !ok { + s.lock.Unlock() + return nil, &etcd.EtcdError{Message: fmt.Sprintf("key %s not found", key), Index: s.index} + } + s.data[key] = datum{ + value: value, + expire: ttlToTime(ttl), + index: s.index, + } + return s.trigger("update", key), nil +} + +func (s *FakeEtcdClient) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) { + // First scan the data to check for changes >= index. + s.lock.Lock() + var nodes []*etcd.Node + for k, d := range s.data { + if strings.HasPrefix(k, key) && d.index >= index { + nodes = append(nodes, &etcd.Node{ + Key: k, + Value: d.value, + }) + } + } + if len(nodes) > 0 { + // Bleargh, we create a fake 'dir' with all contents. + resp := &etcd.Response{ + Action: "update", + Node: &etcd.Node{ + Key: key, + Dir: true, + Nodes: nodes, + }, + EtcdIndex: s.index, + } + s.lock.Unlock() + if respch != nil { + respch <- resp + } + return resp, nil + } + + // Install the watch. + ch := respch + if ch == nil { + ch = make(chan *etcd.Response, 1) + } + s.watches[key] = append(s.watches[key], ch) + s.lock.Unlock() + + // Wait for a single response. + var resp *etcd.Response + if respch != nil { + <-stop + } else { + select { + case resp = <-ch: + case <-stop: + } + } + + if resp == nil { + return nil, etcd.ErrWatchStoppedByUser + } + return resp, nil +} diff --git a/masterelection/masterelection.go b/coordination/masterelection/masterelection.go similarity index 63% rename from masterelection/masterelection.go rename to coordination/masterelection/masterelection.go index 18ab41ee..56784397 100644 --- a/masterelection/masterelection.go +++ b/coordination/masterelection/masterelection.go @@ -6,81 +6,111 @@ import ( "sync" "time" - "git.autistici.org/ale/autoradio" "github.com/coreos/go-etcd/etcd" ) +// Possible values for Role. const ( - ROLE_UNKNOWN = iota - ROLE_SLAVE - ROLE_MASTER + RoleUnknown = iota + RoleSlave + RoleMaster ) +// Role of a participant in the master election protocol. type Role int func (r Role) String() string { switch r { - case ROLE_SLAVE: + case RoleSlave: return "slave" - case ROLE_MASTER: + case RoleMaster: return "master" } return "unknown" } +// State of the master election protocol (istantaneous snapshot for a +// node). It stores the role of the node, and the data associated with +// the current master. type State struct { Role Role MasterData string } +// Equal returns true if the two states are identical. func (s State) Equal(other State) bool { return s.Role == other.Role && s.MasterData == other.MasterData } -type MasterElection struct { - client autoradio.EtcdClient - - Data string - Path string - TTL uint64 +// EtcdClient is the etcd interface used by this package. +type EtcdClient interface { + Create(key string, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} - Log *log.Logger +// MasterElection runs a master election protocol on top of etcd, +// using an etcd object as a lock. Multiple master elections can run +// at the same time on the same etcd instance just by using separate +// lock files. +// +// Each participant is identified by a unique blob of data, which is +// opaque to the master election protocol itself. The data associated +// with the currently elected master (if any) can be retrieved at any +// time by calling GetMasterData. +// +type MasterElection struct { + client EtcdClient + data string + path string + ttl uint64 stateLock sync.Mutex stateCh chan State state State + + // Logger for debug messages and state changes. By default + // it will send output to os.Stderr. + Log *log.Logger } // New creates a new MasterElection object that will establish a lock // on 'path'. It will send state transitions to 'sch', if provided. If // 'sch' is not nil, it will be closed when Run() terminates. -func New(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection { +// +// All participants in the protocol should use the same ttl. +func New(client EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection { if ttl < 1 { ttl = 1 } return &MasterElection{ client: client, - Path: path, - Data: data, - TTL: ttl, - state: State{Role: ROLE_UNKNOWN}, + path: path, + data: data, + ttl: ttl, stateCh: sch, Log: log.New(os.Stderr, "masterelection: ", 0), } } +// Valid returns true if the protocol has elected a master. func (m *MasterElection) Valid() bool { m.stateLock.Lock() defer m.stateLock.Unlock() - return m.state.Role != ROLE_UNKNOWN + return m.state.Role != RoleUnknown } +// IsMaster returns true if this participant is the elected master. func (m *MasterElection) IsMaster() bool { m.stateLock.Lock() defer m.stateLock.Unlock() - return m.state.Role == ROLE_MASTER + return m.state.Role == RoleMaster } +// GetMasterData returns the opaque data associated with the currently +// elected master. func (m *MasterElection) GetMasterData() string { m.stateLock.Lock() defer m.stateLock.Unlock() @@ -105,11 +135,11 @@ func (m *MasterElection) setState(role Role, masterData string) { } func (m *MasterElection) runMaster(index uint64, stop chan bool) { - m.setState(ROLE_MASTER, m.Data) + m.setState(RoleMaster, m.data) // If we renew the lease every TTL / N, we allow N renewal // errors before we stop believing being the master. - ttl := time.Second * time.Duration(m.TTL) + ttl := time.Second * time.Duration(m.ttl) tick := time.NewTicker(ttl / 3) lastUpdate := time.Now() @@ -122,7 +152,7 @@ func (m *MasterElection) runMaster(index uint64, stop chan bool) { // the stored master address is still our own, // and no-one stole our lock. If not, the TTL // will be updated (and the lock renewed). - resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index) + resp, err := m.client.CompareAndSwap(m.path, m.data, m.ttl, m.data, index) if err != nil { m.Log.Printf("error updating lock: %s", err) @@ -140,7 +170,7 @@ func (m *MasterElection) runMaster(index uint64, stop chan bool) { // Facilitate a master re-election by dropping // the lock rather than letting it expire. m.Log.Println("releasing masterelection lock") - m.client.Delete(m.Path, false) + m.client.Delete(m.path, false) return } } @@ -155,13 +185,13 @@ func (m *MasterElection) runSlave(index uint64, stop chan bool) { // In this case, we skip updating state, since the Watch call // will immediately return (with the change that caused the // index to increase). - resp, err := m.client.Get(m.Path, false, false) + resp, err := m.client.Get(m.path, false, false) if err == nil && resp.Node.ModifiedIndex <= index { - m.setState(ROLE_SLAVE, resp.Node.Value) + m.setState(RoleSlave, resp.Node.Value) } for { - resp, err = m.client.Watch(m.Path, index+1, false, nil, stop) + resp, err = m.client.Watch(m.path, index+1, false, nil, stop) if err != nil { if err != etcd.ErrWatchStoppedByUser { m.Log.Printf("slave Watch() error: %v", err) @@ -176,11 +206,12 @@ func (m *MasterElection) runSlave(index uint64, stop chan bool) { return } - m.setState(ROLE_SLAVE, resp.Node.Value) + m.setState(RoleSlave, resp.Node.Value) index = resp.EtcdIndex } } +// Run the master election protocol, until the stop channel is closed. func (m *MasterElection) Run(stop chan bool) { if m.stateCh != nil { defer close(m.stateCh) @@ -198,7 +229,7 @@ func (m *MasterElection) Run(stop chan bool) { // if the lockfile does not exist (either because it // expired, or the previous master exited gracefully and // deleted it). - response, err := m.client.Create(m.Path, m.Data, m.TTL) + response, err := m.client.Create(m.path, m.data, m.ttl) if err == nil { // Howdy, we're the master now. Wait a while diff --git a/masterelection/masterelection_test.go b/coordination/masterelection/masterelection_test.go similarity index 93% rename from masterelection/masterelection_test.go rename to coordination/masterelection/masterelection_test.go index e342f7e4..0a618c36 100644 --- a/masterelection/masterelection_test.go +++ b/coordination/masterelection/masterelection_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "git.autistici.org/ale/autoradio/util" + "git.autistici.org/ale/autoradio/coordination/etcdtest" ) func init() { @@ -39,7 +39,7 @@ func verifyMasterData(t *testing.T, nodes []*MasterElection) { func TestMasterElection(t *testing.T) { //etcd := util.NewTestEtcdClient() - etcd := util.NewTestEtcdClientWithLatency(20 * time.Millisecond) + etcd := etcdtest.NewClientWithLatency(20 * time.Millisecond) lockPath := "/master/election/test" n := 5 diff --git a/coordination/presence/cache.go b/coordination/presence/cache.go new file mode 100644 index 00000000..3c67b2e9 --- /dev/null +++ b/coordination/presence/cache.go @@ -0,0 +1,86 @@ +package presence + +import ( + "errors" + "log" + "sync" + "time" +) + +type DecodeFunc func([]string) interface{} + +// Cache the results of Client.Nodes() for a short period of time. It +// will periodically refresh the list of nodes in the background, so +// callers have always access to the list, even if it might be stale. +// The refresh worker will keep the old copy of the data in case of +// errors. Since decoding is usually expensive, node data is only +// decoded once, with a user-provided function, when the data is +// updated. Decoded node data is stored as an opaque interface{} pointer. +type Cache struct { + *Client + decodeFn DecodeFunc + stop chan struct{} + loaded chan struct{} + + mx sync.RWMutex + data interface{} +} + +func NewCache(client EtcdClient, path string, refresh time.Duration, decodeFn DecodeFunc) *Cache { + c := &Cache{ + Client: NewClient(client, path), + decodeFn: decodeFn, + stop: make(chan struct{}), + loaded: make(chan struct{}, 1), + } + go c.run(refresh) + return c +} + +func (c *Cache) WaitForInit() { + <-c.loaded +} + +func (c *Cache) Close() { + close(c.stop) +} + +func (c *Cache) Data() (interface{}, error) { + c.mx.RLock() + defer c.mx.RUnlock() + if c.data == nil { + return nil, errors.New("presence cache not initialized yet") + } + return c.data, nil +} + +func (c *Cache) run(refresh time.Duration) { + tick := time.NewTicker(refresh) + errCount := 0 + + doUpdate := func() { + nodes, err := c.Client.Nodes() + if err != nil { + errCount++ + if errCount%300 == 0 { + log.Printf("presence: persistent error when retrieving node list: %v", err) + } + } else { + c.mx.Lock() + c.data = c.decodeFn(nodes) + c.mx.Unlock() + errCount = 0 + } + } + + doUpdate() + c.loaded <- struct{}{} + for { + select { + case <-tick.C: + doUpdate() + case <-c.stop: + return + } + } +} diff --git a/coordination/presence/presence.go b/coordination/presence/presence.go new file mode 100644 index 00000000..7c11c5b1 --- /dev/null +++ b/coordination/presence/presence.go @@ -0,0 +1,127 @@ +package presence + +import ( + "fmt" + "log" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// EtcdClient is the etcd client interface used by this package. +type EtcdClient interface { + AddChild(key string, value string, ttl uint64) (*etcd.Response, error) + Create(key string, value string, ttl uint64) (*etcd.Response, error) + Update(key string, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + SetDir(key string, ttl uint64) (*etcd.Response, error) +} + +// A Client can read the list of active nodes written by Presence workers. +type Client struct { + client EtcdClient + path string +} + +func NewClient(client EtcdClient, path string) *Client { + return &Client{ + client: client, + path: path, + } +} + +// Nodes returns a list of all the other running nodes. +func (c *Client) Nodes() ([]string, error) { + resp, err := c.client.Get(c.path, false, true) + if err != nil { + return nil, err + } + var out []string + for _, r := range resp.Node.Nodes { + out = append(out, r.Value) + } + return out, nil +} + +// StateFn is a function that returns the encoded state of the current node. +type StateFn func() string + +// Presence maintains a list of active nodes on etcd, identified by a +// 'tag'. Nodes can report a small amount of information, periodically +// updated, that will be available to the other nodes. +type Presence struct { + *Client + nodeKey string + stateFn StateFn + ttl uint64 + stop chan struct{} +} + +// New returns a new Presence worker that establishes node presence at +// the specified path. +func New(client EtcdClient, path string, stateFn StateFn, ttl uint64) *Presence { + return &Presence{ + Client: NewClient(client, path), + ttl: ttl, + stateFn: stateFn, + stop: make(chan struct{}), + } +} + +// Stop the presence worker and release all resources. +func (p *Presence) Stop() { + close(p.stop) + p.client.Delete(p.nodeKey, false) +} + +// Start the presence worker. +func (p *Presence) Start() error { + // Create the presence directory if it does not exist. + if _, err := p.client.SetDir(p.path, 0); err != nil { + return fmt.Errorf("could not create presence directory: %v", err) + } + + go p.run() + return nil +} + +func (p *Presence) run() { + // Run 3 times faster than the TTL. + ttl := time.Duration(p.ttl) * time.Second + tick := time.NewTicker(ttl / 3) + + doinit := true + lastUpdate := time.Now() + + for { + select { + case <-p.stop: + return + case t := <-tick.C: + if doinit { + // Create a new entry for the current node. + resp, err := p.client.AddChild(p.path, p.stateFn(), p.ttl) + if err != nil { + log.Printf("could not create presence node: %v", err) + continue + } + p.nodeKey = resp.Node.Key + log.Printf("presence at %s", p.nodeKey) + doinit = false + } else { + // Update current entry. + if _, err := p.client.Update(p.nodeKey, p.stateFn(), p.ttl); err != nil { + log.Printf("error updating presence: %v", err) + // If we have received errors + // for longer than a ttl, + // create a new unique file. + if t.Sub(lastUpdate) > ttl { + doinit = true + } + } + } + lastUpdate = t + } + } +} diff --git a/coordination/watcher/cache.go b/coordination/watcher/cache.go new file mode 100644 index 00000000..9372281d --- /dev/null +++ b/coordination/watcher/cache.go @@ -0,0 +1,106 @@ +package watcher + +import ( + "sync" + + "github.com/coreos/go-etcd/etcd" +) + +type DecodeFn func(string) interface{} + +type hasClose interface { + Close() error +} + +type Cache struct { + w *Watcher + decodeFn DecodeFn + + lock sync.RWMutex + index uint64 + data map[string]interface{} +} + +func defaultDecodeFn(value string) interface{} { + return value +} + +func NewCache(client EtcdClient, path string, decodeFn DecodeFn) *Cache { + if decodeFn == nil { + decodeFn = defaultDecodeFn + } + w := NewWatcher(client, path) + c := &Cache{ + w: w, + decodeFn: decodeFn, + data: make(map[string]interface{}), + } + respCh := w.Start() + go func() { + for resp := range respCh { + c.handleResponse(resp) + } + }() + return c +} + +func (c *Cache) Close() { + c.w.Stop() +} + +func flatten(nodes etcd.Nodes, n *etcd.Node) etcd.Nodes { + if n.Dir { + for _, nn := range n.Nodes { + nodes = flatten(nodes, nn) + } + } else { + nodes = append(nodes, n) + } + return nodes +} + +func (c *Cache) handleResponse(resp *etcd.Response) { + // Flatten out the node hierarchy. + nodes := flatten(nil, resp.Node) + + c.lock.Lock() + + switch resp.Action { + case "delete": + for _, n := range nodes { + if value, ok := c.data[n.Key]; ok { + // If the object in the cache has a + // Close method, invoke it in a + // separate goroutine. + if closer, ok := value.(hasClose); ok { + go closer.Close() + } + delete(c.data, n.Key) + } + } + + case "set", "create", "update": + for _, n := range nodes { + c.data[n.Key] = c.decodeFn(n.Value) + } + } + + c.index = resp.EtcdIndex + + c.lock.Unlock() +} + +func (c *Cache) Get(key string) (interface{}, bool) { + c.lock.RLock() + value, ok := c.data[key] + c.lock.RUnlock() + return value, ok +} + +func (c *Cache) GetWithIndex(key string) (interface{}, uint64, bool) { + c.lock.RLock() + index := c.index + value, ok := c.data[key] + c.lock.RUnlock() + return value, index, ok +} diff --git a/coordination/watcher/sync.go b/coordination/watcher/sync.go new file mode 100644 index 00000000..f055357f --- /dev/null +++ b/coordination/watcher/sync.go @@ -0,0 +1,59 @@ +package watcher + +import "github.com/coreos/go-etcd/etcd" + +// A Syncable is a key/value map that can mirror the contents of a +// subdirectory in the etcd database. +type Syncable interface { + Set(key, value string, index uint64) + Delete(key string) +} + +// Syncer watches a path for changes and keeps a Syncable synchronized +// with the contents of the etcd database. +type Syncer struct { + w *Watcher + target Syncable +} + +// NewSyncer creates a new Syncer object associated with the given +// Syncable. +func NewSyncer(client EtcdClient, path string, target Syncable) *Syncer { + w := NewWatcher(client, path) + s := &Syncer{ + w: w, + target: target, + } + respCh := w.Start() + go func() { + for resp := range respCh { + s.handleResponse(resp) + } + }() + w.WaitForInit() + return s +} + +// Close the Syncer and stop watching the database for changes. +func (s *Syncer) Close() { + s.w.Stop() +} + +func (s *Syncer) handleResponse(resp *etcd.Response) { + nodes := flatten(nil, resp.Node) + + //data, _ := json.Marshal(resp) + //log.Printf("Syncer: %s", string(data)) + + switch resp.Action { + case "delete": + for _, n := range nodes { + s.target.Delete(n.Key) + } + + case "get", "set", "create", "update": + for _, n := range nodes { + s.target.Set(n.Key, n.Value, resp.EtcdIndex) + } + } +} diff --git a/coordination/watcher/watcher.go b/coordination/watcher/watcher.go new file mode 100644 index 00000000..430c0bed --- /dev/null +++ b/coordination/watcher/watcher.go @@ -0,0 +1,145 @@ +package watcher + +import ( + "log" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +// EtcdClient is the etcd client interface used by this package. +type EtcdClient interface { + Create(key string, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) + Get(key string, sort, recursive bool) (*etcd.Response, error) + CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) + Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) +} + +// A Watcher will listen for changes on an etcd directory tree, and it +// will report them to the caller on the channel returned by Start(). +type Watcher struct { + client EtcdClient + path string + stop chan bool + loaded chan struct{} +} + +// NewWatcher returns a new Watcher on the specified path. +func NewWatcher(client EtcdClient, path string) *Watcher { + return &Watcher{ + client: client, + path: path, + stop: make(chan bool), + loaded: make(chan struct{}, 1), + } +} + +// Start the watcher. Changes will be sent on the returned channel. +// The Watcher will close the channel when Stop() is called. +func (w *Watcher) Start() <-chan *etcd.Response { + c := make(chan *etcd.Response, 1) + go w.watcher(c) + return c +} + +// WaitForInit waits until the initial load of the database data has +// succeeded. +func (w *Watcher) WaitForInit() { + <-w.loaded +} + +// Stop the Watcher and release all resources. Does not wait for the +// watcher goroutine to terminate. +func (w *Watcher) Stop() { + close(w.stop) +} + +func (w *Watcher) load(c chan<- *etcd.Response) uint64 { + defer func() { + // Do a nonblocking send on 'loaded' because load() + // might be called more than once (for instance + // whenever Watch returns with a 401 error). + select { + case w.loaded <- struct{}{}: + default: + } + }() + + // Try to fetch the initial contents of the data below our + // root. Repeat until we succeed, or we get a non-permanent + // error from etcd. + for { + resp, err := w.client.Get(w.path, false, true) + if err != nil { + // TODO: check error code! + if etcdErr, ok := err.(*etcd.EtcdError); ok { + return etcdErr.Index + } + log.Printf("Get(%s) error: %v", w.path, err) + if waitOrStop(200*time.Millisecond, w.stop) { + return 0 + } + continue + } + + // Pretend it is a 'set' operation. + resp.Action = "set" + + c <- resp + + return resp.EtcdIndex + } +} + +func (w *Watcher) watcher(c chan<- *etcd.Response) { + defer close(c) + + // Avoid leaking the 'loaded' channel when we are stopped. + defer close(w.loaded) + + index := w.load(c) + + for { + resp, err := w.client.Watch(w.path, index+1, true, nil, w.stop) + if err == etcd.ErrWatchStoppedByUser { + return + } + if err != nil { + if etcdErr, ok := err.(*etcd.EtcdError); ok { + // If the index passed to Watch falls + // behind the current log position by + // more than 1000 entries (which is + // bound to happen regularly in + // presence of a sustained write + // rate), etcd will return an error + // 401 ("index out of date"). The safe + // thing to do is just to run a full + // load again, and resume watching + // from the current EtcdIndex. + if etcdErr.ErrorCode == 401 { + index = w.load(c) + continue + } + index = etcdErr.Index + } + log.Printf("Watch(%s) error: %v", w.path, err) + time.Sleep(200 * time.Millisecond) + continue + } + + index = resp.EtcdIndex + + c <- resp + } +} + +func waitOrStop(t time.Duration, stop <-chan bool) bool { + a := time.After(t) + select { + case <-a: + case <-stop: + return true + } + return false +} diff --git a/coordination/watcher/watcher_test.go b/coordination/watcher/watcher_test.go new file mode 100644 index 00000000..38ef7127 --- /dev/null +++ b/coordination/watcher/watcher_test.go @@ -0,0 +1,49 @@ +package watcher + +import ( + "fmt" + "testing" + "time" + + "git.autistici.org/ale/autoradio/coordination/etcdtest" +) + +func testCountChanges(t *testing.T, n int, loader func(EtcdClient)) { + etcd := etcdtest.NewClient() + w := NewWatcher(etcd, "/test/") + ch := w.Start() + w.WaitForInit() + + go func() { + loader(etcd) + time.Sleep(10 * time.Millisecond) + w.Stop() + }() + + var counter int + for resp := range ch { + counter += len(flatten(nil, resp.Node)) + } + if counter != n { + t.Fatalf("Lost some events: sent %d, recv %d", n, counter) + } +} + +func TestWatcher_LowQps(t *testing.T) { + n := 3 + testCountChanges(t, n, func(etcd EtcdClient) { + for i := 0; i < n; i++ { + time.Sleep(100 * time.Millisecond) + etcd.Create(fmt.Sprintf("/test/%d", i), "data", 0) + } + }) +} + +func TestWatcher_HighQps(t *testing.T) { + n := 1024 + testCountChanges(t, n, func(etcd EtcdClient) { + for i := 0; i < n; i++ { + etcd.Create(fmt.Sprintf("/test/%d", i), "data", 0) + } + }) +} diff --git a/etcd_client.go b/etcd_client.go index 46cb5ae2..51208436 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -88,10 +88,13 @@ func NewEtcdClient(strongReads bool) EtcdClient { // EtcdClient is the etcd client interface used by autoradio. Used to // decouple our code from the actual etcd API, for testing purposes. type EtcdClient interface { + AddChild(string, string, uint64) (*etcd.Response, error) Create(string, string, uint64) (*etcd.Response, error) CompareAndSwap(string, string, uint64, string, uint64) (*etcd.Response, error) Delete(string, bool) (*etcd.Response, error) Get(string, bool, bool) (*etcd.Response, error) Set(string, string, uint64) (*etcd.Response, error) + SetDir(string, uint64) (*etcd.Response, error) + Update(string, string, uint64) (*etcd.Response, error) Watch(string, uint64, bool, chan *etcd.Response, chan bool) (*etcd.Response, error) } diff --git a/fe/http_test.go b/fe/http_test.go index 28d6b737..f004c666 100644 --- a/fe/http_test.go +++ b/fe/http_test.go @@ -11,7 +11,7 @@ import ( "testing" "git.autistici.org/ale/autoradio" - "git.autistici.org/ale/autoradio/util" + "git.autistici.org/ale/autoradio/coordination/etcdtest" ) // Create a test target, replacing the local icecast daemon. Returns @@ -36,7 +36,7 @@ func createTestHttpRedirector(t *testing.T) *HttpRedirector { }, } - etcd := util.NewTestEtcdClient() + etcd := etcdtest.NewClient() etcd.Set(autoradio.MountPrefix+"test.ogg", `{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`, 86400) diff --git a/node/icecast_config_test.go b/node/icecast_config_test.go index 8794a57b..970c225e 100644 --- a/node/icecast_config_test.go +++ b/node/icecast_config_test.go @@ -170,17 +170,17 @@ func checkStrings(t *testing.T, got, want string) { func TestIcecastConfig(t *testing.T) { // Create a test config with a few different mount types. c := newClusterConfig() - c.setMountIfChanged(&autoradio.Mount{ + c.mounts["/stream1.ogg"] = &autoradio.Mount{ Name: "/stream1.ogg", Username: "user", Password: "pass", Fallback: "/fallback.ogg", - }) - c.setMountIfChanged(&autoradio.Mount{ + } + c.mounts["/stream2.ogg"] = &autoradio.Mount{ Name: "/stream2.ogg", RelayUrl: "http://example.com/stream2.ogg", - }) - c.setMountIfChanged(&autoradio.Mount{ + } + c.mounts["/stream3.mp3"] = &autoradio.Mount{ Name: "/stream3.mp3", Username: "user", Password: "pass", @@ -191,7 +191,7 @@ func TestIcecastConfig(t *testing.T) { SampleRate: 44100, Channels: 2, }, - }) + } // Test a relay config. ice := createTestIcecastConfig() diff --git a/node/node.go b/node/node.go index ede26550..dfdc0f74 100644 --- a/node/node.go +++ b/node/node.go @@ -12,14 +12,15 @@ import ( "time" "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/coordination/masterelection" + "git.autistici.org/ale/autoradio/coordination/presence" + "git.autistici.org/ale/autoradio/coordination/watcher" "git.autistici.org/ale/autoradio/instrumentation" - "git.autistici.org/ale/autoradio/masterelection" "git.autistici.org/ale/autoradio/node/bwmonitor" - "github.com/coreos/go-etcd/etcd" ) var ( - masterElectionTtl = flag.Int("master-election-ttl", 5, "TTL for the master election protocol (s)") + masterElectionTTL = flag.Int("master-election-ttl", 5, "TTL for the master election protocol (s)") nodeHeartbeat = flag.Int("heartbeat", 3, "Period for the node presence heartbeat (s)") icecastReloadErrors = instrumentation.NewCounter("icecast.reload_errors") @@ -27,9 +28,9 @@ var ( configIndex = instrumentation.NewGauge("config.etcd_index") ) -func trigger(c chan bool) { +func trigger(c chan struct{}) { select { - case c <- true: + case c <- struct{}{}: default: } } @@ -54,6 +55,10 @@ func newClusterConfig() *clusterConfig { } } +// Delete implements the watcher.Syncable interface. +func (c *clusterConfig) Delete(key string) { +} + // TODO: remove? func (c *clusterConfig) GetMount(name string) *autoradio.Mount { c.lock.Lock() @@ -71,171 +76,58 @@ func (c *clusterConfig) ListMounts() []*autoradio.Mount { return result } -// Update a mount (in-memory only). Returns true if the new value is -// different than the old one. -func (c *clusterConfig) setMountIfChanged(m *autoradio.Mount) bool { - c.lock.Lock() - defer c.lock.Unlock() - if prev, ok := c.mounts[m.Name]; ok && prev.Equal(m) { - return false - } - c.mounts[m.Name] = m - return true -} - -// Delete a mount (in-memory only). Returns true if the mount existed -// in our configuration and was deleted successfully. -func (c *clusterConfig) delMount(name string) bool { - c.lock.Lock() - defer c.lock.Unlock() - if _, ok := c.mounts[name]; ok { - delete(c.mounts, name) - return true - } - return false -} - -// Keeps the in-memory service configuration in sync with the etcd -// database. An update channel is triggered whenever the data changes. +// Keeps the in-memory service configuration (clusterConfig) in sync +// with the etcd database. An update channel is triggered whenever the +// data changes. type configWatcher struct { + *clusterConfig client autoradio.EtcdClient - config *clusterConfig - upch chan bool + update chan struct{} } -func newConfigWatcher(client autoradio.EtcdClient, config *clusterConfig, upch chan bool) *configWatcher { - return &configWatcher{ - client: client, - config: config, - upch: upch, - } +func newConfigWatcher(client autoradio.EtcdClient, config *clusterConfig, update chan struct{}) *watcher.Syncer { + return watcher.NewSyncer(client, autoradio.MountPrefix, &configWatcher{ + clusterConfig: config, + client: client, + update: update, + }) } -// Returns true if the mount configuration has been modified. -func (w *configWatcher) updateMount(key, value string, index uint64) bool { - mountName := keyToMount(key) +// Set implements the watcher.Syncable interface. It will update the +// clusterConfig only if the data actually changed. +func (c *configWatcher) Set(key, value string, index uint64) { + name := keyToMount(key) + var m autoradio.Mount if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil { - log.Printf("error updating mount %s [@%d]: corrupted data: %v", mountName, index, err) - } else { - if w.config.setMountIfChanged(&m) { - log.Printf("updated mount %s [@%d]: %s", keyToMount(key), index, value) - return true - } - } - return false -} - -func (w *configWatcher) watcher(index uint64, stop chan bool) { - for { - configIndex.Set(int64(index)) - resp, err := w.client.Watch(autoradio.MountPrefix, index+1, true, nil, stop) - if err != nil { - if err == etcd.ErrWatchStoppedByUser { - return - } - if etcdErr, ok := err.(*etcd.EtcdError); ok { - // If the index passed to Watch falls - // behind the current log position by - // more than 1000 entries (which is - // bound to happen regularly in - // presence of a sustained write - // rate), etcd will return an error - // 401 ("index out of date"). The safe - // thing to do is just to run a full - // load again, and resume watching - // from the current EtcdIndex. - if etcdErr.ErrorCode == 401 { - index = w.load(stop) - continue - } - // Log other etcd errors. - log.Printf("config Watch(): %v", err) - // Follow the etcd index even in case - // of errors. - index = etcdErr.Index - } else { - // Generic error, just retry the call. - log.Printf("config Watch(): %v", err) - } - time.Sleep(200 * time.Millisecond) - continue - } - - index = resp.EtcdIndex - - switch resp.Action { - case "delete": - mountName := keyToMount(resp.Node.Key) - log.Printf("deleted mount %s", mountName) - if !w.config.delMount(mountName) { - // Do not trigger an update if the - // operation had no effect. - continue - } - case "set", "create", "update": - if !w.updateMount(resp.Node.Key, resp.Node.Value, index) { - // Do not trigger an update if the - // update had no effect. - continue - } - default: - continue - } - - // Trigger an update. - trigger(w.upch) + log.Printf("error updating mount %s [@%d]: corrupted data: %v", name, index, err) + return } -} -// Load full configuration from etcd. It will wait until the -// configuration can be read successfully, and it will trigger the -// update channel after it's done. -func (w *configWatcher) load(stop chan bool) uint64 { - var err error - var resp *etcd.Response - for { - if resp, err = w.client.Get(autoradio.MountPrefix, false, false); err != nil { - log.Printf("config load error: %v", err) - } else if resp.Node == nil || !resp.Node.Dir { - log.Printf("config load error: no configuration found") - } else { - break - } - - // Wait 1 second, but watch the stop channel. - select { - case <-time.After(1 * time.Second): - case <-stop: - return 0 - } - } - - index := resp.EtcdIndex - changed := false - for _, n := range resp.Node.Nodes { - if w.updateMount(n.Key, n.Value, index) { - changed = true - } - } - - if changed { - // Update the icecast daemon now that we have a full config. - trigger(w.upch) + c.lock.Lock() + defer c.lock.Unlock() + if prev, ok := c.mounts[name]; ok && prev.Equal(&m) { + // No changes. + return } - return index + c.mounts[name] = &m + log.Printf("updated mount %s [@%d]: %s", name, index, value) + // trigger update here + trigger(c.update) } -// Start the configWatcher in the background. It will wait for -// initialization to complete, so that when this function returns, the -// in-memory configuration has already been fully synchronized. -func (w *configWatcher) Start(stop chan bool) func(chan bool) { - log.Printf("loading configuration") - index := w.load(stop) - return func(stop chan bool) { - w.watcher(index, stop) +// Delete implements the watcher.Syncable interface. +func (c *configWatcher) Delete(key string) { + name := keyToMount(key) + + c.lock.Lock() + defer c.lock.Unlock() + if _, ok := c.mounts[name]; ok { + delete(c.mounts, name) } + // trigger update here + trigger(c.update) } // Private interfaces for process controllers. These are used to @@ -254,35 +146,51 @@ type transcodingController interface { // Factory for transcodingControllers. type transcodingControllerFunc func(*liquidsoapParams) (transcodingController, error) -// An active streaming node, managing the local icecast server. +// RadioNode is an active streaming node, managing the local icecast server. type RadioNode struct { - client autoradio.EtcdClient - config *clusterConfig - name string - ips []net.IP - me *masterelection.MasterElection - watcher *configWatcher + wg sync.WaitGroup + client autoradio.EtcdClient + + name string + ips []net.IP + config *clusterConfig + + me *masterelection.MasterElection + syncer *watcher.Syncer + bw *bwmonitor.BandwidthUsageMonitor + icecast controller - transcoderFn transcodingControllerFunc - bw *bwmonitor.BandwidthUsageMonitor maxListeners int - heartbeat uint64 - reloadDelay time.Duration - upch chan bool - stop chan bool - wg sync.WaitGroup - Log *log.Logger + + // Node presence heartbeat. + presence *presence.Presence + + // How often to restart the Icecast daemon. + reloadDelay time.Duration + + // Generator for transcodingControllers. Exposed as a member + // so that it can be stubbed out during tests. + transcoderFn transcodingControllerFunc + + // A note on channel types used for signaling: while I + // personally prefer struct{} chans, the etcd interface for + // Watch makes it convenient to use bool stop channels + // throughout the application. + upch chan struct{} + stop chan bool + + // Logger for debug messages and state changes. + Log *log.Logger } +// NewRadioNode creates and initializes a new autoradio node. func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode { - config := newClusterConfig() - // Global 'stop' channel. stopch := make(chan bool) // Network updates trigger icecast reconfiguration. This // channel is used as an event signal. - upch := make(chan bool, 1) + upch := make(chan struct{}, 1) // MasterElection changes trigger an update. mech := make(chan masterelection.State) @@ -308,7 +216,8 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max // for some headroom in the front-end traffic control // computations (if everything goes well, connections should // never be rejected by Icecast). - return &RadioNode{ + config := newClusterConfig() + rc := &RadioNode{ config: config, name: name, ips: ips, @@ -317,56 +226,40 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max client, autoradio.MasterElectionPath, string(minfodata), - uint64(*masterElectionTtl), + uint64(*masterElectionTTL), mech), - watcher: newConfigWatcher(client, config, upch), + syncer: newConfigWatcher(client, config, upch), icecast: NewIcecastController(name, maxListeners*2), transcoderFn: func(p *liquidsoapParams) (transcodingController, error) { return newLiquidsoap(p) }, reloadDelay: 1000 * time.Millisecond, - heartbeat: uint64(*nodeHeartbeat), bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), maxListeners: maxListeners, upch: upch, stop: stopch, Log: log.New(os.Stderr, "node: ", 0), } + rc.presence = presence.New(client, autoradio.NodePrefix, rc.getNodeStatus, uint64(*nodeHeartbeat)) + return rc } -// The presence goroutine periodically updates our entry in the list -// of nodes with the current node statistics. -func (rc *RadioNode) presence(stop chan bool) { - ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3) - - // Register ourselves using the node name. - key := autoradio.NodePrefix + rc.name - - for { - select { - case <-ticker.C: - // Build our NodeStatus. - icecastStatus := rc.icecast.GetStatus() - nodeStatus := autoradio.NodeStatus{ - Name: rc.name, - IP: rc.ips, - IcecastUp: icecastStatus.Up, - Mounts: icecastStatus.Mounts, - BandwidthUsage: rc.bw.GetUsage(), - MaxListeners: rc.maxListeners, - } - - // Update our node entry in the database. - var buf bytes.Buffer - json.NewEncoder(&buf).Encode(&nodeStatus) - if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil { - rc.Log.Printf("presence: Set(): %v", err) - } - - case <-stop: - return - } +func (rc *RadioNode) getNodeStatus() string { + // Build our NodeStatus. + icecastStatus := rc.icecast.GetStatus() + nodeStatus := autoradio.NodeStatus{ + Name: rc.name, + IP: rc.ips, + IcecastUp: icecastStatus.Up, + Mounts: icecastStatus.Mounts, + BandwidthUsage: rc.bw.GetUsage(), + MaxListeners: rc.maxListeners, } + + // Update our node entry in the database. + var buf bytes.Buffer + json.NewEncoder(&buf).Encode(&nodeStatus) + return buf.String() } // Get a valid IP address for the current master node (to be passed to @@ -467,21 +360,16 @@ func (rc *RadioNode) updater(stop chan bool) { // Start the node. Returns immediately. func (rc *RadioNode) Start() { - // Bootstrap the config watcher first. This ensures that we - // have a full configuration (thanks to the Get() call) before - // we start managing the icecast server. - watcherFn := rc.watcher.Start(rc.stop) + // Starting the presence runner might fail - raise a fatal + // error in that case. + if err := rc.presence.Start(); err != nil { + log.Fatal(err) + } // Start auxiliary services and event listeners on their own // goroutines. Put them all in a WaitGroup so we can wait for // their termination. bgfuncs := []func(chan bool){ - // Config watcher. - watcherFn, - - // Presence heartbeat. - rc.presence, - // Icecast updater, rc.updater, @@ -521,6 +409,7 @@ func (rc *RadioNode) Run() { // Stop everything. func (rc *RadioNode) Stop() { + rc.presence.Stop() close(rc.stop) } @@ -570,7 +459,7 @@ func (t *transcoder) run() { t.client, autoradio.TranscoderMasterElectionBase+t.params.TargetMount, t.nodeName, - uint64(*masterElectionTtl), + uint64(*masterElectionTTL), update) go me.Run(mestop) defer close(mestop) @@ -579,7 +468,7 @@ func (t *transcoder) run() { for { select { case state := <-update: - if state.Role == masterelection.ROLE_MASTER { + if state.Role == masterelection.RoleMaster { t.liquidsoap.Start() running = true } else if running { diff --git a/node/node_test.go b/node/node_test.go index 6dcfc9be..856e1cc2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -8,7 +8,7 @@ import ( "time" "git.autistici.org/ale/autoradio" - "git.autistici.org/ale/autoradio/util" + "git.autistici.org/ale/autoradio/coordination/etcdtest" ) type mockController struct { @@ -95,7 +95,7 @@ func loadTestData(etcd autoradio.EtcdClient) { func TestRadioNode_MasterElection(t *testing.T) { globalMockTranscoder.Reset() - etcd := util.NewTestEtcdClient() + etcd := etcdtest.NewClient() loadTestData(etcd) nodes := startTestNodes(3, etcd) @@ -132,7 +132,7 @@ func TestRadioNode_MasterElection(t *testing.T) { } func TestRadioNode_ConfigChangePropagation(t *testing.T) { - etcd := util.NewTestEtcdClient() + etcd := etcdtest.NewClient() loadTestData(etcd) nodes := startTestNodes(3, etcd) @@ -169,7 +169,7 @@ func TestRadioNode_ConfigChangePropagation(t *testing.T) { } func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) { - etcd := util.NewTestEtcdClient() + etcd := etcdtest.NewClient() loadTestData(etcd) node := startTestNodes(1, etcd)[0] @@ -188,7 +188,7 @@ func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) { func TestRadioNode_TranscoderMasterElection(t *testing.T) { globalMockTranscoder.Reset() - etcd := util.NewTestEtcdClient() + etcd := etcdtest.NewClient() loadTestData(etcd) // Load a transcoding mount. @@ -215,3 +215,28 @@ func TestRadioNode_TranscoderMasterElection(t *testing.T) { t.Errorf("transcoder was started/stopped an unequal number of times: start=%d, stop=%d", globalMockTranscoder.startCount, globalMockTranscoder.stopCount) } } + +func TestRadioNode_Presence(t *testing.T) { + etcd := etcdtest.NewClient() + loadTestData(etcd) + *nodeHeartbeat = 1 + nodes := startTestNodes(3, etcd) + client := autoradio.NewClient(etcd) + + time.Sleep(2000 * time.Millisecond) + + defer func() { + for _, n := range nodes { + n.Stop() + n.Wait() + } + }() + + result, err := client.GetNodes() + if err != nil { + t.Fatal(err) + } + if len(result) != 3 { + t.Fatalf("Some nodes did not report presence: %+v", result) + } +} diff --git a/util/mock_etcd.go b/util/mock_etcd.go deleted file mode 100644 index 32cf1de5..00000000 --- a/util/mock_etcd.go +++ /dev/null @@ -1,199 +0,0 @@ -// Etcd client mock for testing purposes. It tries to follow the same -// semantics as the actual etcd, at least as far as they are used by -// autoradio. Instead of talking to a remote etcd server, it uses an -// in-memory representation of the data. - -package util - -import ( - "errors" - "math/rand" - "strings" - "sync" - "time" - - "git.autistici.org/ale/autoradio" - "github.com/coreos/go-etcd/etcd" -) - -type datum struct { - value string - expire time.Time -} - -type testEtcdServer struct { - latency time.Duration - lock sync.Mutex - data map[string]datum - wlock sync.Mutex - watches map[string][]chan *etcd.Response - index uint64 -} - -func NewTestEtcdClient() autoradio.EtcdClient { - return &testEtcdServer{ - data: make(map[string]datum), - watches: make(map[string][]chan *etcd.Response), - index: 1, - } -} - -func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient { - return &testEtcdServer{ - data: make(map[string]datum), - watches: make(map[string][]chan *etcd.Response), - index: 1, - latency: maxLatency, - } -} - -func (s *testEtcdServer) delay() { - if s.latency > 0 { - time.Sleep(time.Duration(rand.Int63n(int64(s.latency)))) - } -} - -func (s *testEtcdServer) trigger(action, key string) *etcd.Response { - resp := &etcd.Response{ - Action: action, - EtcdIndex: s.index, - Node: &etcd.Node{ - Key: key, - }, - } - if action != "delete" { - resp.Node.Value = s.data[key].value - } - s.index++ - s.lock.Unlock() - - go func() { - s.wlock.Lock() - for pfx, w := range s.watches { - if strings.HasPrefix(key, pfx) { - for _, ch := range w { - ch <- resp - } - } - } - s.wlock.Unlock() - }() - - return resp -} - -func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) { - s.delay() - s.lock.Lock() - if _, ok := s.data[key]; ok { - s.lock.Unlock() - return nil, &etcd.EtcdError{Message: "already there", Index: s.index} - } - - s.data[key] = datum{ - value: value, - expire: time.Now().Add(time.Duration(ttl) * time.Second), - } - return s.trigger("create", key), nil -} - -func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) { - s.delay() - s.lock.Lock() - if d, ok := s.data[key]; ok && d.value == oldvalue { - s.data[key] = datum{ - value: value, - expire: time.Now().Add(time.Duration(ttl) * time.Second), - } - return s.trigger("update", key), nil - } - s.lock.Unlock() - return nil, &etcd.EtcdError{Message: "failed", Index: s.index} -} - -func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) { - s.delay() - s.lock.Lock() - delete(s.data, key) - return s.trigger("delete", key), nil -} - -func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, error) { - s.delay() - s.lock.Lock() - defer s.lock.Unlock() - resp := &etcd.Response{ - EtcdIndex: s.index, - } - var nodes []*etcd.Node - key = strings.TrimSuffix(key, "/") - keyDirPfx := key + "/" - for path, datum := range s.data { - if path == key || strings.HasPrefix(path, keyDirPfx) { - nodes = append(nodes, &etcd.Node{ - Key: path, - Value: datum.value, - }) - } - } - switch { - case len(nodes) == 0: - return nil, errors.New("not found") - case len(nodes) == 1 && nodes[0].Key == key: - resp.Node = nodes[0] - default: - resp.Node = &etcd.Node{ - Key: key, - Dir: true, - Nodes: nodes, - } - } - return resp, nil -} - -func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) { - s.delay() - s.lock.Lock() - s.data[key] = datum{ - value: value, - expire: time.Now().Add(time.Duration(ttl) * time.Second), - } - return s.trigger("set", key), nil -} - -func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) { - ch := respch - if ch == nil { - ch = make(chan *etcd.Response) - } - s.wlock.Lock() - s.watches[key] = append(s.watches[key], ch) - s.wlock.Unlock() - - var resp *etcd.Response - if respch != nil { - <-stop - } else { - select { - case resp = <-ch: - case <-stop: - } - } - - // Delete the watch. - s.wlock.Lock() - var watches []chan *etcd.Response - for _, w := range s.watches[key] { - if w != ch { - watches = append(watches, w) - } - } - s.watches[key] = watches - s.wlock.Unlock() - close(ch) - - if resp == nil { - return nil, etcd.ErrWatchStoppedByUser - } - return resp, nil -} -- GitLab