diff --git a/cmd/radioctl/radioctl.go b/cmd/radioctl/radioctl.go index a7eb83a093bcb68ccd754bef85cf38a81f1e5910..c0310824c4ed3687663bc0840615de54f23fcd4b 100644 --- a/cmd/radioctl/radioctl.go +++ b/cmd/radioctl/radioctl.go @@ -118,10 +118,7 @@ func createMount(args []string) { // Check if the mount already exists. client := getClient() - oldm, err := client.GetMount(path) - if err != nil { - log.Fatal(err) - } + oldm, _ := client.GetMount(path) if oldm != nil { log.Fatal("A mount with that name already exists!") } diff --git a/etcd_client.go b/etcd_client.go index 31a4b04776aad49322ff1dd4c899a781b887f661..a6251e7c8edec1def38f9ad4d75f87c350bc8380 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -11,7 +11,7 @@ import ( ) var ( - etcdMachines = flag.String("etcd-servers", "localhost:7800", "Etcd servers (comma-separated list)") + etcdMachines = flag.String("etcd-servers", "localhost:4001", "Etcd servers (comma-separated list)") etcdCertFile = flag.String("etcd-cert", "", "SSL certificate for etcd client") etcdKeyFile = flag.String("etcd-key", "", "SSL private key for etcd client") ) @@ -24,7 +24,8 @@ func loadFile(path string) string { return string(data) } -func resolveAll(input []string) []string { +// Resolve a list of etcd host:port specs, returning URLs. +func resolveAll(input []string, proto string) []string { result := make([]string, 0) for _, hostport := range input { host, port, err := net.SplitHostPort(hostport) @@ -36,19 +37,27 @@ func resolveAll(input []string) []string { log.Fatal("Error resolving etcd server spec '%s': %s", hostport, err) } for _, a := range addrs { - result = append(result, net.JoinHostPort(a, port)) + url := proto + "://" + net.JoinHostPort(a, port) + result = append(result, url) } } return result } func NewEtcdClient() *etcd.Client { - machines := resolveAll(strings.Split(*etcdMachines, ",")) + proto := "http" + if *etcdCertFile != "" && *etcdKeyFile != "" { + proto = "https" + } + + machines := resolveAll(strings.Split(*etcdMachines, ","), proto) if len(machines) == 0 { log.Fatal("No etcd servers specified!") } + log.Printf("etcd servers: %+v", machines) + c := etcd.NewClient(machines) - if *etcdCertFile != "" && *etcdKeyFile != "" { + if proto == "https" { c.SetScheme(etcd.HTTPS) if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil { log.Fatal("Error setting up SSL for etcd client: %s", err) diff --git a/node.go b/node.go index d8d391e1907f896ec512e89d51e3f4041120a913..9bfbda122f8475803d08ed00382a7c202cbb0110 100644 --- a/node.go +++ b/node.go @@ -72,6 +72,7 @@ type ConfigSyncer struct { rch chan *etcd.Response upch chan bool stop chan bool + index uint64 } func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer { @@ -92,18 +93,24 @@ func (w *ConfigSyncer) syncer() { // the path, but keep the leading slash. mountName := response.Key[len(mountPrefix)-1:] - if response.Action == "DELETE" { - log.Printf("deleted mount '%s'", mountName) + switch response.Action { + case "DELETE": + log.Printf("deleted mount %s", mountName) w.config.delMount(mountName) - } else { - log.Printf("update to mount '%s'", mountName) + case "SET": + log.Printf("update to mount %s: %+v", mountName, response) var m Mount if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { log.Printf("corrupted data: %s", err) + continue } else { w.config.setMount(&m) } + default: + continue } + + w.index = response.Index trigger(w.upch) case <-w.stop: @@ -119,26 +126,42 @@ func (w *ConfigSyncer) Run() { go w.syncer() // Run until the first successful Get(). - var index uint64 for { responses, err := w.client.Get(mountPrefix) if err == nil { // Inject all the replies into the channel. for _, r := range responses { - index = r.Index + w.index = r.Index w.rch <- r } break } log.Printf("Get error: %s", err) - time.Sleep(1 * time.Second) + + // Wait 1 second, but watch the stop channel. + select { + case <-time.After(1 * time.Second): + case <-w.stop: + return + } } + // Update the icecast daemon now that we have a full config. + trigger(w.upch) + // Now start the watcher. - _, err := w.client.Watch(mountPrefix, index, w.rch, nil) - if err != nil { - log.Printf("Watch error: %s", err) - } + go func() { + for { + curIndex := w.index + 1 + log.Printf("starting watcher at index %d", curIndex) + _, err := w.client.Watch(mountPrefix, curIndex, w.rch, w.stop) + if err == etcd.ErrWatchStoppedByUser { + return + } else if err != nil { + log.Printf("Watch(): %s", err) + } + } + }() } // An active streaming node, managing the local icecast server. @@ -203,7 +226,7 @@ func (rc *RadioNode) presence() { select { case <-ticker.C: if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { - log.Printf("Set() error: %s", err) + log.Printf("presence: Set(): %s", err) } case <-rc.stop: return @@ -221,11 +244,12 @@ func (rc *RadioNode) Run() { // Start the presence heartbeat. go rc.presence() + log.Printf("starting icecast updater") for { select { case <-rc.upch: if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil { - log.Printf("Update() failed: %s", err) + log.Printf("Update(): %s", err) } // Do not reload icecast more often than once