From 22b8567eb9ed1f5518d84b2446145047a98c6c10 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 6 Dec 2014 10:37:03 +0000
Subject: [PATCH] use an internal interface to the etcd client

---
 api.go                           |  6 ++--
 etcd_client.go                   | 21 ++++++++---
 masterelection/masterelection.go | 15 ++++----
 node/icecast.go                  |  7 ++--
 node/node.go                     | 61 +++++++++++++++++++-------------
 5 files changed, 66 insertions(+), 44 deletions(-)

diff --git a/api.go b/api.go
index c07a67f7..7b7cbe90 100644
--- a/api.go
+++ b/api.go
@@ -10,8 +10,6 @@ import (
 	"strings"
 	"sync"
 	"time"
-
-	"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
 )
 
 var (
@@ -142,11 +140,11 @@ func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
 
 // RadioAPI is the actual API to the streaming cluster's database.
 type RadioAPI struct {
-	client           *etcd.Client
+	client           EtcdClient
 	activeNodesCache *nodesCache
 }
 
-func NewRadioAPI(client *etcd.Client) *RadioAPI {
+func NewRadioAPI(client EtcdClient) *RadioAPI {
 	return &RadioAPI{client, newNodesCache()}
 }
 
diff --git a/etcd_client.go b/etcd_client.go
index 65711845..39dc469c 100644
--- a/etcd_client.go
+++ b/etcd_client.go
@@ -17,7 +17,7 @@ var (
 	etcdCaFile   = flag.String("etcd-ca", "", "SSL CA certificate for etcd client")
 )
 
-func loadFile(path string) string {
+func mustLoadFile(path string) string {
 	data, err := ioutil.ReadFile(path)
 	if err != nil {
 		log.Fatal(err)
@@ -45,7 +45,7 @@ func resolveAll(input []string, proto string) []string {
 	return result
 }
 
-func NewEtcdClient() *etcd.Client {
+func NewEtcdClient() EtcdClient {
 	proto := "http"
 	if *etcdCertFile != "" && *etcdKeyFile != "" {
 		proto = "https"
@@ -63,9 +63,9 @@ func NewEtcdClient() *etcd.Client {
 	if proto == "https" {
 		var err error
 		c, err = etcd.NewTLSClient(machines,
-			loadFile(*etcdCertFile),
-			loadFile(*etcdKeyFile),
-			loadFile(*etcdCaFile))
+			mustLoadFile(*etcdCertFile),
+			mustLoadFile(*etcdKeyFile),
+			mustLoadFile(*etcdCaFile))
 		if err != nil {
 			log.Fatalf("Error setting up SSL for etcd client: %s", err)
 		}
@@ -76,3 +76,14 @@ func NewEtcdClient() *etcd.Client {
 	c.SetConsistency(etcd.WEAK_CONSISTENCY)
 	return c
 }
+
+// Etcd client interface. Used to decouple our code from the actual
+// etcd API, for testing purposes.
+type EtcdClient interface {
+	Create(string, string, uint64) (*etcd.Response, error)
+	CompareAndSwap(string, string, uint64, string, uint64) (*etcd.Response, error)
+	Delete(string, bool) (*etcd.Response, error)
+	Get(string, bool, bool) (*etcd.Response, error)
+	Set(string, string, uint64) (*etcd.Response, error)
+	Watch(string, uint64, bool, chan *etcd.Response, chan bool) (*etcd.Response, error)
+}
diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go
index a979db42..2ace742c 100644
--- a/masterelection/masterelection.go
+++ b/masterelection/masterelection.go
@@ -4,6 +4,7 @@ import (
 	"log"
 	"time"
 
+	"git.autistici.org/ale/autoradio"
 	"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
 )
 
@@ -23,11 +24,11 @@ func stateToString(state int) string {
 }
 
 type MasterElection struct {
-	client  *etcd.Client
+	client  autoradio.EtcdClient
 	stop    chan bool
 	stopped bool
 
-	Addr string
+	Data string
 	Path string
 	TTL  uint64
 
@@ -35,14 +36,14 @@ type MasterElection struct {
 	StateChange chan int
 }
 
-func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
+func NewMasterElection(client autoradio.EtcdClient, path, data string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
 	if ttl < 2 {
 		ttl = 2
 	}
 	return &MasterElection{
 		client:      client,
 		Path:        path,
-		Addr:        addr,
+		Data:        data,
 		TTL:         ttl,
 		State:       STATE_SLAVE,
 		StateChange: sch,
@@ -54,7 +55,7 @@ func (m *MasterElection) IsMaster() bool {
 	return m.State == STATE_MASTER
 }
 
-func (m *MasterElection) GetMasterAddr() string {
+func (m *MasterElection) GetMasterData() string {
 	response, err := m.client.Get(m.Path, false, false)
 	if err != nil || response.Node == nil {
 		return ""
@@ -109,7 +110,7 @@ func (m *MasterElection) runMaster(index uint64) {
 			// the stored master address is still our own,
 			// and no-one stole our lock. If not, the TTL
 			// will be updated (and the lock renewed).
-			response, err := m.client.CompareAndSwap(m.Path, m.Addr, m.TTL, m.Addr, index)
+			response, err := m.client.CompareAndSwap(m.Path, m.Data, m.TTL, m.Data, index)
 			if err != nil {
 				log.Printf("error updating lock: %s", err)
 
@@ -162,7 +163,7 @@ func (m *MasterElection) Run() {
 		// if the lockfile does not exist (either because it
 		// expired, or the previous master exited gracefully and
 		// deleted it).
-		response, err := m.client.Create(m.Path, m.Addr, m.TTL)
+		response, err := m.client.Create(m.Path, m.Data, m.TTL)
 
 		if err == nil {
 			// Howdy, we're the master now. Wait a while
diff --git a/node/icecast.go b/node/icecast.go
index f3a9c048..dc41c065 100644
--- a/node/icecast.go
+++ b/node/icecast.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"os"
 	"os/exec"
@@ -98,8 +99,8 @@ func (ic *IcecastController) killSources(conf *clusterConfig) error {
 }
 
 // Update reloads the Icecast daemon with a new configuration.
-func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr string) error {
-	if !isMaster && masterAddr == "" {
+func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
+	if !isMaster && masterAddr == nil {
 		return errors.New("unknown system state")
 	}
 
@@ -113,7 +114,7 @@ func (ic *IcecastController) Update(conf *clusterConfig, isMaster bool, masterAd
 	}
 
 	// Write a new configuration (atomically).
-	ic.config.Update(conf, isMaster, masterAddr)
+	ic.config.Update(conf, isMaster, masterAddr.String())
 	tmpf := icecastConfigFile + ".tmp"
 	defer os.Remove(tmpf)
 	if err := ic.config.EncodeToFile(tmpf); err != nil {
diff --git a/node/node.go b/node/node.go
index 4d9628d6..24c3f2b7 100644
--- a/node/node.go
+++ b/node/node.go
@@ -83,14 +83,14 @@ func (c *clusterConfig) delMount(name string) {
 // Keeps the in-memory service configuration in sync with the etcd
 // database. An update channel is triggered whenever the data changes.
 type configWatcher struct {
-	client *etcd.Client
+	client autoradio.EtcdClient
 	config *clusterConfig
 	upch   chan bool
 	stop   chan bool
 	index  uint64
 }
 
-func newConfigSyncer(client *etcd.Client, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
+func newConfigSyncer(client autoradio.EtcdClient, config *clusterConfig, upch chan bool, stop chan bool) *configWatcher {
 	return &configWatcher{
 		client: client,
 		config: config,
@@ -216,21 +216,21 @@ func (w *configWatcher) Start() {
 
 // An active streaming node, managing the local icecast server.
 type RadioNode struct {
-	Config *clusterConfig
-
-	name        string
-	ips         []net.IP
-	client      *etcd.Client
-	me          *masterelection.MasterElection
-	watcher     *configWatcher
-	icecast     *IcecastController
-	bw          *bwmonitor.BandwidthUsageMonitor
-	livenessTtl uint64
-	upch        chan bool
-	stop        chan bool
+	config *clusterConfig
+
+	name      string
+	ips       []net.IP
+	client    autoradio.EtcdClient
+	me        *masterelection.MasterElection
+	watcher   *configWatcher
+	icecast   *IcecastController
+	bw        *bwmonitor.BandwidthUsageMonitor
+	heartbeat uint64
+	upch      chan bool
+	stop      chan bool
 }
 
-func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
+func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
 	config := newClusterConfig()
 
 	// Network updates trigger icecast reconfiguration. This
@@ -260,7 +260,7 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 	}
 
 	return &RadioNode{
-		Config: config,
+		config: config,
 		name:   name,
 		ips:    ips,
 		client: client,
@@ -271,19 +271,19 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 			5,
 			mech,
 			stopch),
-		watcher:     newConfigSyncer(client, config, upch, stopch),
-		icecast:     NewIcecastController(name, stopch),
-		livenessTtl: 2,
-		bw:          bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
-		upch:        upch,
-		stop:        stopch,
+		watcher:   newConfigSyncer(client, config, upch, stopch),
+		icecast:   NewIcecastController(name, stopch),
+		heartbeat: 2,
+		bw:        bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
+		upch:      upch,
+		stop:      stopch,
 	}
 }
 
 // The presence goroutine continuously updates our entry in the list
 // of nodes.
 func (rc *RadioNode) presence() {
-	ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second)
+	ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
 
 	// Register ourselves using the node name.
 	key := autoradio.NodePrefix + rc.name
@@ -304,7 +304,7 @@ func (rc *RadioNode) presence() {
 			// Update our node entry in the database.
 			var buf bytes.Buffer
 			json.NewEncoder(&buf).Encode(&nodeSt)
-			if _, err := rc.client.Set(key, buf.String(), rc.livenessTtl); err != nil {
+			if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
 				log.Printf("presence: Set(): %s", err)
 			}
 
@@ -314,6 +314,17 @@ func (rc *RadioNode) presence() {
 	}
 }
 
+// Get a valid IP address for the current master node (to be passed to
+// Icecast). Since we don't really know much about network topology,
+// just pick the first IP address associated with the master node.
+func (rc *RadioNode) getMasterAddr() net.IP {
+	var info autoradio.MasterNodeInfo
+	if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.IP) == 0 {
+		return nil
+	}
+	return info.IP[0]
+}
+
 // Run the node. This method does not return until someone calls Stop().
 func (rc *RadioNode) Run() {
 	// Bootstrap the config watcher. This ensures that we have a
@@ -340,7 +351,7 @@ func (rc *RadioNode) Run() {
 		case <-rc.upch:
 			icecastReloads.Incr()
 			log.Printf("reloading icecast config")
-			if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil {
+			if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
 				icecastReloadErrors.Incr()
 				log.Printf("Update(): %s", err)
 			}
-- 
GitLab