Commit a0109ddb authored by ale

Merge branch 'master' into debian

parents 3118910d f1c2310a
@@ -118,10 +118,7 @@ func createMount(args []string) {
 	// Check if the mount already exists.
 	client := getClient()
-	oldm, err := client.GetMount(path)
-	if err != nil {
-		log.Fatal(err)
-	}
+	oldm, _ := client.GetMount(path)
 	if oldm != nil {
 		log.Fatal("A mount with that name already exists!")
 	}
...
@@ -11,7 +11,7 @@ import (
 )
 
 var (
-	etcdMachines = flag.String("etcd-servers", "localhost:7800", "Etcd servers (comma-separated list)")
+	etcdMachines = flag.String("etcd-servers", "localhost:4001", "Etcd servers (comma-separated list)")
 	etcdCertFile = flag.String("etcd-cert", "", "SSL certificate for etcd client")
 	etcdKeyFile  = flag.String("etcd-key", "", "SSL private key for etcd client")
 )
@@ -24,7 +24,8 @@ func loadFile(path string) string {
 	return string(data)
 }
 
-func resolveAll(input []string) []string {
+// Resolve a list of etcd host:port specs, returning URLs.
+func resolveAll(input []string, proto string) []string {
 	result := make([]string, 0)
 	for _, hostport := range input {
 		host, port, err := net.SplitHostPort(hostport)
@@ -36,19 +37,27 @@ func resolveAll(input []string) []string {
 			log.Fatal("Error resolving etcd server spec '%s': %s", hostport, err)
 		}
 		for _, a := range addrs {
-			result = append(result, net.JoinHostPort(a, port))
+			url := proto + "://" + net.JoinHostPort(a, port)
+			result = append(result, url)
 		}
 	}
 	return result
 }
 
 func NewEtcdClient() *etcd.Client {
-	machines := resolveAll(strings.Split(*etcdMachines, ","))
+	proto := "http"
+	if *etcdCertFile != "" && *etcdKeyFile != "" {
+		proto = "https"
+	}
+	machines := resolveAll(strings.Split(*etcdMachines, ","), proto)
 	if len(machines) == 0 {
 		log.Fatal("No etcd servers specified!")
 	}
+	log.Printf("etcd servers: %+v", machines)
 	c := etcd.NewClient(machines)
-	if *etcdCertFile != "" && *etcdKeyFile != "" {
+	if proto == "https" {
 		c.SetScheme(etcd.HTTPS)
 		if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil {
 			log.Fatal("Error setting up SSL for etcd client: %s", err)
...
@@ -72,6 +72,7 @@ type ConfigSyncer struct {
 	rch   chan *etcd.Response
 	upch  chan bool
 	stop  chan bool
+	index uint64
 }
 
 func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, stop chan bool) *ConfigSyncer {
@@ -92,18 +93,24 @@ func (w *ConfigSyncer) syncer() {
 			// the path, but keep the leading slash.
 			mountName := response.Key[len(mountPrefix)-1:]
-			if response.Action == "DELETE" {
-				log.Printf("deleted mount '%s'", mountName)
+			switch response.Action {
+			case "DELETE":
+				log.Printf("deleted mount %s", mountName)
 				w.config.delMount(mountName)
-			} else {
-				log.Printf("update to mount '%s'", mountName)
+			case "SET":
+				log.Printf("update to mount %s: %+v", mountName, response)
 				var m Mount
 				if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil {
 					log.Printf("corrupted data: %s", err)
+					continue
 				} else {
 					w.config.setMount(&m)
 				}
+			default:
+				continue
 			}
+			w.index = response.Index
 			trigger(w.upch)
 		case <-w.stop:
@@ -119,26 +126,42 @@ func (w *ConfigSyncer) Run() {
 	go w.syncer()
 
 	// Run until the first successful Get().
-	var index uint64
 	for {
 		responses, err := w.client.Get(mountPrefix)
 		if err == nil {
 			// Inject all the replies into the channel.
 			for _, r := range responses {
-				index = r.Index
+				w.index = r.Index
 				w.rch <- r
 			}
 			break
 		}
 		log.Printf("Get error: %s", err)
-		time.Sleep(1 * time.Second)
+
+		// Wait 1 second, but watch the stop channel.
+		select {
+		case <-time.After(1 * time.Second):
+		case <-w.stop:
+			return
+		}
 	}
 
+	// Update the icecast daemon now that we have a full config.
+	trigger(w.upch)
+
 	// Now start the watcher.
-	_, err := w.client.Watch(mountPrefix, index, w.rch, nil)
-	if err != nil {
-		log.Printf("Watch error: %s", err)
-	}
+	go func() {
+		for {
+			curIndex := w.index + 1
+			log.Printf("starting watcher at index %d", curIndex)
+			_, err := w.client.Watch(mountPrefix, curIndex, w.rch, w.stop)
+			if err == etcd.ErrWatchStoppedByUser {
+				return
+			} else if err != nil {
+				log.Printf("Watch(): %s", err)
+			}
+		}
+	}()
 }
// An active streaming node, managing the local icecast server. // An active streaming node, managing the local icecast server.
@@ -203,7 +226,7 @@ func (rc *RadioNode) presence() {
 		select {
 		case <-ticker.C:
 			if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil {
-				log.Printf("Set() error: %s", err)
+				log.Printf("presence: Set(): %s", err)
 			}
 		case <-rc.stop:
 			return
@@ -221,11 +244,12 @@ func (rc *RadioNode) Run() {
 	// Start the presence heartbeat.
 	go rc.presence()
 
+	log.Printf("starting icecast updater")
 	for {
 		select {
 		case <-rc.upch:
 			if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil {
-				log.Printf("Update() failed: %s", err)
+				log.Printf("Update(): %s", err)
 			}
 
 			// Do not reload icecast more often than once
...