diff --git a/api.go b/api.go index 7b7cbe9063e936664be4f717cffe6c9e5cb166bc..2d46f946946df140d02763348c0d5eb015f9e3da 100644 --- a/api.go +++ b/api.go @@ -138,19 +138,19 @@ func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) { return nc.nodes, err } -// RadioAPI is the actual API to the streaming cluster's database. -type RadioAPI struct { +// Client is the actual API to the streaming cluster's database. +type Client struct { client EtcdClient activeNodesCache *nodesCache } -func NewRadioAPI(client EtcdClient) *RadioAPI { - return &RadioAPI{client, newNodesCache()} +func NewClient(client EtcdClient) *Client { + return &Client{client, newNodesCache()} } // GetMount returns data on a specific mountpoint (returns nil if not // found). -func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { +func (r *Client) GetMount(mountName string) (*Mount, error) { response, err := r.client.Get(mountEtcdPath(mountName), false, false) if err != nil || response.Node == nil { return nil, err @@ -167,7 +167,7 @@ func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { } // SetMount creates or updates a mountpoint. -func (r *RadioAPI) SetMount(m *Mount) error { +func (r *Client) SetMount(m *Mount) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(m); err != nil { return err @@ -178,13 +178,13 @@ func (r *RadioAPI) SetMount(m *Mount) error { } // DelMount removes a mountpoint. -func (r *RadioAPI) DelMount(mountName string) error { +func (r *Client) DelMount(mountName string) error { _, err := r.client.Delete(mountEtcdPath(mountName), false) return err } // ListMounts returns a list of all the configured mountpoints. -func (r *RadioAPI) ListMounts() ([]*Mount, error) { +func (r *Client) ListMounts() ([]*Mount, error) { response, err := r.client.Get(MountPrefix, true, false) if err != nil || response.Node == nil { return nil, err @@ -216,7 +216,7 @@ type MasterNodeInfo struct { } // GetMasterAddr returns the address of the current master server. -func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) { +func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) { response, err := r.client.Get(MasterElectionPath, false, false) if err != nil || response.Node == nil { return nil, err @@ -232,7 +232,7 @@ func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) { } // GetNodes returns the list of active cluster nodes. -func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) { +func (r *Client) doGetNodes() ([]*NodeStatus, error) { response, err := r.client.Get(NodePrefix, false, false) if err != nil || response.Node == nil { return nil, err @@ -250,7 +250,7 @@ func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) { return result, nil } -func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) { +func (r *Client) GetNodes() ([]*NodeStatus, error) { return r.activeNodesCache.Get(r.doGetNodes) } diff --git a/cmd/radioctl/radioctl.go b/cmd/radioctl/radioctl.go index 1c93058243ac6fbdd63b06c060be31c6096d0451..73d3835e41fd50e15c811e48f916cb2c0c438311 100644 --- a/cmd/radioctl/radioctl.go +++ b/cmd/radioctl/radioctl.go @@ -40,9 +40,8 @@ func (b *BaseCommand) Command() *commander.Command { func (b *BaseCommand) Run(args []string) { } -func getClient() *autoradio.RadioAPI { - etc := autoradio.NewEtcdClient() - return autoradio.NewRadioAPI(etc) +func getClient() *autoradio.Client { + return autoradio.NewClient(autoradio.NewEtcdClient(false)) } func generateUsername(path string) string { diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 3f469fbc44bdee40e81d3cf4256ae30158410869..25cec385d0f6462942a922adf8fd8ef6d5a4e9a4 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -34,7 +34,7 @@ func main() { instrumentation.NewCounter("radiod.restarts").Incr() - client := autoradio.NewEtcdClient() + client := autoradio.NewEtcdClient(true) bwLimitBytes := float64(*bwLimit * 1000000 / 8) n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, client) diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index ed92ba5c01e568c06140b480fb010fd0a2b2f215..548c41668e5274c1cc2fec9aa675e97abd2a17f4 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -35,12 +35,11 @@ func main() { instrumentation.NewCounter("redirectord.restarts").Incr() - client := autoradio.NewEtcdClient() - api := autoradio.NewRadioAPI(client) + client := autoradio.NewClient(autoradio.NewEtcdClient(false)) - dnsRed := fe.NewDnsRedirector(api, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl) + dnsRed := fe.NewDnsRedirector(client, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) - red := fe.NewHttpRedirector(api, *domain, *lbPolicy) + red := fe.NewHttpRedirector(client, *domain, *lbPolicy) red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir) } diff --git a/etcd_client.go b/etcd_client.go index 39dc469ca6acd617c213bb0d9475605fbce331c9..8bfc3a0243340cc01efbd4507e85e3b7f967e9bd 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -45,7 +45,7 @@ func resolveAll(input []string, proto string) []string { return result } -func NewEtcdClient() EtcdClient { +func NewEtcdClient(strongReads bool) EtcdClient { proto := "http" if *etcdCertFile != "" && *etcdKeyFile != "" { proto = "https" @@ -73,7 +73,12 @@ func NewEtcdClient() EtcdClient { c = etcd.NewClient(machines) } - c.SetConsistency(etcd.WEAK_CONSISTENCY) + if strongReads { + c.SetConsistency(etcd.STRONG_CONSISTENCY) + } else { + c.SetConsistency(etcd.WEAK_CONSISTENCY) + } + return c } diff --git a/fe/dns.go b/fe/dns.go index f5f9675451dc5c2233f05111992a75e29635d357..45c5b5a84221c51873891281db63fac8d4b34b14 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -32,7 +32,7 @@ var ( // DNS server. type DnsRedirector struct { - client *autoradio.RadioAPI + client *autoradio.Client origin string originNumParts int publicIps []net.IP @@ -42,7 +42,7 @@ type DnsRedirector struct { // NewDnsRedirector returns a DNS server for the given origin and // publicIp. The A records served will have the specified ttl. -func NewDnsRedirector(client *autoradio.RadioAPI, origin string, publicIps []net.IP, ttl int) *DnsRedirector { +func NewDnsRedirector(client *autoradio.Client, origin string, publicIps []net.IP, ttl int) *DnsRedirector { if !strings.HasSuffix(origin, ".") { origin += "." } diff --git a/fe/http.go b/fe/http.go index 56247e78abd90658e3f516c8575fb07634c724a6..b48a79d5ca093fd3d1108630dcd7fd6f01db5187 100644 --- a/fe/http.go +++ b/fe/http.go @@ -55,11 +55,11 @@ func statsHandler(h http.Handler) http.HandlerFunc { type HttpRedirector struct { domain string lb LoadBalancingPolicy - client *autoradio.RadioAPI + client *autoradio.Client template *template.Template } -func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector { +func NewHttpRedirector(client *autoradio.Client, domain string, lbpolicy string) *HttpRedirector { return &HttpRedirector{ client: client, domain: domain, diff --git a/node/node.go b/node/node.go index 24c3f2b75bc828e2d9efd944799ee400e3b6c51a..73a77fc89083ef9ef79fa1958eea3c5545f9ce89 100644 --- a/node/node.go +++ b/node/node.go @@ -280,8 +280,8 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli } } -// The presence goroutine continuously updates our entry in the list -// of nodes. +// The presence goroutine periodically updates our entry in the list +// of nodes with the current node statistics. func (rc *RadioNode) presence() { ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3) @@ -292,18 +292,18 @@ func (rc *RadioNode) presence() { select { case <-ticker.C: // Build our NodeStatus. - icecastSt := rc.icecast.GetStatus() - nodeSt := autoradio.NodeStatus{ + icecastStatus := rc.icecast.GetStatus() + nodeStatus := autoradio.NodeStatus{ Name: rc.name, IP: rc.ips, - IcecastUp: icecastSt.Up, - Mounts: icecastSt.Mounts, + IcecastUp: icecastStatus.Up, + Mounts: icecastStatus.Mounts, BandwidthUsage: rc.bw.GetUsage(), } // Update our node entry in the database. var buf bytes.Buffer - json.NewEncoder(&buf).Encode(&nodeSt) + json.NewEncoder(&buf).Encode(&nodeStatus) if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil { log.Printf("presence: Set(): %s", err) }