From 226cfc8caa1f1fadece9a04ebd101e15cd0e7c9f Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sat, 11 Oct 2014 11:51:22 +0100 Subject: [PATCH] prevent panic on closed Watch channel go-etcd now closes the channel passed to Watch() on error, so we need to restart the syncer every time and use a temp channel. --- node/node.go | 61 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/node/node.go b/node/node.go index 483b8758..5dbc2545 100644 --- a/node/node.go +++ b/node/node.go @@ -85,7 +85,6 @@ func (c *ClusterConfig) delMount(name string) { type ConfigSyncer struct { client *etcd.Client config *ClusterConfig - rch chan *etcd.Response upch chan bool stop chan bool index uint64 @@ -95,17 +94,30 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, return &ConfigSyncer{ client: client, config: config, - rch: make(chan *etcd.Response), upch: upch, stop: stop, } } -func (w *ConfigSyncer) syncer() { - log.Printf("starting syncer") +func (w *ConfigSyncer) setIndex(index uint64) { + w.index = index + configIndex.Set(int64(index)) +} + +func (w *ConfigSyncer) syncer(ch chan *etcd.Response) { for { select { - case response := <-w.rch: + case response, ok := <-ch: + if !ok { + return + } + + // Update the 'last seen' index, so that if + // the Watcher dies, it knows where to start + // from and we do not have to download the + // full configuration again. Do this even for + // actions we don't care about, just in case. + w.setIndex(response.EtcdIndex) if response.Action == "delete" { mountName := keyToMount(response.Node.Key) @@ -117,13 +129,6 @@ func (w *ConfigSyncer) syncer() { continue } - // Update the 'last seen' index, so that if - // the Watcher dies, it knows where to start - // from and we do not have to download the - // full configuration again. - w.index = response.EtcdIndex - configIndex.Set(int64(w.index)) - // Trigger an update. trigger(w.upch) @@ -144,12 +149,8 @@ func (w *ConfigSyncer) updateConfigWithResponse(key, value string) { } } -// Start the ConfigSyncer in the background. It will wait for -// initialization to complete, so that when this function returns, the -// in-memory configuration has already been fully synchronized. -func (w *ConfigSyncer) Start() { - // Run until the first successful Get(). - log.Printf("attempting to retrieve initial config...") +// Load full configuration from etcd. This will trigger the update channel. +func (w *ConfigSyncer) loadFullConfig() { for { response, err := w.client.Get(autoradio.MountPrefix, false, false) if err == nil && response.Node != nil && response.Node.Dir { @@ -157,7 +158,7 @@ func (w *ConfigSyncer) Start() { for _, n := range response.Node.Nodes { w.updateConfigWithResponse(n.Key, n.Value) } - w.index = response.EtcdIndex + w.setIndex(response.EtcdIndex) break } log.Printf("Get error: %s", err) @@ -173,18 +174,32 @@ func (w *ConfigSyncer) Start() { // Update the icecast daemon now that we have a full config. log.Printf("triggering initial reload") trigger(w.upch) +} + +// Start the ConfigSyncer in the background. It will wait for +// initialization to complete, so that when this function returns, the +// in-memory configuration has already been fully synchronized. +func (w *ConfigSyncer) Start() { + // Run until the first successful Get(). + log.Printf("attempting to retrieve initial config") + w.loadFullConfig() - // Now start the watcher, and the syncer. - go w.syncer() + // Main watch loop. Remember that etcd.Watch() will close the + // receiver channel when it returns, so we need to start a new + // syncer every time. + log.Printf("starting config syncer") go func() { for { + ch := make(chan *etcd.Response) + go w.syncer(ch) + curIndex := w.index + 1 log.Printf("starting watcher at index %d", curIndex) - _, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, w.rch, w.stop) + _, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, ch, w.stop) if err == etcd.ErrWatchStoppedByUser { return } else if err != nil { - // Log the error, but keep watching. + // Log the error and start over. log.Printf("Watch(): %s", err) } } -- GitLab