From 4c9e0b30a87502df55cbf9a2d601e000cc85e6b5 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Mon, 4 Nov 2013 19:39:02 +0000
Subject: [PATCH] add client API; add graceful stop; clean up code

---
 TODO                             |  12 +++
 api.go                           |  83 ++++++++++++++
 icecast_config_test.go           |   2 +-
 masterelection/masterelection.go |  25 ++++-
 node.go                          | 179 +++++++++++++++++++------------
 5 files changed, 229 insertions(+), 72 deletions(-)
 create mode 100644 TODO
 create mode 100644 api.go

diff --git a/TODO b/TODO
new file mode 100644
index 00000000..2c608a58
--- /dev/null
+++ b/TODO
@@ -0,0 +1,12 @@
+
+Not necessarily ordered by priority:
+
+- HTTP redirectors for sources (point to the master), and relays (point to
+  any node) for the low-TTL naming service.
+
+- DNS server to provide the necessary bootstrap infrastructure and
+  the higher-TTL, stable, naming service.
+
+- Debian packages, init scripts, everything that is necessary for proper
+  packaging and easy distribution.
+
diff --git a/api.go b/api.go
new file mode 100644
index 00000000..f6959b2f
--- /dev/null
+++ b/api.go
@@ -0,0 +1,83 @@
+package radioai
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"github.com/coreos/go-etcd/etcd"
+	"strings"
+)
+
+func mountPath(mountName string) string {
+	return mountPrefix + mountName[1:]
+}
+
+// RadioAPI is the actual API to the streaming cluster's database.
+type RadioAPI struct {
+	client *etcd.Client
+}
+
+// GetMount returns data on a specific mountpoint (returns an error if
+// not found).
+func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
+	response, err := r.client.Get(mountPath(mountName))
+	if err != nil {
+		return nil, err
+	}
+	if len(response) != 1 {
+		return nil, errors.New("not found")
+	}
+
+	var m Mount
+	if err := json.NewDecoder(strings.NewReader(response[0].Value)).Decode(&m); err != nil {
+		return nil, err
+	}
+	return &m, nil
+}
+
+// SetMount creates or updates a mountpoint.
+func (r *RadioAPI) SetMount(m *Mount) error {
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(m); err != nil {
+		return err
+	}
+
+	_, err := r.client.Set(mountPath(m.Name), buf.String(), 0)
+	return err
+}
+
+// DelMount removes a mountpoint.
+func (r *RadioAPI) DelMount(mountName string) error {
+	_, err := r.client.Delete(mountPath(mountName))
+	return err
+}
+
+// ListMounts returns a list of all the configured mountpoints.
+func (r *RadioAPI) ListMounts() ([]*Mount, error) {
+	response, err := r.client.Get(mountPrefix)
+	if err != nil {
+		return nil, err
+	}
+	result := make([]*Mount, 0, len(response))
+	for _, entry := range response {
+		var m Mount
+		if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&m); err != nil {
+			continue
+		}
+		result = append(result, &m)
+	}
+	return result, nil
+}
+
+// GetNodes returns the list of active cluster nodes.
+func (r *RadioAPI) GetNodes() ([]string, error) {
+	response, err := r.client.Get(nodePrefix)
+	if err != nil {
+		return nil, err
+	}
+	result := make([]string, 0, len(response))
+	for _, entry := range response {
+		result = append(result, entry.Value)
+	}
+	return result, nil
+}
diff --git a/icecast_config_test.go b/icecast_config_test.go
index 272fd948..8c263358 100644
--- a/icecast_config_test.go
+++ b/icecast_config_test.go
@@ -12,7 +12,7 @@ func TestIcecastConfig(t *testing.T) {
 		Password: "pass",
 	}
 	c := NewClusterConfig()
-	c.UpdateMount(mount)
+	c.setMount(mount)
 
 	// Test a relay config.
 	ice := NewIcecastConfig(c, "1.2.3.4", false, "2.3.4.5")
diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go
index 51add679..a9e8dc19 100644
--- a/masterelection/masterelection.go
+++ b/masterelection/masterelection.go
@@ -14,6 +14,8 @@ const (
 
 type MasterElection struct {
 	client *etcd.Client
+	stop chan bool
+	stopped bool
 
 	Addr string
 	MasterAddr string
@@ -24,7 +26,7 @@ type MasterElection struct {
 	StateChange chan int
 }
 
-func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int) *MasterElection {
+func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch chan int, stop chan bool) *MasterElection {
 	if ttl < 2 {
 		ttl = 2
 	}
@@ -35,6 +37,7 @@ func NewMasterElection(client *etcd.Client, path, addr string, ttl uint64, sch c
 		TTL:         ttl,
 		State:       STATE_SLAVE,
 		StateChange: sch,
+		stop:        stop,
 	}
 }
 
@@ -52,13 +55,31 @@ func (m *MasterElection) setState(state int) {
 	m.State = state
 }
 
+func (m *MasterElection) stopper() {
+	<-m.stop
+
+	// Tell the Run() goroutine to exit as soon as it can (we
+	// could have simply used the 'stop' channel there but Watch()
+	// ignores the stop channel if invoked without a receiving
+	// channel and we'd have to refactor Run() to use a temp
+	// channel for each invocation.
+	m.stopped = true
+
+	// Remove the lock file if we are the master.
+	if m.State == STATE_MASTER {
+		m.client.Delete(m.Path)
+	}
+}
+
 func (m *MasterElection) Run() {
+	go m.stopper()
+
 	// Start as a slave.
 	m.setState(STATE_SLAVE)
 
 	halfttl := time.Second * time.Duration(m.TTL / 2)
 
-	for {
+	for !m.stopped {
 		// Try to acquire the lock. If we are currently the
 		// master, the previous value should be our own
 		// address, otherwise it should be unset.
diff --git a/node.go b/node.go
index 69ad9e9a..d6120e99 100644
--- a/node.go
+++ b/node.go
@@ -3,7 +3,6 @@ package radioai
 import (
 	"encoding/json"
 	"log"
-	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -18,6 +17,13 @@ var (
 	nodePrefix         = "/icecast/nodes/"
 )
 
+func trigger(c chan bool) {
+	select {
+	case c <- true:
+	default:
+	}
+}
+
 // A mountpoint for a stream.
 type Mount struct {
 	// Name (path to the mountpoint).
@@ -46,74 +52,91 @@ func NewClusterConfig() *ClusterConfig {
 	}
 }
 
+// TODO: remove?
 func (c *ClusterConfig) GetMount(name string) *Mount {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	return c.mounts[name]
 }
 
-func (c *ClusterConfig) UpdateMount(m *Mount) {
+// TODO: remove?
+func (c *ClusterConfig) ListMounts() []*Mount {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	c.mounts[m.Name] = m
+	result := make([]*Mount, 0, len(c.mounts))
+	for _, m := range c.mounts {
+		result = append(result, m)
+	}
+	return result
 }
 
-func (c *ClusterConfig) DelMount(name string) {
+// Update a mount (in-memory only).
+func (c *ClusterConfig) setMount(m *Mount) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	delete(c.mounts, name)
+	c.mounts[m.Name] = m
 }
 
-func (c *ClusterConfig) ListMounts() []*Mount {
+// Delete a mount (in-memory only).
+func (c *ClusterConfig) delMount(name string) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	result := make([]*Mount, 0, len(c.mounts))
-	for _, m := range c.mounts {
-		result = append(result, m)
-	}
-	return result
+	delete(c.mounts, name)
 }
 
-type ConfigWatcher struct {
+// Keeps the in-memory service configuration in sync with the
+// distributed database. An update channel is triggered whenever the
+// data changes.
+type ConfigSyncer struct {
 	client *etcd.Client
 	config *ClusterConfig
 	rch    chan *etcd.Response
 	upch   chan bool
+	stop   chan bool
 }
 
-func NewConfigWatcher(client *etcd.Client, config *ClusterConfig, upch chan bool) *ConfigWatcher {
-	return &ConfigWatcher{
+func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer {
+	return &ConfigSyncer{
 		client: client,
 		config: config,
 		rch:    make(chan *etcd.Response, 100),
 		upch:   upch,
+		stop:   stop,
 	}
 }
 
-func (w *ConfigWatcher) syncer() {
-	for response := range w.rch {
-		mountName := filepath.Base(response.Key)
-		if response.Action == "DELETE" {
-			log.Printf("deleted mount '%s'", mountName)
-			w.config.DelMount(mountName)
-		} else {
-			log.Printf("update to mount '%s'", mountName)
-			var m Mount
-			if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
-				log.Printf("corrupted data: %s", err)
+func (w *ConfigSyncer) syncer() {
+	for {
+		select {
+		case response := <-w.rch:
+			// Remove mountPrefix from the beginning of
+			// the path, but keep the leading slash.
+			mountName := response.Key[len(mountPrefix)-1:]
+
+			if response.Action == "DELETE" {
+				log.Printf("deleted mount '%s'", mountName)
+				w.config.delMount(mountName)
 			} else {
-				w.config.UpdateMount(&m)
+				log.Printf("update to mount '%s'", mountName)
+				var m Mount
+				if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
+					log.Printf("corrupted data: %s", err)
+				} else {
+					w.config.setMount(&m)
+				}
 			}
-		}
+			trigger(w.upch)
 
-		// Only flip the update signal once.
-		select {
-		case w.upch <- true:
+		case <-w.stop:
+			return
 		}
 	}
 }
 
-func (w *ConfigWatcher) Run() {
+// Run the ConfigSyncer in the background. It will wait for
+// initialization to complete, so that when this function returns, the
+// in-memory configuration has already been fully synchronized.
+func (w *ConfigSyncer) Run() {
 	go w.syncer()
 
 	// Run until the first successful Get().
@@ -139,34 +162,42 @@ func (w *ConfigWatcher) Run() {
 	}
 }
 
-type RadioCluster struct {
+// An active streaming node, managing the local icecast server.
+type RadioNode struct {
+	Config      *ClusterConfig
+
 	ip          string
 	client      *etcd.Client
 	me          *masterelection.MasterElection
-	config      *ClusterConfig
-	watcher     *ConfigWatcher
+	watcher     *ConfigSyncer
 	icecast     *IcecastController
-	upch        chan bool
 	livenessTtl uint64
+	upch        chan bool
+	stop        chan bool
 }
 
-func NewRadioCluster(ip string, client *etcd.Client) *RadioCluster {
+func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
 	config := NewClusterConfig()
 
-	// Network updates trigger icecast reconfiguration.
+	// Network updates trigger icecast reconfiguration. This
+	// channel is used as an 'event', no more than one entry will
+	// be queued.
 	upch := make(chan bool, 1)
 
 	// MasterElection changes trigger an update.
 	mech := make(chan int)
 	go func() {
-		for _ = range mech {
-			select {
-			case upch <- true:
-			}
+		for state := range mech {
+			log.Printf("master election status changed: %d", state)
+			trigger(upch)
 		}
 	}()
 
-	return &RadioCluster{
+	// Global 'stop' channel.
+	stopch := make(chan bool)
+
+	return &RadioNode{
+		Config:      config,
 		ip:     ip,
 		client: client,
 		me: masterelection.NewMasterElection(
@@ -174,40 +205,35 @@ func NewRadioCluster(ip string, client *etcd.Client) *RadioCluster {
 			masterElectionPath,
 			ip,
 			5,
-			mech),
-		config:      config,
-		watcher:     NewConfigWatcher(client, config, upch),
+			mech,
+			stopch),
+		watcher:     NewConfigSyncer(client, config, upch, stopch),
 		icecast:     NewIcecastController(ip),
-		upch:        upch,
 		livenessTtl: 2,
+		upch:        upch,
+		stop:        stopch,
 	}
 }
 
-func (rc *RadioCluster) presence() {
+// The presence goroutine continuously updates our entry in the list
+// of nodes.
+func (rc *RadioNode) presence() {
+	ticker := time.NewTicker(time.Duration(rc.livenessTtl / 2) * time.Second)
+
 	for {
-		if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil {
-			log.Printf("Set() error: %s", err)
-			time.Sleep(100 * time.Millisecond)
-		} else {
-			time.Sleep(time.Duration(rc.livenessTtl / 2) * time.Second)
+		select {
+		case <-ticker.C:
+			if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil {
+				log.Printf("Set() error: %s", err)
+			}
+		case <-rc.stop:
+			return
 		}
 	}
 }
 
-// Return the list of currently active nodes.
-func (rc *RadioCluster) GetNodes() ([]string, error) {
-	response, err := rc.client.Get(nodePrefix)
-	if err != nil {
-		return nil, err
-	}
-	result := make([]string, 0, len(response))
-	for _, r := range response {
-		result = append(result, r.Value)
-	}
-	return result, nil
-}
-
-func (rc *RadioCluster) Run() {
+// Run the node. This method does not return until someone calls Stop().
+func (rc *RadioNode) Run() {
 	// Bootstrap the config watcher. This ensures that we have a
 	// full configuration (thanks to the Get() call) before we
 	// start managing the icecast server.
@@ -216,9 +242,24 @@ func (rc *RadioCluster) Run() {
 	// Start the presence heartbeat.
 	go rc.presence()
 
-	for _ = range rc.upch {
-		if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
-			log.Printf("Update() failed: %s", err)
+	for {
+		select {
+		case <-rc.upch:
+			if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
+				log.Printf("Update() failed: %s", err)
+			}
+
+			// Do not reload icecast more often than once
+			// every two seconds.
+			time.Sleep(2 * time.Second)
+
+		case <-rc.stop:
+			return
 		}
 	}
 }
+
+// Stop everything.
+func (rc *RadioNode) Stop() {
+	close(rc.stop)
+}
-- 
GitLab