Skip to content
Snippets Groups Projects
Commit 226cfc8c authored by ale's avatar ale
Browse files

prevent panic on closed Watch channel

go-etcd now closes the channel passed to Watch() on error, so we need
to restart the syncer every time and use a temp channel.
parent 7bb74cd4
No related branches found
No related tags found
No related merge requests found
...@@ -85,7 +85,6 @@ func (c *ClusterConfig) delMount(name string) { ...@@ -85,7 +85,6 @@ func (c *ClusterConfig) delMount(name string) {
type ConfigSyncer struct { type ConfigSyncer struct {
client *etcd.Client client *etcd.Client
config *ClusterConfig config *ClusterConfig
rch chan *etcd.Response
upch chan bool upch chan bool
stop chan bool stop chan bool
index uint64 index uint64
...@@ -95,17 +94,30 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, ...@@ -95,17 +94,30 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool,
return &ConfigSyncer{ return &ConfigSyncer{
client: client, client: client,
config: config, config: config,
rch: make(chan *etcd.Response),
upch: upch, upch: upch,
stop: stop, stop: stop,
} }
} }
func (w *ConfigSyncer) syncer() { func (w *ConfigSyncer) setIndex(index uint64) {
log.Printf("starting syncer") w.index = index
configIndex.Set(int64(index))
}
func (w *ConfigSyncer) syncer(ch chan *etcd.Response) {
for { for {
select { select {
case response := <-w.rch: case response, ok := <-ch:
if !ok {
return
}
// Update the 'last seen' index, so that if
// the Watcher dies, it knows where to start
// from and we do not have to download the
// full configuration again. Do this even for
// actions we don't care about, just in case.
w.setIndex(response.EtcdIndex)
if response.Action == "delete" { if response.Action == "delete" {
mountName := keyToMount(response.Node.Key) mountName := keyToMount(response.Node.Key)
...@@ -117,13 +129,6 @@ func (w *ConfigSyncer) syncer() { ...@@ -117,13 +129,6 @@ func (w *ConfigSyncer) syncer() {
continue continue
} }
// Update the 'last seen' index, so that if
// the Watcher dies, it knows where to start
// from and we do not have to download the
// full configuration again.
w.index = response.EtcdIndex
configIndex.Set(int64(w.index))
// Trigger an update. // Trigger an update.
trigger(w.upch) trigger(w.upch)
...@@ -144,12 +149,8 @@ func (w *ConfigSyncer) updateConfigWithResponse(key, value string) { ...@@ -144,12 +149,8 @@ func (w *ConfigSyncer) updateConfigWithResponse(key, value string) {
} }
} }
// Start the ConfigSyncer in the background. It will wait for // Load full configuration from etcd. This will trigger the update channel.
// initialization to complete, so that when this function returns, the func (w *ConfigSyncer) loadFullConfig() {
// in-memory configuration has already been fully synchronized.
func (w *ConfigSyncer) Start() {
// Run until the first successful Get().
log.Printf("attempting to retrieve initial config...")
for { for {
response, err := w.client.Get(autoradio.MountPrefix, false, false) response, err := w.client.Get(autoradio.MountPrefix, false, false)
if err == nil && response.Node != nil && response.Node.Dir { if err == nil && response.Node != nil && response.Node.Dir {
...@@ -157,7 +158,7 @@ func (w *ConfigSyncer) Start() { ...@@ -157,7 +158,7 @@ func (w *ConfigSyncer) Start() {
for _, n := range response.Node.Nodes { for _, n := range response.Node.Nodes {
w.updateConfigWithResponse(n.Key, n.Value) w.updateConfigWithResponse(n.Key, n.Value)
} }
w.index = response.EtcdIndex w.setIndex(response.EtcdIndex)
break break
} }
log.Printf("Get error: %s", err) log.Printf("Get error: %s", err)
...@@ -173,18 +174,32 @@ func (w *ConfigSyncer) Start() { ...@@ -173,18 +174,32 @@ func (w *ConfigSyncer) Start() {
// Update the icecast daemon now that we have a full config. // Update the icecast daemon now that we have a full config.
log.Printf("triggering initial reload") log.Printf("triggering initial reload")
trigger(w.upch) trigger(w.upch)
}
// Start the ConfigSyncer in the background. It will wait for
// initialization to complete, so that when this function returns, the
// in-memory configuration has already been fully synchronized.
func (w *ConfigSyncer) Start() {
// Run until the first successful Get().
log.Printf("attempting to retrieve initial config")
w.loadFullConfig()
// Now start the watcher, and the syncer. // Main watch loop. Remember that etcd.Watch() will close the
go w.syncer() // receiver channel when it returns, so we need to start a new
// syncer every time.
log.Printf("starting config syncer")
go func() { go func() {
for { for {
ch := make(chan *etcd.Response)
go w.syncer(ch)
curIndex := w.index + 1 curIndex := w.index + 1
log.Printf("starting watcher at index %d", curIndex) log.Printf("starting watcher at index %d", curIndex)
_, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, w.rch, w.stop) _, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, ch, w.stop)
if err == etcd.ErrWatchStoppedByUser { if err == etcd.ErrWatchStoppedByUser {
return return
} else if err != nil { } else if err != nil {
// Log the error, but keep watching. // Log the error and start over.
log.Printf("Watch(): %s", err) log.Printf("Watch(): %s", err)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment