Commit 4e9d3e98 authored by ale's avatar ale

use a common etcd component library for coordination tasks

parent 2b580be4
......@@ -9,8 +9,9 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"
"git.autistici.org/ale/autoradio/coordination/presence"
)
const (
......@@ -32,15 +33,10 @@ const (
IcecastMountPrefix = "/_stream"
)
var (
// IcecastPort is the port that the Icecast server will listen
// on. Since we fully manage the system-wide Icecast instance,
// there's not much point in making this configurable.
IcecastPort = 8000
ErrIsDirectory = errors.New("key is a directory")
ErrIsFile = errors.New("key is a file")
)
// IcecastPort is the port that the Icecast server will listen
// on. Since we fully manage the system-wide Icecast instance,
// there's not much point in making this configurable.
var IcecastPort = 8000
// EncodingParams used to re-encode a stream.
type EncodingParams struct {
......@@ -253,52 +249,44 @@ func (ns *NodeStatus) NumListeners() int {
return listeners
}
// Cache the list of active nodes (the front-ends need to retrieve
// this information continuously, so we limit them to 2qps).
type nodesCache struct {
ttl time.Duration
nodes []*NodeStatus
deadline time.Time
lock sync.Mutex
}
type getNodesFunc func() ([]*NodeStatus, error)
func newNodesCache() *nodesCache {
return &nodesCache{
ttl: 500 * time.Millisecond,
}
}
// Get returns the cached value of 'fn', if valid. If the value is
// expired and we get an error from 'fn', we will attempt to return
// the previously cached value anyway, along with the error: the
// caller can then pick the right failure behavior.
func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
nc.lock.Lock()
defer nc.lock.Unlock()
var err error
now := time.Now()
if now.After(nc.deadline) {
var nodes []*NodeStatus
if nodes, err = fn(); err == nil {
nc.nodes = nodes
nc.deadline = now.Add(nc.ttl)
}
}
return nc.nodes, err
}
// Client is the actual API to the streaming cluster's database.
type Client struct {
client EtcdClient
activeNodesCache *nodesCache
client EtcdClient
presenceCache *presence.Cache
}
// NewClient creates and returns a new Client.
func NewClient(client EtcdClient) *Client {
return &Client{client, newNodesCache()}
// The Client keeps a cache of node presence information,
// since it is likely that it will be accessed quite often (in
// the case of redirectord, on every request).
return &Client{
client: client,
presenceCache: presence.NewCache(client, NodePrefix, 2*time.Second, func(data []string) interface{} {
// Convert a list of JSON-encoded NodeStatus
// objects into a lisce of *NodeStatus
// pointers. Since decoding can be a bit
// expensive if performed on every query, we
// only perform it when the data is updated.
tmp := make([]*NodeStatus, 0, len(data))
for _, nodeData := range data {
var ns NodeStatus
if err := json.NewDecoder(strings.NewReader(nodeData)).Decode(&ns); err == nil {
tmp = append(tmp, &ns)
}
}
return tmp
}),
}
}
// GetNodes returns the list of active cluster nodes.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
data, err := r.presenceCache.Data()
if err != nil {
return nil, err
}
return data.([]*NodeStatus), nil
}
// GetMount returns data on a specific mountpoint (returns nil if not
......@@ -309,7 +297,7 @@ func (r *Client) GetMount(mountName string) (*Mount, error) {
return nil, err
}
if response.Node.Dir {
return nil, ErrIsDirectory
return nil, errors.New("key is a directory")
}
var m Mount
......@@ -343,7 +331,7 @@ func (r *Client) ListMounts() ([]*Mount, error) {
return nil, err
}
if !response.Node.Dir {
return nil, ErrIsFile
return nil, errors.New("key is a file")
}
result := make([]*Mount, 0, len(response.Node.Nodes))
......@@ -375,7 +363,7 @@ func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
return nil, err
}
if response.Node.Dir {
return nil, ErrIsDirectory
return nil, errors.New("key is a directory")
}
var m MasterNodeInfo
if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
......@@ -384,29 +372,6 @@ func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
return &m, nil
}
func (r *Client) doGetNodes() ([]*NodeStatus, error) {
response, err := r.client.Get(NodePrefix, false, false)
if err != nil || response.Node == nil {
return nil, err
}
if !response.Node.Dir {
return nil, ErrIsFile
}
result := make([]*NodeStatus, 0, len(response.Node.Nodes))
for _, entry := range response.Node.Nodes {
var ns NodeStatus
if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil {
result = append(result, &ns)
}
}
return result, nil
}
// GetNodes returns the list of active cluster nodes.
func (r *Client) GetNodes() ([]*NodeStatus, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
// GeneratePassword returns a new random password.
func GeneratePassword() string {
b := make([]byte, 6)
......
// Etcd client mock for testing purposes. It tries to follow the same
// semantics as the actual etcd, at least as far as they are used by
// autoradio. Instead of talking to a remote etcd server, it uses an
// in-memory representation of the data.
package util
// Etcd client fake for testing purposes. It tries to follow the same
// semantics as the actual etcd (highligh on "tries to"), with a
// simple in-memory implementation.
//
package etcdtest
import (
"errors"
"fmt"
"math/rand"
"path/filepath"
"strings"
"sync"
"time"
"git.autistici.org/ale/autoradio"
"github.com/coreos/go-etcd/etcd"
)
type datum struct {
value string
expire time.Time
index uint64
}
type testEtcdServer struct {
type FakeEtcdClient struct {
latency time.Duration
lock sync.Mutex
data map[string]datum
......@@ -30,16 +30,16 @@ type testEtcdServer struct {
index uint64
}
func NewTestEtcdClient() autoradio.EtcdClient {
return &testEtcdServer{
func NewClient() *FakeEtcdClient {
return &FakeEtcdClient{
data: make(map[string]datum),
watches: make(map[string][]chan *etcd.Response),
index: 1,
}
}
func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient {
return &testEtcdServer{
func NewClientWithLatency(maxLatency time.Duration) *FakeEtcdClient {
return &FakeEtcdClient{
data: make(map[string]datum),
watches: make(map[string][]chan *etcd.Response),
index: 1,
......@@ -47,13 +47,14 @@ func NewTestEtcdClientWithLatency(maxLatency time.Duration) autoradio.EtcdClient
}
}
func (s *testEtcdServer) delay() {
func (s *FakeEtcdClient) delay() {
if s.latency > 0 {
time.Sleep(time.Duration(rand.Int63n(int64(s.latency))))
}
}
func (s *testEtcdServer) trigger(action, key string) *etcd.Response {
func (s *FakeEtcdClient) trigger(action, key string) *etcd.Response {
// Create the etcd Response, containing a single Node.
resp := &etcd.Response{
Action: action,
EtcdIndex: s.index,
......@@ -65,24 +66,42 @@ func (s *testEtcdServer) trigger(action, key string) *etcd.Response {
resp.Node.Value = s.data[key].value
}
s.index++
s.lock.Unlock()
go func() {
s.wlock.Lock()
for pfx, w := range s.watches {
if strings.HasPrefix(key, pfx) {
for _, ch := range w {
ch <- resp
}
// Send notifications to the watchers. To avoid difficult race
// conditions, we close the channel after having sent a response.
var toclear []string
for pfx, w := range s.watches {
if strings.HasPrefix(key, pfx) {
for _, ch := range w {
ch <- resp
close(ch)
}
toclear = append(toclear, pfx)
}
s.wlock.Unlock()
}()
}
for _, pfx := range toclear {
delete(s.watches, pfx)
}
//s.wlock.Unlock()
s.lock.Unlock()
return resp
}
func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response, error) {
func ttlToTime(ttl uint64) time.Time {
if ttl == 0 {
return time.Time{}
}
return time.Now().Add(time.Duration(ttl) * time.Second)
}
func (s *FakeEtcdClient) AddChild(key, value string, ttl uint64) (*etcd.Response, error) {
uniqueId := fmt.Sprintf("%x-%x", time.Now().Unix(), rand.Int63())
return s.Create(filepath.Join(key, uniqueId), value, ttl)
}
func (s *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
if _, ok := s.data[key]; ok {
......@@ -92,18 +111,35 @@ func (s *testEtcdServer) Create(key, value string, ttl uint64) (*etcd.Response,
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
expire: ttlToTime(ttl),
index: s.index,
}
return s.trigger("create", key), nil
}
func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) {
func (s *FakeEtcdClient) CompareAndDelete(key, oldvalue string, index uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
if d, ok := s.data[key]; ok && d.value == oldvalue {
d, ok := s.data[key]
if ok && ((oldvalue == "" || d.value == oldvalue) ||
(index == 0 || d.index <= index)) {
delete(s.data, key)
return s.trigger("delete", key), nil
}
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
}
func (s *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, oldvalue string, index uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
d, ok := s.data[key]
if ok && ((oldvalue == "" || d.value == oldvalue) ||
(index == 0 || d.index <= index)) {
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
expire: ttlToTime(ttl),
index: s.index,
}
return s.trigger("update", key), nil
}
......@@ -111,17 +147,18 @@ func (s *testEtcdServer) CompareAndSwap(key, value string, ttl uint64, oldvalue
return nil, &etcd.EtcdError{Message: "failed", Index: s.index}
}
func (s *testEtcdServer) Delete(key string, recursive bool) (*etcd.Response, error) {
func (s *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
delete(s.data, key)
return s.trigger("delete", key), nil
}
func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, error) {
func (s *FakeEtcdClient) Get(key string, recursive, boh bool) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
defer s.lock.Unlock()
now := time.Now()
resp := &etcd.Response{
EtcdIndex: s.index,
}
......@@ -129,7 +166,8 @@ func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, e
key = strings.TrimSuffix(key, "/")
keyDirPfx := key + "/"
for path, datum := range s.data {
if path == key || strings.HasPrefix(path, keyDirPfx) {
if (path == key || strings.HasPrefix(path, keyDirPfx)) &&
(datum.expire.IsZero() || datum.expire.After(now)) {
nodes = append(nodes, &etcd.Node{
Key: path,
Value: datum.value,
......@@ -138,7 +176,7 @@ func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, e
}
switch {
case len(nodes) == 0:
return nil, errors.New("not found")
return nil, &etcd.EtcdError{Message: "not found", Index: s.index}
case len(nodes) == 1 && nodes[0].Key == key:
resp.Node = nodes[0]
default:
......@@ -151,25 +189,75 @@ func (s *testEtcdServer) Get(key string, recursive, boh bool) (*etcd.Response, e
return resp, nil
}
func (s *testEtcdServer) Set(key, value string, ttl uint64) (*etcd.Response, error) {
func (s *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
s.delay()
s.lock.Lock()
s.data[key] = datum{
value: value,
expire: time.Now().Add(time.Duration(ttl) * time.Second),
expire: ttlToTime(ttl),
index: s.index,
}
return s.trigger("set", key), nil
}
func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
func (s *FakeEtcdClient) SetDir(key string, ttl uint64) (*etcd.Response, error) {
// TODO. There are no directories.
return &etcd.Response{}, nil
}
func (s *FakeEtcdClient) Update(key, value string, ttl uint64) (*etcd.Response, error) {
s.lock.Lock()
if _, ok := s.data[key]; !ok {
s.lock.Unlock()
return nil, &etcd.EtcdError{Message: fmt.Sprintf("key %s not found", key), Index: s.index}
}
s.data[key] = datum{
value: value,
expire: ttlToTime(ttl),
index: s.index,
}
return s.trigger("update", key), nil
}
func (s *FakeEtcdClient) Watch(key string, index uint64, recursive bool, respch chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
// First scan the data to check for changes >= index.
s.lock.Lock()
var nodes []*etcd.Node
for k, d := range s.data {
if strings.HasPrefix(k, key) && d.index >= index {
nodes = append(nodes, &etcd.Node{
Key: k,
Value: d.value,
})
}
}
if len(nodes) > 0 {
// Bleargh, we create a fake 'dir' with all contents.
resp := &etcd.Response{
Action: "update",
Node: &etcd.Node{
Key: key,
Dir: true,
Nodes: nodes,
},
EtcdIndex: s.index,
}
s.lock.Unlock()
if respch != nil {
respch <- resp
}
return resp, nil
}
// Install the watch.
ch := respch
if ch == nil {
ch = make(chan *etcd.Response)
ch = make(chan *etcd.Response, 1)
}
s.wlock.Lock()
s.watches[key] = append(s.watches[key], ch)
s.wlock.Unlock()
s.lock.Unlock()
// Wait for a single response.
var resp *etcd.Response
if respch != nil {
<-stop
......@@ -180,18 +268,6 @@ func (s *testEtcdServer) Watch(key string, index uint64, recursive bool, respch
}
}
// Delete the watch.
s.wlock.Lock()
var watches []chan *etcd.Response
for _, w := range s.watches[key] {
if w != ch {
watches = append(watches, w)
}
}
s.watches[key] = watches
s.wlock.Unlock()
close(ch)
if resp == nil {
return nil, etcd.ErrWatchStoppedByUser
}
......
......@@ -6,81 +6,111 @@ import (
"sync"
"time"
"git.autistici.org/ale/autoradio"
"github.com/coreos/go-etcd/etcd"
)
// Possible values for Role.
const (
ROLE_UNKNOWN = iota
ROLE_SLAVE
ROLE_MASTER
RoleUnknown = iota
RoleSlave
RoleMaster
)
// Role of a participant in the master election protocol.
type Role int
func (r Role) String() string {
switch r {
case ROLE_SLAVE:
case RoleSlave:
return "slave"
case ROLE_MASTER:
case RoleMaster:
return "master"
}
return "unknown"
}
// State of the master election protocol (istantaneous snapshot for a
// node). It stores the role of the node, and the data associated with
// the current master.
type State struct {
Role Role
MasterData string
}
// Equal returns true if the two states are identical.
func (s State) Equal(other State) bool {
return s.Role == other.Role && s.MasterData == other.MasterData
}
type MasterElection struct {
client autoradio.EtcdClient
Data string
Path string
TTL uint64
// EtcdClient is the etcd interface used by this package.
type EtcdClient interface {
Create(key string, value string, ttl uint64) (*etcd.Response, error)
Delete(key string, recursive bool) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
Log *log.Logger
// MasterElection runs a master election protocol on top of etcd,
// using an etcd object as a lock. Multiple master elections can run
// at the same time on the same etcd instance just by using separate
// lock files.
//
// Each participant is identified by a unique blob of data, which is
// opaque to the master election protocol itself. The data associated
// with the currently elected master (if any) can be retrieved at any
// time by calling GetMasterData.
//
type MasterElection struct {
client EtcdClient
data string
path string
ttl uint64
stateLock sync.Mutex
stateCh chan State
state State
// Logger for debug messages and state changes. By default
// it will send output to os.Stderr.
Log *log.Logger
}
// New creates a new MasterElection object that will establish a lock
// on 'path'. It will send state transitions to 'sch', if provided. If
// 'sch' is not nil, it will be closed when Run() terminates.
func New(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection {
//
// All participants in the protocol should use the same ttl.
func New(client EtcdClient, path, data string, ttl uint64, sch chan State) *MasterElection {
if ttl < 1 {
ttl = 1
}
return &MasterElection{
client: client,
Path: path,
Data: data,
TTL: ttl,
state: State{Role: ROLE_UNKNOWN},
path: path,
data: data,
ttl: ttl,
stateCh: sch,
Log: log.New(os.Stderr, "masterelection: ", 0),
}
}
// Valid returns true if the protocol has elected a master.
func (m *MasterElection) Valid() bool {
m.stateLock.Lock()
defer m.stateLock.Unlock()
return m.state.Role != ROLE_UNKNOWN
return m.state.Role != RoleUnknown
}
// IsMaster returns true if this participant is the elected master.
func (m *MasterElection) IsMaster() bool {
m.stateLock.Lock()
defer m.stateLock.Unlock()
return m.state.Role == ROLE_MASTER
return m.state.Role == RoleMaster
}
// GetMasterData returns the opaque data associated with the currently
// elected master.
func (m *MasterElection) GetMasterData() string {
m.stateLock.Lock()
defer m.stateLock.Unlock()
......@@ -105,11 +135,11 @@ func (m *MasterElection) setState(role Role, masterData string) {
}
func (m *MasterElection) runMaster(index uint64, stop chan bool) {
m.setState(ROLE_MASTER, m.Data)
m.setState(RoleMaster, m.data)
// If we renew the lease every TTL / N, we allow N renewal
// errors before we stop believing being the master.
ttl := time.Second * time.Duration(m.TTL)
ttl := time.Second * time.Duration(m.ttl)
tick := time.NewTicker(ttl / 3)
lastUpdate := time.Now()
......@@ -122,7 +152,7 @@ func (m *MasterElection) runMaster(index uint64, stop chan bool) {
// the stored master address is still our own,
// and no-one stole our lock. If not, the TTL
// will be updated (and the lock renewed).
resp, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
resp, err := m.client.CompareAndSwap(m.path, m.data, m.ttl, m.data, index)
if err != nil {
m.Log.Printf("error updating lock: %s", err)
......@@ -140,7 +170,7 @@ func (m *MasterElection) runMaster(index uint64, stop chan bool) {
// Facilitate a master re-election by dropping
// the lock rather than letting it expire.
m.Log.Println("releasing masterelection lock")
m.client.Delete(m.Path, false)
m.client.Delete(m.path, false)
return
}
}
......@@ -155,13 +185,13 @@ func (m *MasterElection) runSlave(index uint64, stop chan bool) {
// In this case, we skip updating state, since the Watch call
// will immediately return (with the change that caused the
// index to increase).
resp, err := m.client.Get(m.Path, false, false)
resp, err := m.client.Get(m.path, false, false)
if err == nil && resp.Node.ModifiedIndex <= index {
m.setState(ROLE_SLAVE, resp.Node.Value)
m.setState(RoleSlave, resp.Node.Value)
}
for {
resp, err = m.client.Watch(m.Path, index+1, false, nil, stop)
resp, err = m.client.Watch(m.path, index+1, false, nil, stop)
if err != nil {
if err != etcd.ErrWatchStoppedByUser {
m.Log.Printf("slave Watch() error: %v", err)
......@@ -176,11 +206,12 @@ func (m *MasterElection) runSlave(index uint64, stop chan bool) {
return
}
m.setState(ROLE_SLAVE, resp.Node.Value)
m.setState(RoleSlave, resp.Node.Value)
index = resp.EtcdIndex
}
}
// Run the master election protocol, until the stop channel is closed.
func (m *MasterElection) Run(stop chan bool) {
if m.stateCh != nil {
defer close(m.stateCh)
......@@ -198,7 +229,7 @@ func (m *MasterElection) Run(stop chan bool) {
// if the lockfile does not exist (either because it
// expired, or the previous master exited gracefully and
// deleted it).
response, err := m.client.Create(m.Path, m.Data, m.TTL)
response, err := m.client.Create(m.path, m.data, m.ttl)
if err == nil {
// Howdy, we're the master now. Wait a while
......
......@@ -8,7 +8,7 @@ import (
"testing"
"time"
"git.autistici.org/ale/autoradio/util"
"git.autistici.org/ale/autoradio/coordination/etcdtest"
)
func init() {
......@@ -39,7 +39,7 @@ func verifyMasterData(t *testing.T, nodes []*MasterElection) {
func TestMasterElection(t *testing.T) {
//etcd := util.NewTestEtcdClient()
etcd := util.NewTestEtcdClientWithLatency(20 * time.Millisecond)
etcd := etcdtest.NewClientWithLatency(20 * time.Millisecond)
lockPath := "/master/election/test"
n := 5
......
package presence
import (