diff --git a/node/node.go b/node/node.go index 0faa5493e4e319ab6afd0b522adabb5849dfb4a9..a4825a3b0164605e0f8bb3c157fef6790edcda24 100644 --- a/node/node.go +++ b/node/node.go @@ -19,6 +19,12 @@ func trigger(c chan bool) { } } +// Remove mountPrefix from the beginning of the path, but keep the +// leading slash. +func keyToMount(key string) string { + return key[len(radioai.MountPrefix)-1:] +} + // In-memory representation of the overall configuration (basically // just a list of the known mounts). type ClusterConfig struct { @@ -80,33 +86,25 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, return &ConfigSyncer{ client: client, config: config, - rch: make(chan *etcd.Response, 100), + rch: make(chan *etcd.Response, 10), upch: upch, stop: stop, } } func (w *ConfigSyncer) syncer() { + log.Printf("starting syncer") for { select { case response := <-w.rch: - // Remove mountPrefix from the beginning of - // the path, but keep the leading slash. - mountName := response.Key[len(radioai.MountPrefix)-1:] switch response.Action { case "DELETE": + mountName := keyToMount(response.Key) log.Printf("deleted mount %s", mountName) w.config.delMount(mountName) case "SET": - log.Printf("update to mount %s: %+v", mountName, response) - var m radioai.Mount - if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { - log.Printf("corrupted data: %s", err) - continue - } else { - w.config.setMount(&m) - } + w.updateConfigWithResponse(response) default: continue } @@ -126,20 +124,31 @@ func (w *ConfigSyncer) syncer() { } } +func (w *ConfigSyncer) updateConfigWithResponse(response *etcd.Response) { + mountName := keyToMount(response.Key) + log.Printf("updating mount %s: %+v", mountName, response) + var m radioai.Mount + if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { + log.Printf("corrupted data: %s: %s", response.Value, err) + } else { + w.config.setMount(&m) + } +} + // Run the ConfigSyncer in the background. It will wait for // initialization to complete, so that when this function returns, the // in-memory configuration has already been fully synchronized. func (w *ConfigSyncer) Run() { - go w.syncer() - // Run until the first successful Get(). + log.Printf("attempting to retrieve initial config...") for { responses, err := w.client.Get(radioai.MountPrefix) + log.Printf("Get(): %+v", responses) if err == nil { - // Inject all the replies into the channel. + // Directly update the configuration. for _, r := range responses { + w.updateConfigWithResponse(r) w.index = r.Index - w.rch <- r } break } @@ -154,9 +163,11 @@ func (w *ConfigSyncer) Run() { } // Update the icecast daemon now that we have a full config. + log.Printf("triggering initial reload") trigger(w.upch) - // Now start the watcher. + // Now start the watcher, and the syncer. + go w.syncer() go func() { for { curIndex := w.index + 1 @@ -173,7 +184,7 @@ func (w *ConfigSyncer) Run() { // An active streaming node, managing the local icecast server. type RadioNode struct { - Config *ClusterConfig + Config *ClusterConfig ip string client *etcd.Client @@ -206,7 +217,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { stopch := make(chan bool) return &RadioNode{ - Config: config, + Config: config, ip: ip, client: client, me: masterelection.NewMasterElection( @@ -227,12 +238,12 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { // The presence goroutine continuously updates our entry in the list // of nodes. func (rc *RadioNode) presence() { - ticker := time.NewTicker(time.Duration(rc.livenessTtl / 2) * time.Second) + ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second) for { select { case <-ticker.C: - if _, err := rc.client.Set(radioai.NodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { + if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } case <-rc.stop: @@ -258,7 +269,7 @@ func (rc *RadioNode) Run() { for { select { case <-rc.upch: - if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil { + if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil { log.Printf("Update(): %s", err) }