diff --git a/coordination/masterelection/masterelection.go b/coordination/masterelection/masterelection.go index 5678439712e7a5742ca0149fe27f24479e5f2752..3b2786e200a2bec9e95f673e71e7fbef35db380c 100644 --- a/coordination/masterelection/masterelection.go +++ b/coordination/masterelection/masterelection.go @@ -1,3 +1,5 @@ +// Package masterelection lets you run a simple master-election +// protocol on top of etcd. package masterelection import ( @@ -16,6 +18,9 @@ const ( RoleMaster ) +// How long to sleep on etcd Create() errors. +var errorDelay = 200 * time.Millisecond + // Role of a participant in the master election protocol. type Role int @@ -213,6 +218,7 @@ func (m *MasterElection) runSlave(index uint64, stop chan bool) { // Run the master election protocol, until the stop channel is closed. func (m *MasterElection) Run(stop chan bool) { + // If an update channel is set, close it on exit. if m.stateCh != nil { defer close(m.stateCh) } @@ -242,7 +248,7 @@ func (m *MasterElection) Run(stop chan bool) { } else { // An error of some other sort! Retry. m.Log.Printf("unexpected error: %v", err) + time.Sleep(errorDelay) } - } } diff --git a/coordination/presence/presence.go b/coordination/presence/presence.go index 41c95864a356950fee6b9e5dac2a2904fc463b12..5f6192918d0f0c7d58a2ca2c292f710a9dd1836e 100644 --- a/coordination/presence/presence.go +++ b/coordination/presence/presence.go @@ -1,3 +1,8 @@ +// Package presence allows individual nodes in a cluster to let their +// presence known to other peers through a central registry (a +// directory on etcd). Nodes can store a small amount of information +// along with their state. +// package presence import ( @@ -23,6 +28,8 @@ type Client struct { path string } +// NewClient returns a simple client that can look up the registry and +// return a list of active nodes. func NewClient(client EtcdClient, path string) *Client { return &Client{ client: client, @@ -58,7 +65,8 @@ type Presence struct { } // New returns a new Presence worker that establishes node presence at -// the specified path. +// the specified path. It will create a new unique file below that +// directory, and remove it when Stop() is called (or the TTL expires). func New(client EtcdClient, path string, stateFn StateFn, ttl uint64) *Presence { return &Presence{ Client: NewClient(client, path), diff --git a/coordination/watcher/cache.go b/coordination/watcher/cache.go index 9372281d5f249c2f4d87779431b905e30e164cfd..d826239f317f3b0ef4f74fc1d2728e6acf8bc3eb 100644 --- a/coordination/watcher/cache.go +++ b/coordination/watcher/cache.go @@ -12,6 +12,10 @@ type hasClose interface { Close() error } +// Cache for data that is expensive to decode. Keeps the (decoded) +// contents of the remote etcd directory in memory. Uses a DecodeFn to +// parse the encoded data stored in etcd only when it is actually +// modified. type Cache struct { w *Watcher decodeFn DecodeFn @@ -25,6 +29,7 @@ func defaultDecodeFn(value string) interface{} { return value } +// NewCache returns a new Cache mirroring 'path'. func NewCache(client EtcdClient, path string, decodeFn DecodeFn) *Cache { if decodeFn == nil { decodeFn = defaultDecodeFn @@ -44,6 +49,7 @@ func NewCache(client EtcdClient, path string, decodeFn DecodeFn) *Cache { return c } +// Close the Cache and release associated resources. func (c *Cache) Close() { c.w.Stop() } @@ -90,6 +96,7 @@ func (c *Cache) handleResponse(resp *etcd.Response) { c.lock.Unlock() } +// Get a value from the cache. func (c *Cache) Get(key string) (interface{}, bool) { c.lock.RLock() value, ok := c.data[key] @@ -97,6 +104,7 @@ func (c *Cache) Get(key string) (interface{}, bool) { return value, ok } +// Get a value and the associated etcd index. func (c *Cache) GetWithIndex(key string) (interface{}, uint64, bool) { c.lock.RLock() index := c.index diff --git a/coordination/watcher/watcher.go b/coordination/watcher/watcher.go index 430c0beddd64fe392d767ae5e16a999f9df56d5f..3770edc9a6b496e2de13ec686dc7a62dfbfd116d 100644 --- a/coordination/watcher/watcher.go +++ b/coordination/watcher/watcher.go @@ -1,3 +1,13 @@ +// Package watcher monitors a path on etcd for changes, and sends them +// on a channel. It can thus build a local in-memory cache of the +// contents of the etcd directory, which is convenient for small +// datasets that don't change often (configuration data, for +// instance). +// +// The package includes both a simple cache implementation (Cache), +// and a background worker to synchronize arbitrary objects +// implementing the Syncable interface (Syncer). +// package watcher import ( diff --git a/vagrant-test/run-tests b/vagrant-test/run-tests new file mode 100755 index 0000000000000000000000000000000000000000..9a78ee816a9565b182ac8ee8c15338eccb172ab9 --- /dev/null +++ b/vagrant-test/run-tests @@ -0,0 +1,16 @@ +#!/bin/sh +# +# Test that the running autoradio cluster on Vagrant is performing +# as expected. +# + +expected_dns="192.168.50.2 192.168.50.3 192.168.50.4" +dns=$(dig +short stream.autora.dio @192.168.50.3 | sort | tr '\n' ' ' | sed -e 's/ $//') +if [ "$dns" != "$expected_dns" ]; then + echo "DNS query for stream.autora.dio did not return expected results: ${expected_dns}" >&2 + exit 1 +fi + +echo "Vagrant autoradio cluster looks ok." >&2 + +exit 0