Skip to content
Snippets Groups Projects
Commit 22b8567e authored by ale's avatar ale
Browse files

use an internal interface to the etcd client

parent 01d010c9
Branches
No related tags found
No related merge requests found
...@@ -10,8 +10,6 @@ import ( ...@@ -10,8 +10,6 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
) )
var ( var (
...@@ -142,11 +140,11 @@ func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) { ...@@ -142,11 +140,11 @@ func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
// RadioAPI is the actual API to the streaming cluster's database. // RadioAPI is the actual API to the streaming cluster's database.
type RadioAPI struct { type RadioAPI struct {
client *etcd.Client client EtcdClient
activeNodesCache *nodesCache activeNodesCache *nodesCache
} }
func NewRadioAPI(client *etcd.Client) *RadioAPI { func NewRadioAPI(client EtcdClient) *RadioAPI {
return &RadioAPI{client, newNodesCache()} return &RadioAPI{client, newNodesCache()}
} }
......
...@@ -17,7 +17,7 @@ var ( ...@@ -17,7 +17,7 @@ var (
etcdCaFile = flag.String("etcd-ca", "", "SSL CA certificate for etcd client") etcdCaFile = flag.String("etcd-ca", "", "SSL CA certificate for etcd client")
) )
func loadFile(path string) string { func mustLoadFile(path string) string {
data, err := ioutil.ReadFile(path) data, err := ioutil.ReadFile(path)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
...@@ -45,7 +45,7 @@ func resolveAll(input []string, proto string) []string { ...@@ -45,7 +45,7 @@ func resolveAll(input []string, proto string) []string {
return result return result
} }
func NewEtcdClient() *etcd.Client { func NewEtcdClient() EtcdClient {
proto := "http" proto := "http"
if *etcdCertFile != "" && *etcdKeyFile != "" { if *etcdCertFile != "" && *etcdKeyFile != "" {
proto = "https" proto = "https"
...@@ -63,9 +63,9 @@ func NewEtcdClient() *etcd.Client { ...@@ -63,9 +63,9 @@ func NewEtcdClient() *etcd.Client {
if proto == "https" { if proto == "https" {
var err error var err error
c, err = etcd.NewTLSClient(machines, c, err = etcd.NewTLSClient(machines,
loadFile(*etcdCertFile), mustLoadFile(*etcdCertFile),
loadFile(*etcdKeyFile), mustLoadFile(*etcdKeyFile),
loadFile(*etcdCaFile)) mustLoadFile(*etcdCaFile))
if err != nil { if err != nil {
log.Fatalf("Error setting up SSL for etcd client: %s", err) log.Fatalf("Error setting up SSL for etcd client: %s", err)
} }
...@@ -76,3 +76,14 @@ func NewEtcdClient() *etcd.Client { ...@@ -76,3 +76,14 @@ func NewEtcdClient() *etcd.Client {
c.SetConsistency(etcd.WEAK_CONSISTENCY) c.SetConsistency(etcd.WEAK_CONSISTENCY)
return c return c
} }
// Etcd client interface. Used to decouple our code from the actual
// etcd API, for testing purposes.
type EtcdClient interface {
Create(string, string, uint64) (*etcd.Response, error)
CompareAndSwap(string, string, uint64, string, uint64) (*etcd.Response, error)
Delete(string, bool) (*etcd.Response, error)
Get(string, bool, bool) (*etcd.Response, error)
Set(string, string, uint64) (*etcd.Response, error)
Watch(string, uint64, bool, chan *etcd.Response, chan bool) (*etcd.Response, error)
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"log" "log"
"time" "time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd" "git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
) )
...@@ -23,11 +24,11 @@ func stateToString(state int) string { ...@@ -23,11 +24,11 @@ func stateToString(state int) string {
} }
type MasterElection struct { type MasterElection struct {
client *etcd.Client client autoradio.EtcdClient
stop chan bool stop chan bool
stopped bool stopped bool
Addr string Data string
Path string Path string
TTL uint64 TTL uint64
...@@ -35,14 +36,14 @@ type MasterElection struct { ...@@ -35,14 +36,14 @@ type MasterElection struct {
StateChange chan int StateChange chan int
} }
func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int, stop chan bool) *MasterElection { func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
if ttl < 2 { if ttl < 2 {
ttl = 2 ttl = 2
} }
return &MasterElection{ return &MasterElection{
client: client, client: client,
Path: path, Path: path,
Addr: addr, Data: data,
TTL: ttl, TTL: ttl,
State: STATE_SLAVE, State: STATE_SLAVE,
StateChange: sch, StateChange: sch,
...@@ -54,7 +55,7 @@ func (m *MasterElection) IsMaster() bool { ...@@ -54,7 +55,7 @@ func (m *MasterElection) IsMaster() bool {
return m.State == STATE_MASTER return m.State == STATE_MASTER
} }
func (m *MasterElection) GetMasterAddr() string { func (m *MasterElection) GetMasterData() string {
response, err := m.client.Get(m.Path, false, false) response, err := m.client.Get(m.Path, false, false)
if err != nil || response.Node == nil { if err != nil || response.Node == nil {
return "" return ""
...@@ -109,7 +110,7 @@ func (m *MasterElection) runMaster(index uint64) { ...@@ -109,7 +110,7 @@ func (m *MasterElection) runMaster(index uint64) {
// the stored master address is still our own, // the stored master address is still our own,
// and no-one stole our lock. If not, the TTL // and no-one stole our lock. If not, the TTL
// will be updated (and the lock renewed). // will be updated (and the lock renewed).
response, err := m.client.CompareAndSwap(m.Path, m.Addr, m.TTL, m.Addr, index) response, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
if err != nil { if err != nil {
log.Printf("error updating lock: %s", err) log.Printf("error updating lock: %s", err)
...@@ -162,7 +163,7 @@ func (m *MasterElection) Run() { ...@@ -162,7 +163,7 @@ func (m *MasterElection) Run() {
// if the lockfile does not exist (either because it // if the lockfile does not exist (either because it
// expired, or the previous master exited gracefully and // expired, or the previous master exited gracefully and
// deleted it). // deleted it).
response, err := m.client.Create(m.Path, m.Addr, m.TTL) response, err := m.client.Create(m.Path, m.Data, m.TTL)
if err == nil { if err == nil {
// Howdy, we're the master now. Wait a while // Howdy, we're the master now. Wait a while
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
...@@ -98,8 +99,8 @@ func (ic *IcecastController) killSources(conf *clusterConfig) error { ...@@ -98,8 +99,8 @@ func (ic *IcecastController) killSources(conf *clusterConfig) error {
} }
// Update reloads the Icecast daemon with a new configuration. // Update reloads the Icecast daemon with a new configuration.
func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr string) error { func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
if !isMaster && masterAddr == "" { if !isMaster && masterAddr == nil {
return errors.New("unknown system state") return errors.New("unknown system state")
} }
...@@ -113,7 +114,7 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd ...@@ -113,7 +114,7 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd
} }
// Write a new configuration (atomically). // Write a new configuration (atomically).
ic.config.Update(conf, isMaster, masterAddr) ic.config.Update(conf, isMaster, masterAddr.String())
tmpf := icecastConfigFile + ".tmp" tmpf := icecastConfigFile + ".tmp"
defer os.Remove(tmpf) defer os.Remove(tmpf)
if err := ic.config.EncodeToFile(tmpf); err != nil { if err := ic.config.EncodeToFile(tmpf); err != nil {
......
...@@ -83,14 +83,14 @@ func (c *clusterConfig) delMount(name string) { ...@@ -83,14 +83,14 @@ func (c *clusterConfig) delMount(name string) {
// Keeps the in-memory service configuration in sync with the etcd // Keeps the in-memory service configuration in sync with the etcd
// database. An update channel is triggered whenever the data changes. // database. An update channel is triggered whenever the data changes.
type configWatcher struct { type configWatcher struct {
client *etcd.Client client autoradio.EtcdClient
config *clusterConfig config *clusterConfig
upch chan bool upch chan bool
stop chan bool stop chan bool
index uint64 index uint64
} }
func newConfigSyncer(client *etcd.Client, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher { func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
return &configWatcher{ return &configWatcher{
client: client, client: client,
config: config, config: config,
...@@ -216,21 +216,21 @@ func (w *configWatcher) Start() { ...@@ -216,21 +216,21 @@ func (w *configWatcher) Start() {
// An active streaming node, managing the local icecast server. // An active streaming node, managing the local icecast server.
type RadioNode struct { type RadioNode struct {
Config *clusterConfig config *clusterConfig
name string name string
ips []net.IP ips []net.IP
client *etcd.Client client autoradio.EtcdClient
me *masterelection.MasterElection me *masterelection.MasterElection
watcher *configWatcher watcher *configWatcher
icecast *IcecastController icecast *IcecastController
bw *bwmonitor.BandwidthUsageMonitor bw *bwmonitor.BandwidthUsageMonitor
livenessTtl uint64 heartbeat uint64
upch chan bool upch chan bool
stop chan bool stop chan bool
} }
func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client *etcd.Client) *RadioNode { func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
config := newClusterConfig() config := newClusterConfig()
// Network updates trigger icecast reconfiguration. This // Network updates trigger icecast reconfiguration. This
...@@ -260,7 +260,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli ...@@ -260,7 +260,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
} }
return &RadioNode{ return &RadioNode{
Config: config, config: config,
name: name, name: name,
ips: ips, ips: ips,
client: client, client: client,
...@@ -271,19 +271,19 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli ...@@ -271,19 +271,19 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
5, 5,
mech, mech,
stopch), stopch),
watcher: newConfigSyncer(client, config, upch, stopch), watcher: newConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(name, stopch), icecast: NewIcecastController(name, stopch),
livenessTtl: 2, heartbeat: 2,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
upch: upch, upch: upch,
stop: stopch, stop: stopch,
} }
} }
// The presence goroutine continuously updates our entry in the list // The presence goroutine continuously updates our entry in the list
// of nodes. // of nodes.
func (rc *RadioNode) presence() { func (rc *RadioNode) presence() {
ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second) ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
// Register ourselves using the node name. // Register ourselves using the node name.
key := autoradio.NodePrefix + rc.name key := autoradio.NodePrefix + rc.name
...@@ -304,7 +304,7 @@ func (rc *RadioNode) presence() { ...@@ -304,7 +304,7 @@ func (rc *RadioNode) presence() {
// Update our node entry in the database. // Update our node entry in the database.
var buf bytes.Buffer var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&nodeSt) json.NewEncoder(&buf).Encode(&nodeSt)
if _, err := rc.client.Set(key, buf.String(), rc.livenessTtl); err != nil { if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
log.Printf("presence: Set(): %s", err) log.Printf("presence: Set(): %s", err)
} }
...@@ -314,6 +314,17 @@ func (rc *RadioNode) presence() { ...@@ -314,6 +314,17 @@ func (rc *RadioNode) presence() {
} }
} }
// Get a valid IP address for the current master node (to be passed to
// Icecast). Since we don't really know much about network topology,
// just pick the first IP address associated with the master node.
func (rc *RadioNode) getMasterAddr() net.IP {
var info autoradio.MasterNodeInfo
if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.IP) == 0 {
return nil
}
return info.IP[0]
}
// Run the node. This method does not return until someone calls Stop(). // Run the node. This method does not return until someone calls Stop().
func (rc *RadioNode) Run() { func (rc *RadioNode) Run() {
// Bootstrap the config watcher. This ensures that we have a // Bootstrap the config watcher. This ensures that we have a
...@@ -340,7 +351,7 @@ func (rc *RadioNode) Run() { ...@@ -340,7 +351,7 @@ func (rc *RadioNode) Run() {
case <-rc.upch: case <-rc.upch:
icecastReloads.Incr() icecastReloads.Incr()
log.Printf("reloading icecast config") log.Printf("reloading icecast config")
if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil { if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
icecastReloadErrors.Incr() icecastReloadErrors.Incr()
log.Printf("Update(): %s", err) log.Printf("Update(): %s", err)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment