diff --git a/api.go b/api.go index c5661075e1ee7c2ec05259c3ac2c967309c963ff..ce3e184927e2d35e985dd8d1d38f5081b4f987aa 100644 --- a/api.go +++ b/api.go @@ -14,9 +14,9 @@ import ( ) var ( - masterElectionPath = "/icecast/cluster/master" - mountPrefix = "/icecast/mounts/" - nodePrefix = "/icecast/nodes/" + MasterElectionPath = "/icecast/cluster/master" + MountPrefix = "/icecast/mounts/" + NodePrefix = "/icecast/nodes/" ) // A mountpoint for a stream. @@ -35,7 +35,7 @@ type Mount struct { } func mountPath(mountName string) string { - return mountPrefix + mountName[1:] + return MountPrefix + mountName[1:] } // Cache the list of active nodes. @@ -121,7 +121,7 @@ func (r *RadioAPI) DelMount(mountName string) error { // ListMounts returns a list of all the configured mountpoints. func (r *RadioAPI) ListMounts() ([]*Mount, error) { - response, err := r.client.Get(mountPrefix) + response, err := r.client.Get(MountPrefix) if err != nil { return nil, err } @@ -138,7 +138,7 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) { // GetMasterAddr returns the address of the current master server. func (r *RadioAPI) GetMasterAddr() (string, error) { - response, err := r.client.Get(masterElectionPath) + response, err := r.client.Get(MasterElectionPath) if err != nil { return "", err } @@ -150,7 +150,7 @@ func (r *RadioAPI) GetMasterAddr() (string, error) { // GetNodes returns the list of active cluster nodes. func (r *RadioAPI) doGetNodes() ([]string, error) { - response, err := r.client.Get(nodePrefix) + response, err := r.client.Get(NodePrefix) if err != nil { return nil, err } diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 13d2c1cd6d441f4391b8aad6c96e22f86d6c44d3..30dc0b2fea24df3ea0372031fbff9279c7e63f48 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -8,6 +8,7 @@ import ( "syscall" "git.autistici.org/ale/radioai" + "git.autistici.org/ale/radioai/node" ) var ( @@ -19,16 +20,16 @@ func main() { flag.Parse() client := radioai.NewEtcdClient() - node := radioai.NewRadioNode(*publicIp, client) + n := node.NewRadioNode(*publicIp, client) // Set up a clean shutdown function on SIGTERM. stopch := make(chan os.Signal) go func() { <- stopch log.Printf("terminating...") - node.Stop() + n.Stop() }() signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT) - node.Run() -} \ No newline at end of file + n.Run() +} diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index f910ff89a9668a3e07cfff2cedf888660baf3829..ccec7671798eba0a197a2c2e5723be439d41f5b6 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -4,10 +4,9 @@ import ( "flag" "fmt" "log" - "net/http" - "time" "git.autistici.org/ale/radioai" + "git.autistici.org/ale/radioai/fe" ) var ( @@ -16,6 +15,9 @@ var ( httpPort = flag.Int("http-port", 80, "HTTP port") publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine") + staticDir = flag.String("static-dir", "./static", "Static content directory") + templateDir = flag.String("template-dir", "./templates", "HTML templates directory") + // Default DNS TTL (seconds). dnsTtl = 5 ) @@ -29,18 +31,10 @@ func main() { client := radioai.NewEtcdClient() api := radioai.NewRadioAPI(client) - red := radioai.NewHttpRedirector(api) - dnsRed := radioai.NewDnsRedirector(api, *domain, *publicIp, dnsTtl) + dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) - httpServer := &http.Server{ - Addr: fmt.Sprintf(":%d", *httpPort), - Handler: red, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - } - - log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) - log.Fatal(httpServer.ListenAndServe()) + red := fe.NewHttpRedirector(api, *domain) + red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir) } diff --git a/dns.go b/fe/dns.go similarity index 95% rename from dns.go rename to fe/dns.go index 84c655adddd78f7c4c2cac06b855a114a6e20e1d..e52fa49c74564fe8be013ab7b27daf5bfbd9479d 100644 --- a/dns.go +++ b/fe/dns.go @@ -1,4 +1,4 @@ -package radioai +package fe import ( "fmt" @@ -9,6 +9,7 @@ import ( "time" "github.com/miekg/dns" + "git.autistici.org/ale/radioai" ) var ( @@ -26,7 +27,7 @@ var ( ) type DnsRedirector struct { - client *RadioAPI + client *radioai.RadioAPI origin string originNumParts int publicIp string @@ -34,7 +35,7 @@ type DnsRedirector struct { soa dns.RR } -func NewDnsRedirector(client *RadioAPI, origin, publicIp string, ttl int) *DnsRedirector { +func NewDnsRedirector(client *radioai.RadioAPI, origin, publicIp string, ttl int) *DnsRedirector { if !strings.HasSuffix(origin, ".") { origin += "." } diff --git a/fe/gzip.go b/fe/gzip.go new file mode 100644 index 0000000000000000000000000000000000000000..9513dbb50d05a92cc5d8be72a9a4d3a03e4f7b4a --- /dev/null +++ b/fe/gzip.go @@ -0,0 +1,205 @@ +package fe + +import ( + "compress/gzip" + "io" + "net/http" + "strings" +) + +// Slightly modified by ale@incal.net, based on: +// https://github.com/PuerkitoBio/ghost + +// Thanks to Andrew Gerrand for inspiration: +// https://groups.google.com/d/msg/golang-nuts/eVnTcMwNVjM/4vYU8id9Q2UJ +// +// Also, node's Connect library implementation of the compress middleware: +// https://github.com/senchalabs/connect/blob/master/lib/middleware/compress.js +// +// And StackOverflow's explanation of Vary: Accept-Encoding header: +// http://stackoverflow.com/questions/7848796/what-does-varyaccept-encoding-mean + +// Internal gzipped writer that satisfies both the (body) writer in gzipped format, +// and maintains the rest of the ResponseWriter interface for header manipulation. +type gzipResponseWriter struct { + io.Writer + http.ResponseWriter + r *http.Request // Keep a hold of the Request, for the filter function + filtered bool // Has the request been run through the filter function? + dogzip bool // Should we do GZIP compression for this request? + filterFn func(http.ResponseWriter, *http.Request) bool +} + +// Make sure the filter function is applied. +func (w *gzipResponseWriter) applyFilter() { + if !w.filtered { + if w.dogzip = w.filterFn(w, w.r); w.dogzip { + setGzipHeaders(w.Header()) + } + w.filtered = true + } +} + +// Unambiguous Write() implementation (otherwise both ResponseWriter and Writer +// want to claim this method). +func (w *gzipResponseWriter) Write(b []byte) (int, error) { + w.applyFilter() + if w.dogzip { + // Write compressed + return w.Writer.Write(b) + } + // Write uncompressed + return w.ResponseWriter.Write(b) +} + +// Intercept the WriteHeader call to correctly set the GZIP headers. +func (w *gzipResponseWriter) WriteHeader(code int) { + w.applyFilter() + w.ResponseWriter.WriteHeader(code) +} + +// Implement WrapWriter interface +func (w *gzipResponseWriter) WrappedWriter() http.ResponseWriter { + return w.ResponseWriter +} + +var ( + defaultFilterTypes = [...]string{ + "text/", + "javascript", + "json", + } +) + +// Default filter to check if the response should be GZIPped. +// By default, all text (html, css, xml, ...), javascript and json +// content types are candidates for GZIP. +func defaultFilter(w http.ResponseWriter, r *http.Request) bool { + hdr := w.Header() + for _, tp := range defaultFilterTypes { + ok := headerMatch(hdr, "Content-Type", tp) + if ok { + return true + } + } + return false +} + +// GZIPHandlerFunc is the same as GZIPHandler, it is just a convenience +// signature that accepts a func(http.ResponseWriter, *http.Request) instead of +// a http.Handler interface. It saves the boilerplate http.HandlerFunc() cast. +func GZIPHandlerFunc(h http.HandlerFunc, filterFn func(http.ResponseWriter, *http.Request) bool) http.HandlerFunc { + return GZIPHandler(h, filterFn) +} + +// Gzip compression HTTP handler. If the client supports it, it compresses the response +// written by the wrapped handler. The filter function is called when the response is about +// to be written to determine if compression should be applied. If this argument is nil, +// the default filter will GZIP only content types containing /json|text|javascript/. +func GZIPHandler(h http.Handler, filterFn func(http.ResponseWriter, *http.Request) bool) http.HandlerFunc { + if filterFn == nil { + filterFn = defaultFilter + } + return func(w http.ResponseWriter, r *http.Request) { + if _, ok := getGzipWriter(w); ok { + // Self-awareness, gzip handler is already set up + h.ServeHTTP(w, r) + return + } + hdr := w.Header() + setVaryHeader(hdr) + + // Do nothing on a HEAD request + if r.Method == "HEAD" { + h.ServeHTTP(w, r) + return + } + if !acceptsGzip(r.Header) { + // No gzip support from the client, return uncompressed + h.ServeHTTP(w, r) + return + } + + // Prepare a gzip response container + gz := gzip.NewWriter(w) + gzw := &gzipResponseWriter{ + Writer: gz, + ResponseWriter: w, + r: r, + filterFn: filterFn, + } + h.ServeHTTP(gzw, r) + // Iff the handler completed successfully (no panic) and GZIP was indeed used, close the gzip writer, + // which seems to generate a Write to the underlying writer. + if gzw.dogzip { + gz.Close() + } + } +} + +// Add the vary by "accept-encoding" header if it is not already set. +func setVaryHeader(hdr http.Header) { + if !headerMatch(hdr, "Vary", "accept-encoding") { + hdr.Add("Vary", "Accept-Encoding") + } +} + +// Checks if the client accepts GZIP-encoded responses. +func acceptsGzip(hdr http.Header) bool { + ok := headerMatch(hdr, "Accept-Encoding", "gzip") + if !ok { + ok = headerEquals(hdr, "Accept-Encoding", "*") + } + return ok +} + +func setGzipHeaders(hdr http.Header) { + // The content-type will be explicitly set somewhere down the path of handlers + hdr.Set("Content-Encoding", "gzip") + hdr.Del("Content-Length") +} + +// Helper function to retrieve the gzip writer. +func getGzipWriter(w http.ResponseWriter) (*gzipResponseWriter, bool) { + gz, ok := GetResponseWriter(w, func(tst http.ResponseWriter) bool { + _, ok := tst.(*gzipResponseWriter) + return ok + }) + if ok { + return gz.(*gzipResponseWriter), true + } + return nil, false +} + +func headerMatch(hdr http.Header, name, s string) bool { + return strings.Contains(hdr.Get(name), s) +} + +func headerEquals(hdr http.Header, name, s string) bool { + return hdr.Get(name) == s +} + +// This interface can be implemented by an augmented ResponseWriter, so that +// it doesn't hide other augmented writers in the chain. +type WrapWriter interface { + http.ResponseWriter + WrappedWriter() http.ResponseWriter +} + +// Helper function to retrieve a specific ResponseWriter. +func GetResponseWriter(w http.ResponseWriter, + predicate func(http.ResponseWriter) bool) (http.ResponseWriter, bool) { + + for { + // Check if this writer is the one we're looking for + if w != nil && predicate(w) { + return w, true + } + // If it is a WrapWriter, move back the chain of wrapped writers + ww, ok := w.(WrapWriter) + if !ok { + return nil, false + } + w = ww.WrappedWriter() + } +} diff --git a/http.go b/fe/http.go similarity index 60% rename from http.go rename to fe/http.go index a2986e7e065a330414acb6f8c590f33e46ceb3f9..cc72036dfa46b79d444dbe3731a23c4e916257ba 100644 --- a/http.go +++ b/fe/http.go @@ -1,14 +1,20 @@ -package radioai +package fe import ( + "bytes" "fmt" + "html/template" "io" + "log" "math/rand" "net/http" "net/http/httputil" + "path/filepath" "strconv" "strings" "time" + + "git.autistici.org/ale/radioai" ) // HTTP redirector. @@ -21,12 +27,15 @@ import ( // a .m3u file directly pointing at the relays. // type HttpRedirector struct { - client *RadioAPI + domain string + client *radioai.RadioAPI + template *template.Template } -func NewHttpRedirector(client *RadioAPI) *HttpRedirector { +func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector { return &HttpRedirector{ client: client, + domain: domain, } } @@ -40,7 +49,7 @@ func (h *HttpRedirector) pickActiveNode() string { } // Parse the request and extract the mount path. -func (h *HttpRedirector) getMount(r *http.Request) (*Mount, error) { +func (h *HttpRedirector) getMount(r *http.Request) (*radioai.Mount, error) { path := r.URL.Path if strings.HasSuffix(path, ".m3u") { path = path[:len(path)-4] @@ -95,10 +104,55 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { proxy.ServeHTTP(w, r) } +func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { + nodes, _ := h.client.GetNodes() + mounts, _ := h.client.ListMounts() + ctx := struct { + Domain string + Nodes []string + Mounts []*radioai.Mount + }{h.domain, nodes, mounts} + + var buf bytes.Buffer + if err := h.template.ExecuteTemplate(&buf, "index.html", ctx); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Write(buf.Bytes()) +} + func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method == "SOURCE" { + if r.URL.Path == "" || r.URL.Path == "/" { + h.serveStatusPage(w, r) + } else if r.Method == "SOURCE" { h.serveSource(w, r) } else { h.serveRelay(w, r) } } + +func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { + h.template = template.Must( + template.ParseGlob( + filepath.Join(templateDir, "*.html"))) + + mux := http.NewServeMux() + mux.HandleFunc( + "/static/", + GZIPHandler( + http.StripPrefix( + "/static/", + http.FileServer(http.Dir(staticDir))), + nil)) + mux.Handle("/", h) + + httpServer := &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) + log.Fatal(httpServer.ListenAndServe()) +} diff --git a/fe/static/fullbg.js b/fe/static/fullbg.js new file mode 100644 index 0000000000000000000000000000000000000000..f038380330689e829f7d77b3fe2565452144492c --- /dev/null +++ b/fe/static/fullbg.js @@ -0,0 +1,50 @@ +/** + * jQuery.fullBg + * Version 1.0 + * Copyright (c) 2010 c.bavota - http://bavotasan.com + * Dual licensed under MIT and GPL. + * Date: 02/23/2010 +**/ +(function($) { + $.fn.fullBg = function(){ + var bgImg = $(this); + + function resizeImg() { + var imgwidth = bgImg.width(); + var imgheight = bgImg.height(); + + var winwidth = $(window).width(); + var winheight = $(window).height(); + + var widthratio = winwidth / imgwidth; + var heightratio = winheight / imgheight; + + var widthdiff = heightratio * imgwidth; + var heightdiff = widthratio * imgheight; + + var newwidth, newheight; + if(heightdiff>winheight) { + newwidth = winwidth; + newheight = heightdiff; + } else { + newwidth = widthdiff; + newheight = winheight; + } + var xoffset = (newwidth - winwidth) / 2, + yoffset = (newheight - winheight) / 2; + + bgImg.css({ + width: newwidth+'px', + height: newheight+'px', + top: '-'+yoffset+'px', + left: '-'+xoffset+'px' + }); + } + resizeImg(); + $(window).resize(function() { + resizeImg(); + }); + + return this; + }; +})(jQuery) diff --git a/fe/static/radiomast_bw.jpg b/fe/static/radiomast_bw.jpg new file mode 100644 index 0000000000000000000000000000000000000000..d6d52f22f58821f24e350832680a128db004fc43 Binary files /dev/null and b/fe/static/radiomast_bw.jpg differ diff --git a/fe/static/style.css b/fe/static/style.css new file mode 100644 index 0000000000000000000000000000000000000000..6a2dbdaa75cdc874edb07a6e44947492749bb0b7 --- /dev/null +++ b/fe/static/style.css @@ -0,0 +1,74 @@ +/* Space out content a bit */ +body { + padding-top: 80px; + padding-bottom: 20px; +} + +/* Everything gets side spacing for mobile first views */ +.header, +.mainitems, +.footer { + padding-left: 15px; + padding-right: 15px; +} + +/* Custom page header */ +.header { + border-bottom: 1px solid #e5e5e5; +} +/* Make the masthead heading the same height as the navigation */ +.header h3 { + margin-top: 0; + margin-bottom: 0; + line-height: 40px; + padding-bottom: 19px; +} + +/* Custom page footer */ +.footer { + padding-top: 19px; + color: #777; + border-top: 1px solid #e5e5e5; +} + +/* Customize container */ +@media (min-width: 768px) { + .container { + max-width: 730px; + } +} +.container-narrow > hr { + margin: 30px 0; +} + +.mainitems { + margin: 40px 0; +} +.mainitems p + h4 { + margin-top: 28px; +} + +/* Responsive: Portrait tablets and up */ +@media screen and (min-width: 768px) { + /* Remove the padding we set earlier */ + .header, + .mainitems, + .footer { + padding-left: 0; + padding-right: 0; + } + /* Space out the masthead */ + .header { + margin-bottom: 30px; + } +} + +#bgImg { + display: none; + position: fixed; + top: 0; + left: 0; + overflow: hidden; + z-index: -100; + opacity: 0.3; +} \ No newline at end of file diff --git a/fe/templates/index.html b/fe/templates/index.html new file mode 100644 index 0000000000000000000000000000000000000000..973fc678b0da52d51b8e26b9bae649fdb12654f3 --- /dev/null +++ b/fe/templates/index.html @@ -0,0 +1,59 @@ +<!DOCTYPE html> +<html> + <head> + <title>stream.{{.Domain}}</title> + <link rel="stylesheet" + href="//netdna.bootstrapcdn.com/bootstrap/3.0.2/css/bootstrap.min.css"> + <link rel="stylesheet" href="/static/style.css"> + <link rel="shortcut icon" href="/static/favicon.png"> + + <script src="//ajax.googleapis.com/ajax/libs/jquery/2.0.3/jquery.min.js"></script> + <script src="/static/fullbg.js"></script> + </head> + <body> + + <div class="container"> + <div class="page-header"> + <h1>{{.Domain}} + <small> + streaming network + </small> + </h1> + </div> + + <div class="row mainitems"> + <div class="col-lg-6"> + <h4>Streams</h4> + {{$domain := .Domain}} + {{range .Mounts}} + <p> + <a href="http://stream.{{$domain}}{{.Name}}.m3u">{{.Name}}</a> + </p> + {{end}} + </div> + + <div class="col-lg-6"> + <h4>Nodes</h4> + {{range .Nodes}} + <p>{{.}}</p> + {{end}} + </div> + </div> + + <div class="footer"> + powered by + <a href="https://git.autistici.org/public/ale/radioai"> + radioai 0.1 + </a> + </div> + </div> + + <img src="/static/radiomast_bw.jpg" id="bgImg"> + + <script type="text/javascript"> + $(function() { + $('#bgImg').fullBg().show(); + }); + </script> + </body> +</html> diff --git a/icecast.go b/node/icecast.go similarity index 98% rename from icecast.go rename to node/icecast.go index a7fd495ab9929e190c16fa4fbb9ea4eeea2e60bf..843024feba4487bc01e3abb6b8d746aceacd1a57 100644 --- a/icecast.go +++ b/node/icecast.go @@ -1,4 +1,4 @@ -package radioai +package node import ( "os" diff --git a/icecast_config.go b/node/icecast_config.go similarity index 95% rename from icecast_config.go rename to node/icecast_config.go index 776b504f369e3e618484eb2ebd7a5ca6cb9d7358..b01b4aafba6448c48c6c5d06974665ae58123d00 100644 --- a/icecast_config.go +++ b/node/icecast_config.go @@ -1,10 +1,12 @@ -package radioai +package node import ( "bytes" "encoding/xml" "io" "os" + + "git.autistici.org/ale/radioai" ) var ( @@ -96,8 +98,8 @@ type icecastConfig struct { func defaultDebianConfig(publicIp string) *icecastConfig { // Pick some random passwords on startup. We don't use them, // but icecast is happier if they're set. - sourcePw := GeneratePassword() - adminPw := GeneratePassword() + sourcePw := radioai.GeneratePassword() + adminPw := radioai.GeneratePassword() return &icecastConfig{ XMLName: xml.Name{"", "icecast"}, @@ -172,7 +174,7 @@ func (c *icecastConfig) EncodeToFile(path string) error { return err } -func mountToConfig(m *Mount) iceMountConfig { +func mountToConfig(m *radioai.Mount) iceMountConfig { mconfig := iceMountConfig{ Name: m.Name, Username: m.Username, @@ -188,7 +190,7 @@ func mountToConfig(m *Mount) iceMountConfig { return mconfig } -func mountToRelay(masterAddr string, m *Mount) iceRelayConfig { +func mountToRelay(masterAddr string, m *radioai.Mount) iceRelayConfig { return iceRelayConfig{ Mount: m.Name, LocalMount: m.Name, diff --git a/icecast_config_test.go b/node/icecast_config_test.go similarity index 98% rename from icecast_config_test.go rename to node/icecast_config_test.go index 88967483cf30dde6a7f6a8add525767c00cef14d..747332a5875b9cde84cc25581f3724fd61a011f9 100644 --- a/icecast_config_test.go +++ b/node/icecast_config_test.go @@ -1,4 +1,4 @@ -package radioai +package node import ( "strings" diff --git a/node.go b/node/node.go similarity index 89% rename from node.go rename to node/node.go index 36fad457d4bae584f50bd3d1531451dbe4b1afdb..30ec6458e0060515ce697ab2af89e2a34200e716 100644 --- a/node.go +++ b/node/node.go @@ -1,4 +1,4 @@ -package radioai +package node import ( "encoding/json" @@ -7,6 +7,7 @@ import ( "sync" "time" + "git.autistici.org/ale/radioai" "git.autistici.org/ale/radioai/masterelection" "github.com/coreos/go-etcd/etcd" ) @@ -21,28 +22,28 @@ func trigger(c chan bool) { // In-memory representation of the overall configuration (basically // just a list of the known mounts). type ClusterConfig struct { - mounts map[string]*Mount + mounts map[string]*radioai.Mount lock sync.Mutex } func NewClusterConfig() *ClusterConfig { return &ClusterConfig{ - mounts: make(map[string]*Mount), + mounts: make(map[string]*radioai.Mount), } } // TODO: remove? -func (c *ClusterConfig) GetMount(name string) *Mount { +func (c *ClusterConfig) GetMount(name string) *radioai.Mount { c.lock.Lock() defer c.lock.Unlock() return c.mounts[name] } // TODO: remove? -func (c *ClusterConfig) ListMounts() []*Mount { +func (c *ClusterConfig) ListMounts() []*radioai.Mount { c.lock.Lock() defer c.lock.Unlock() - result := make([]*Mount, 0, len(c.mounts)) + result := make([]*radioai.Mount, 0, len(c.mounts)) for _, m := range c.mounts { result = append(result, m) } @@ -50,7 +51,7 @@ func (c *ClusterConfig) ListMounts() []*Mount { } // Update a mount (in-memory only). -func (c *ClusterConfig) setMount(m *Mount) { +func (c *ClusterConfig) setMount(m *radioai.Mount) { c.lock.Lock() defer c.lock.Unlock() c.mounts[m.Name] = m @@ -91,7 +92,7 @@ func (w *ConfigSyncer) syncer() { case response := <-w.rch: // Remove mountPrefix from the beginning of // the path, but keep the leading slash. - mountName := response.Key[len(mountPrefix)-1:] + mountName := response.Key[len(radioai.MountPrefix)-1:] switch response.Action { case "DELETE": @@ -99,7 +100,7 @@ func (w *ConfigSyncer) syncer() { w.config.delMount(mountName) case "SET": log.Printf("update to mount %s: %+v", mountName, response) - var m Mount + var m radioai.Mount if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { log.Printf("corrupted data: %s", err) continue @@ -133,7 +134,7 @@ func (w *ConfigSyncer) Run() { // Run until the first successful Get(). for { - responses, err := w.client.Get(mountPrefix) + responses, err := w.client.Get(radioai.MountPrefix) if err == nil { // Inject all the replies into the channel. for _, r := range responses { @@ -160,7 +161,7 @@ func (w *ConfigSyncer) Run() { for { curIndex := w.index + 1 log.Printf("starting watcher at index %d", curIndex) - _, err := w.client.Watch(mountPrefix, curIndex, w.rch, w.stop) + _, err := w.client.Watch(radioai.MountPrefix, curIndex, w.rch, w.stop) if err == etcd.ErrWatchStoppedByUser { return } else if err != nil { @@ -210,7 +211,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { client: client, me: masterelection.NewMasterElection( client, - masterElectionPath, + radioai.MasterElectionPath, ip, 5, mech, @@ -231,7 +232,7 @@ func (rc *RadioNode) presence() { for { select { case <-ticker.C: - if _, err := rc.client.Set(nodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { + if _, err := rc.client.Set(radioai.NodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } case <-rc.stop: