From 6c9f8d3129e1176df80dc76397b37e78a6634693 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Fri, 15 Nov 2013 20:39:21 +0000
Subject: [PATCH] update to the etcd v2 API

---
 api.go                           |  48 ++++++++-----
 cmd/radioctl/radioctl.go         |   5 +-
 etcd_client.go                   |   2 +-
 masterelection/masterelection.go | 117 +++++++++++++++++++++++++------
 node/node.go                     |  41 ++++++-----
 5 files changed, 149 insertions(+), 64 deletions(-)

diff --git a/api.go b/api.go
index 5ad33a3e..5d8e9311 100644
--- a/api.go
+++ b/api.go
@@ -18,7 +18,10 @@ var (
 	MountPrefix        = "/icecast/mounts/"
 	NodePrefix         = "/icecast/nodes/"
 
-	IcecastPort        = 8000
+	IcecastPort = 8000
+
+	ErrIsDirectory = errors.New("key is a directory")
+	ErrIsFile      = errors.New("key is a file")
 )
 
 // A mountpoint for a stream.
@@ -40,7 +43,8 @@ func mountPath(mountName string) string {
 	return MountPrefix + mountName[1:]
 }
 
-// Cache the list of active nodes.
+// Cache the list of active nodes (the front-ends that need to
+// retrieve this information continuously, so we limit them to 2qps).
 type nodesCache struct {
 	ttl      time.Duration
 	nodes    []string
@@ -89,16 +93,16 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI {
 // GetMount returns data on a specific mountpoint (returns nil if not
 // found).
 func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
-	response, err := r.client.Get(mountPath(mountName))
+	response, err := r.client.Get(mountPath(mountName), false)
 	if err != nil {
 		return nil, err
 	}
-	if len(response) != 1 {
-		return nil, nil
+	if response.Dir {
+		return nil, ErrIsDirectory
 	}
 
 	var m Mount
-	if err := json.NewDecoder(strings.NewReader(response[0].Value)).Decode(&m); err != nil {
+	if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
 		return nil, err
 	}
 	return &m, nil
@@ -123,14 +127,21 @@ func (r *RadioAPI) DelMount(mountName string) error {
 
 // ListMounts returns a list of all the configured mountpoints.
 func (r *RadioAPI) ListMounts() ([]*Mount, error) {
-	response, err := r.client.Get(MountPrefix)
+	response, err := r.client.Get(MountPrefix, false)
 	if err != nil {
 		return nil, err
 	}
-	result := make([]*Mount, 0, len(response))
-	for _, entry := range response {
+	if !response.Dir {
+		return nil, ErrIsFile
+	}
+
+	result := make([]*Mount, 0, len(response.Kvs))
+	for _, kv := range response.Kvs {
+		if kv.Dir {
+			continue
+		}
 		var m Mount
-		if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&m); err != nil {
+		if err := json.NewDecoder(strings.NewReader(kv.Value)).Decode(&m); err != nil {
 			continue
 		}
 		result = append(result, &m)
@@ -140,24 +151,27 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
 
 // GetMasterAddr returns the address of the current master server.
 func (r *RadioAPI) GetMasterAddr() (string, error) {
-	response, err := r.client.Get(MasterElectionPath)
+	response, err := r.client.Get(MasterElectionPath, false)
 	if err != nil {
 		return "", err
 	}
-	if len(response) < 1 {
-		return "", errors.New("no active master")
+	if response.Dir {
+		return "", ErrIsDirectory
 	}
-	return response[0].Value, nil
+	return response.Value, nil
 }
 
 // GetNodes returns the list of active cluster nodes.
 func (r *RadioAPI) doGetNodes() ([]string, error) {
-	response, err := r.client.Get(NodePrefix)
+	response, err := r.client.Get(NodePrefix, false)
 	if err != nil {
 		return nil, err
 	}
-	result := make([]string, 0, len(response))
-	for _, entry := range response {
+	if !response.Dir {
+		return nil, ErrIsFile
+	}
+	result := make([]string, 0, len(response.Kvs))
+	for _, entry := range response.Kvs {
 		result = append(result, entry.Value)
 	}
 	return result, nil
diff --git a/cmd/radioctl/radioctl.go b/cmd/radioctl/radioctl.go
index e827fba8..3b3f45e0 100644
--- a/cmd/radioctl/radioctl.go
+++ b/cmd/radioctl/radioctl.go
@@ -104,9 +104,10 @@ func generateUsername(path string) string {
 
 func createMount(args []string) {
 	path := args[0]
-	if !strings.HasPrefix(path, "/") {
-		log.Fatal("Mount points should specify a full path")
+	if strings.Contains(path, "/") {
+		log.Fatal("Mount points should not contain a slash ('/').")
 	}
+	path = "/" + path
 
 	// Check if the mount already exists.
 	client := getClient()
diff --git a/etcd_client.go b/etcd_client.go
index 5d62395c..09b9d2aa 100644
--- a/etcd_client.go
+++ b/etcd_client.go
@@ -59,7 +59,7 @@ func NewEtcdClient() *etcd.Client {
 	c := etcd.NewClient(machines)
 	if proto == "https" {
 		c.SetScheme(etcd.HTTPS)
-		if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
+		if err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
 			log.Fatal("Error setting up SSL for etcd client: %s", err)
 		}
 	}
diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go
index 96015f6c..93eb2f1a 100644
--- a/masterelection/masterelection.go
+++ b/masterelection/masterelection.go
@@ -23,8 +23,8 @@ func stateToString(state int) string {
 }
 
 type MasterElection struct {
-	client *etcd.Client
-	stop chan bool
+	client  *etcd.Client
+	stop    chan bool
 	stopped bool
 
 	Addr string
@@ -55,11 +55,11 @@ func (m *MasterElection) IsMaster() bool {
 }
 
 func (m *MasterElection) GetMasterAddr() string {
-	responses, err := m.client.Get(m.Path)
-	if err != nil || len(responses) != 1 {
+	response, err := m.client.Get(m.Path, false)
+	if err != nil {
 		return ""
 	}
-	return responses[0].Value
+	return response.Value
 }
 
 func (m *MasterElection) setState(state int) {
@@ -87,42 +87,113 @@ func (m *MasterElection) stopper() {
 
 	// Remove the lock file if we are the master.
 	if m.State == STATE_MASTER {
+		log.Printf("releasing masterelection lock")
 		m.client.Delete(m.Path)
 	}
 }
 
+func boolTimer(delay time.Duration) chan bool {
+	ch := make(chan bool)
+	go func() {
+		time.Sleep(delay)
+		close(ch)
+	}()
+	return ch
+}
+
+func (m *MasterElection) loop() {
+
+}
+
+func (m *MasterElection) runMaster(index uint64) {
+	m.setState(STATE_MASTER)
+
+	// If we renew the lease every TTL / N, we allow N renewal
+	// errors before we stop believing being the master.
+	ttl := time.Second * time.Duration(m.TTL)
+	tick := time.NewTicker(ttl / 3)
+	lastUpdate := time.Now()
+
+	for {
+		select {
+		case t := <- tick.C:
+			// To verify that we actually are still the
+			// master (not just we believe we are), try
+			// yet another compare-and-swap to check that
+			// the stored master address is still our own,
+			// and no-one stole our lock. If not, the TTL
+			// will be updated (and the lock renewed).
+			response, err := m.client.CompareAndSwap(m.Path, m.Addr, m.TTL, m.Addr, index)
+			if err != nil {
+				log.Printf("error updating lock: %s", err)
+
+				// If we can't renew the lock for a
+				// TTL, we must assume we lost it.
+				if t.Sub(lastUpdate) > ttl {
+					log.Printf("too many errors, lost lock")
+					return
+				}
+			}
+			index = response.ModifiedIndex
+			lastUpdate = t
+		case <-m.stop:
+			return
+		}
+	}
+}
+
+func (m *MasterElection) runSlave(index uint64) {
+	m.setState(STATE_SLAVE)
+
+	for {
+		// Start a watch on the lock, waiting for its removal.
+		response, err := m.client.Watch(m.Path, index+1, nil, m.stop)
+		if err != nil {
+			log.Printf("slave Watch() error: %+v", err)
+			return
+		}
+
+		if response.Action == "delete" || response.Action == "expire" {
+			return
+		}
+
+		index = response.ModifiedIndex
+	}
+
+}
+
 func (m *MasterElection) Run() {
 	go m.stopper()
 
 	// Start as a slave.
 	m.setState(STATE_SLAVE)
 
-	halfttl := time.Second * time.Duration(m.TTL / 2)
+	var watchIndex uint64
 
 	for !m.stopped {
+
+		// Since a failed Create does not return the
+		// RAFT index, let's optimistically query the lock
+		// before starting just to set a baseline for the
+		// index.
+		if iresponse, err := m.client.Get(m.Path, false); err == nil {
+			log.Printf("lock already exists: %+v", iresponse)
+			watchIndex = iresponse.ModifiedIndex
+		}
+
 		// Try to acquire the lock. If we are currently the
 		// master, the previous value should be our own
 		// address, otherwise it should be unset.
-		prevValue := ""
-		if m.State == STATE_MASTER {
-			prevValue = m.Addr
-		}
-		if _, ok, _ := m.client.TestAndSet(m.Path, prevValue, m.Addr, m.TTL); ok {
+		response, err := m.client.Create(m.Path, m.Addr, m.TTL)
+
+		if err == nil {
 			// Howdy, we're the master now. Wait a while
 			// and renew our TTL.
-			m.setState(STATE_MASTER)
-			time.Sleep(halfttl)
+			m.runMaster(response.ModifiedIndex)
 		} else {
-			// We're not the master. We could wait for a
-			// DELETE but I'm not sure if that's what you
-			// get on expiry, so we just wait for the
-			// first event which will be another SET from
-			// the current master. Oh well.
-			m.setState(STATE_SLAVE)
-			_, err := m.client.Watch(m.Path, 0, nil, nil)
-			if err != nil {
-				log.Printf("%s: watch error: %s", m.Path, err)
-			}
+			// We're not the master. Wait until the lock
+			// is deleted or expires.
+			m.runSlave(watchIndex)
 		}
 	}
 }
diff --git a/node/node.go b/node/node.go
index a4825a3b..351f1520 100644
--- a/node/node.go
+++ b/node/node.go
@@ -86,7 +86,7 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool,
 	return &ConfigSyncer{
 		client: client,
 		config: config,
-		rch:    make(chan *etcd.Response, 10),
+		rch:    make(chan *etcd.Response),
 		upch:   upch,
 		stop:   stop,
 	}
@@ -97,15 +97,13 @@ func (w *ConfigSyncer) syncer() {
 	for {
 		select {
 		case response := <-w.rch:
-
-			switch response.Action {
-			case "DELETE":
+			if response.Action == "delete" {
 				mountName := keyToMount(response.Key)
 				log.Printf("deleted mount %s", mountName)
 				w.config.delMount(mountName)
-			case "SET":
-				w.updateConfigWithResponse(response)
-			default:
+			} else if response.Action == "set" || response.Action == "create" || response.Action == "update" {
+				w.updateConfigWithResponse(response.Key, response.Value)
+			} else {
 				continue
 			}
 
@@ -113,7 +111,7 @@ func (w *ConfigSyncer) syncer() {
 			// the Watcher dies, it knows where to start
 			// from and we do not have to download the
 			// full configuration again.
-			w.index = response.Index
+			w.index = response.ModifiedIndex
 
 			// Trigger an update.
 			trigger(w.upch)
@@ -124,12 +122,12 @@ func (w *ConfigSyncer) syncer() {
 	}
 }
 
-func (w *ConfigSyncer) updateConfigWithResponse(response *etcd.Response) {
-	mountName := keyToMount(response.Key)
-	log.Printf("updating mount %s: %+v", mountName, response)
+func (w *ConfigSyncer) updateConfigWithResponse(key, value string) {
+	mountName := keyToMount(key)
+	log.Printf("updating mount %s: %s", mountName, value)
 	var m radioai.Mount
-	if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
-		log.Printf("corrupted data: %s: %s", response.Value, err)
+	if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
+		log.Printf("corrupted data: %s: %s", value, err)
 	} else {
 		w.config.setMount(&m)
 	}
@@ -142,14 +140,13 @@ func (w *ConfigSyncer) Run() {
 	// Run until the first successful Get().
 	log.Printf("attempting to retrieve initial config...")
 	for {
-		responses, err := w.client.Get(radioai.MountPrefix)
-		log.Printf("Get(): %+v", responses)
-		if err == nil {
+		response, err := w.client.Get(radioai.MountPrefix, false)
+		if err == nil && response.Dir {
 			// Directly update the configuration.
-			for _, r := range responses {
-				w.updateConfigWithResponse(r)
-				w.index = r.Index
+			for _, r := range response.Kvs {
+				w.updateConfigWithResponse(r.Key, r.Value)
 			}
+			w.index = response.ModifiedIndex
 			break
 		}
 		log.Printf("Get error: %s", err)
@@ -207,8 +204,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
 	// MasterElection changes trigger an update.
 	mech := make(chan int)
 	go func() {
-		for state := range mech {
-			log.Printf("master election status changed: %d", state)
+		for _ = range mech {
 			trigger(upch)
 		}
 	}()
@@ -286,4 +282,7 @@ func (rc *RadioNode) Run() {
 // Stop everything.
 func (rc *RadioNode) Stop() {
 	close(rc.stop)
+
+	// We should use WaitGroups here. Instead, wait 2 seconds.
+	time.Sleep(2 * time.Second)
 }
-- 
GitLab