Skip to content
Snippets Groups Projects
Commit e0e4d90d authored by ale's avatar ale
Browse files

properly initialize config pre-sync

parent 93ceaebe
Branches
Tags
No related merge requests found
...@@ -19,6 +19,12 @@ func trigger(c chan bool) { ...@@ -19,6 +19,12 @@ func trigger(c chan bool) {
} }
} }
// Remove mountPrefix from the beginning of the path, but keep the
// leading slash.
func keyToMount(key string) string {
return key[len(radioai.MountPrefix)-1:]
}
// In-memory representation of the overall configuration (basically // In-memory representation of the overall configuration (basically
// just a list of the known mounts). // just a list of the known mounts).
type ClusterConfig struct { type ClusterConfig struct {
...@@ -80,33 +86,25 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, ...@@ -80,33 +86,25 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool,
return &ConfigSyncer{ return &ConfigSyncer{
client: client, client: client,
config: config, config: config,
rch: make(chan *etcd.Response, 100), rch: make(chan *etcd.Response, 10),
upch: upch, upch: upch,
stop: stop, stop: stop,
} }
} }
func (w *ConfigSyncer) syncer() { func (w *ConfigSyncer) syncer() {
log.Printf("starting syncer")
for { for {
select { select {
case response := <-w.rch: case response := <-w.rch:
// Remove mountPrefix from the beginning of
// the path, but keep the leading slash.
mountName := response.Key[len(radioai.MountPrefix)-1:]
switch response.Action { switch response.Action {
case "DELETE": case "DELETE":
mountName := keyToMount(response.Key)
log.Printf("deleted mount %s", mountName) log.Printf("deleted mount %s", mountName)
w.config.delMount(mountName) w.config.delMount(mountName)
case "SET": case "SET":
log.Printf("update to mount %s: %+v", mountName, response) w.updateConfigWithResponse(response)
var m radioai.Mount
if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
log.Printf("corrupted data: %s", err)
continue
} else {
w.config.setMount(&m)
}
default: default:
continue continue
} }
...@@ -126,20 +124,31 @@ func (w *ConfigSyncer) syncer() { ...@@ -126,20 +124,31 @@ func (w *ConfigSyncer) syncer() {
} }
} }
func (w *ConfigSyncer) updateConfigWithResponse(response *etcd.Response) {
mountName := keyToMount(response.Key)
log.Printf("updating mount %s: %+v", mountName, response)
var m radioai.Mount
if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
log.Printf("corrupted data: %s: %s", response.Value, err)
} else {
w.config.setMount(&m)
}
}
// Run the ConfigSyncer in the background. It will wait for // Run the ConfigSyncer in the background. It will wait for
// initialization to complete, so that when this function returns, the // initialization to complete, so that when this function returns, the
// in-memory configuration has already been fully synchronized. // in-memory configuration has already been fully synchronized.
func (w *ConfigSyncer) Run() { func (w *ConfigSyncer) Run() {
go w.syncer()
// Run until the first successful Get(). // Run until the first successful Get().
log.Printf("attempting to retrieve initial config...")
for { for {
responses, err := w.client.Get(radioai.MountPrefix) responses, err := w.client.Get(radioai.MountPrefix)
log.Printf("Get(): %+v", responses)
if err == nil { if err == nil {
// Inject all the replies into the channel. // Directly update the configuration.
for _, r := range responses { for _, r := range responses {
w.updateConfigWithResponse(r)
w.index = r.Index w.index = r.Index
w.rch <- r
} }
break break
} }
...@@ -154,9 +163,11 @@ func (w *ConfigSyncer) Run() { ...@@ -154,9 +163,11 @@ func (w *ConfigSyncer) Run() {
} }
// Update the icecast daemon now that we have a full config. // Update the icecast daemon now that we have a full config.
log.Printf("triggering initial reload")
trigger(w.upch) trigger(w.upch)
// Now start the watcher. // Now start the watcher, and the syncer.
go w.syncer()
go func() { go func() {
for { for {
curIndex := w.index + 1 curIndex := w.index + 1
...@@ -173,7 +184,7 @@ func (w *ConfigSyncer) Run() { ...@@ -173,7 +184,7 @@ func (w *ConfigSyncer) Run() {
// An active streaming node, managing the local icecast server. // An active streaming node, managing the local icecast server.
type RadioNode struct { type RadioNode struct {
Config *ClusterConfig Config *ClusterConfig
ip string ip string
client *etcd.Client client *etcd.Client
...@@ -206,7 +217,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { ...@@ -206,7 +217,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
stopch := make(chan bool) stopch := make(chan bool)
return &RadioNode{ return &RadioNode{
Config: config, Config: config,
ip: ip, ip: ip,
client: client, client: client,
me: masterelection.NewMasterElection( me: masterelection.NewMasterElection(
...@@ -227,12 +238,12 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { ...@@ -227,12 +238,12 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
// The presence goroutine continuously updates our entry in the list // The presence goroutine continuously updates our entry in the list
// of nodes. // of nodes.
func (rc *RadioNode) presence() { func (rc *RadioNode) presence() {
ticker := time.NewTicker(time.Duration(rc.livenessTtl / 2) * time.Second) ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if _, err := rc.client.Set(radioai.NodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil {
log.Printf("presence: Set(): %s", err) log.Printf("presence: Set(): %s", err)
} }
case <-rc.stop: case <-rc.stop:
...@@ -258,7 +269,7 @@ func (rc *RadioNode) Run() { ...@@ -258,7 +269,7 @@ func (rc *RadioNode) Run() {
for { for {
select { select {
case <-rc.upch: case <-rc.upch:
if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil { if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil {
log.Printf("Update(): %s", err) log.Printf("Update(): %s", err)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment