diff --git a/node.go b/node.go index d8d391e1907f896ec512e89d51e3f4041120a913..9bfbda122f8475803d08ed00382a7c202cbb0110 100644 --- a/node.go +++ b/node.go @@ -72,6 +72,7 @@ type ConfigSyncer struct { rch chan *etcd.Response upch chan bool stop chan bool + index uint64 } func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer { @@ -92,18 +93,24 @@ func (w *ConfigSyncer) syncer() { // the path, but keep the leading slash. mountName := response.Key[len(mountPrefix)-1:] - if response.Action == "DELETE" { - log.Printf("deleted mount '%s'", mountName) + switch response.Action { + case "DELETE": + log.Printf("deleted mount %s", mountName) w.config.delMount(mountName) - } else { - log.Printf("update to mount '%s'", mountName) + case "SET": + log.Printf("update to mount %s: %+v", mountName, response) var m Mount if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { log.Printf("corrupted data: %s", err) + continue } else { w.config.setMount(&m) } + default: + continue } + + w.index = response.Index trigger(w.upch) case <-w.stop: @@ -119,26 +126,42 @@ func (w *ConfigSyncer) Run() { go w.syncer() // Run until the first successful Get(). - var index uint64 for { responses, err := w.client.Get(mountPrefix) if err == nil { // Inject all the replies into the channel. for _, r := range responses { - index = r.Index + w.index = r.Index w.rch <- r } break } log.Printf("Get error: %s", err) - time.Sleep(1 * time.Second) + + // Wait 1 second, but watch the stop channel. + select { + case <-time.After(1 * time.Second): + case <-w.stop: + return + } } + // Update the icecast daemon now that we have a full config. + trigger(w.upch) + // Now start the watcher. - _, err := w.client.Watch(mountPrefix, index, w.rch, nil) - if err != nil { - log.Printf("Watch error: %s", err) - } + go func() { + for { + curIndex := w.index + 1 + log.Printf("starting watcher at index %d", curIndex) + _, err := w.client.Watch(mountPrefix, curIndex, w.rch, w.stop) + if err == etcd.ErrWatchStoppedByUser { + return + } else if err != nil { + log.Printf("Watch(): %s", err) + } + } + }() } // An active streaming node, managing the local icecast server. @@ -203,7 +226,7 @@ func (rc *RadioNode) presence() { select { case <-ticker.C: if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { - log.Printf("Set() error: %s", err) + log.Printf("presence: Set(): %s", err) } case <-rc.stop: return @@ -221,11 +244,12 @@ func (rc *RadioNode) Run() { // Start the presence heartbeat. go rc.presence() + log.Printf("starting icecast updater") for { select { case <-rc.upch: if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil { - log.Printf("Update() failed: %s", err) + log.Printf("Update(): %s", err) } // Do not reload icecast more often than once