Skip to content
Snippets Groups Projects
Commit 7c4bf748 authored by ale's avatar ale
Browse files

rename RadioAPI to Client; use strong read consistency for radiod

parent 22b8567e
No related branches found
No related tags found
No related merge requests found
......@@ -138,19 +138,19 @@ func (nc *nodesCache) Get(fn getNodesFunc) ([]*NodeStatus, error) {
return nc.nodes, err
}
// RadioAPI is the actual API to the streaming cluster's database.
type RadioAPI struct {
// Client is the actual API to the streaming cluster's database.
type Client struct {
client EtcdClient
activeNodesCache *nodesCache
}
func NewRadioAPI(client EtcdClient) *RadioAPI {
return &RadioAPI{client, newNodesCache()}
func NewClient(client EtcdClient) *Client {
return &Client{client, newNodesCache()}
}
// GetMount returns data on a specific mountpoint (returns nil if not
// found).
func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
func (r *Client) GetMount(mountName string) (*Mount, error) {
response, err := r.client.Get(mountEtcdPath(mountName), false, false)
if err != nil || response.Node == nil {
return nil, err
......@@ -167,7 +167,7 @@ func (r *RadioAPI) GetMount(mountName string) (*Mount, error) {
}
// SetMount creates or updates a mountpoint.
func (r *RadioAPI) SetMount(m *Mount) error {
func (r *Client) SetMount(m *Mount) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(m); err != nil {
return err
......@@ -178,13 +178,13 @@ func (r *RadioAPI) SetMount(m *Mount) error {
}
// DelMount removes a mountpoint.
func (r *RadioAPI) DelMount(mountName string) error {
func (r *Client) DelMount(mountName string) error {
_, err := r.client.Delete(mountEtcdPath(mountName), false)
return err
}
// ListMounts returns a list of all the configured mountpoints.
func (r *RadioAPI) ListMounts() ([]*Mount, error) {
func (r *Client) ListMounts() ([]*Mount, error) {
response, err := r.client.Get(MountPrefix, true, false)
if err != nil || response.Node == nil {
return nil, err
......@@ -216,7 +216,7 @@ type MasterNodeInfo struct {
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) {
func (r *Client) GetMasterInfo() (*MasterNodeInfo, error) {
response, err := r.client.Get(MasterElectionPath, false, false)
if err != nil || response.Node == nil {
return nil, err
......@@ -232,7 +232,7 @@ func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) {
}
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) {
func (r *Client) doGetNodes() ([]*NodeStatus, error) {
response, err := r.client.Get(NodePrefix, false, false)
if err != nil || response.Node == nil {
return nil, err
......@@ -250,7 +250,7 @@ func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) {
return result, nil
}
func (r *RadioAPI) GetNodes() ([]*NodeStatus, error) {
func (r *Client) GetNodes() ([]*NodeStatus, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
......
......@@ -40,9 +40,8 @@ func (b *BaseCommand) Command() *commander.Command {
func (b *BaseCommand) Run(args []string) {
}
func getClient() *autoradio.RadioAPI {
etc := autoradio.NewEtcdClient()
return autoradio.NewRadioAPI(etc)
func getClient() *autoradio.Client {
return autoradio.NewClient(autoradio.NewEtcdClient(false))
}
func generateUsername(path string) string {
......
......@@ -34,7 +34,7 @@ func main() {
instrumentation.NewCounter("radiod.restarts").Incr()
client := autoradio.NewEtcdClient()
client := autoradio.NewEtcdClient(true)
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, client)
......
......@@ -35,12 +35,11 @@ func main() {
instrumentation.NewCounter("redirectord.restarts").Incr()
client := autoradio.NewEtcdClient()
api := autoradio.NewRadioAPI(client)
client := autoradio.NewClient(autoradio.NewEtcdClient(false))
dnsRed := fe.NewDnsRedirector(api, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl)
dnsRed := fe.NewDnsRedirector(client, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl)
dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
red := fe.NewHttpRedirector(api, *domain, *lbPolicy)
red := fe.NewHttpRedirector(client, *domain, *lbPolicy)
red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir)
}
......@@ -45,7 +45,7 @@ func resolveAll(input []string, proto string) []string {
return result
}
func NewEtcdClient() EtcdClient {
func NewEtcdClient(strongReads bool) EtcdClient {
proto := "http"
if *etcdCertFile != "" && *etcdKeyFile != "" {
proto = "https"
......@@ -73,7 +73,12 @@ func NewEtcdClient() EtcdClient {
c = etcd.NewClient(machines)
}
c.SetConsistency(etcd.WEAK_CONSISTENCY)
if strongReads {
c.SetConsistency(etcd.STRONG_CONSISTENCY)
} else {
c.SetConsistency(etcd.WEAK_CONSISTENCY)
}
return c
}
......
......@@ -32,7 +32,7 @@ var (
// DNS server.
type DnsRedirector struct {
client *autoradio.RadioAPI
client *autoradio.Client
origin string
originNumParts int
publicIps []net.IP
......@@ -42,7 +42,7 @@ type DnsRedirector struct {
// NewDnsRedirector returns a DNS server for the given origin and
// publicIp. The A records served will have the specified ttl.
func NewDnsRedirector(client *autoradio.RadioAPI, origin string, publicIps []net.IP, ttl int) *DnsRedirector {
func NewDnsRedirector(client *autoradio.Client, origin string, publicIps []net.IP, ttl int) *DnsRedirector {
if !strings.HasSuffix(origin, ".") {
origin += "."
}
......
......@@ -55,11 +55,11 @@ func statsHandler(h http.Handler) http.HandlerFunc {
type HttpRedirector struct {
domain string
lb LoadBalancingPolicy
client *autoradio.RadioAPI
client *autoradio.Client
template *template.Template
}
func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy string) *HttpRedirector {
func NewHttpRedirector(client *autoradio.Client, domain string, lbpolicy string) *HttpRedirector {
return &HttpRedirector{
client: client,
domain: domain,
......
......@@ -280,8 +280,8 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
}
}
// The presence goroutine continuously updates our entry in the list
// of nodes.
// The presence goroutine periodically updates our entry in the list
// of nodes with the current node statistics.
func (rc *RadioNode) presence() {
ticker := time.NewTicker(time.Duration(rc.heartbeat) * time.Second / 3)
......@@ -292,18 +292,18 @@ func (rc *RadioNode) presence() {
select {
case <-ticker.C:
// Build our NodeStatus.
icecastSt := rc.icecast.GetStatus()
nodeSt := autoradio.NodeStatus{
icecastStatus := rc.icecast.GetStatus()
nodeStatus := autoradio.NodeStatus{
Name: rc.name,
IP: rc.ips,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
IcecastUp: icecastStatus.Up,
Mounts: icecastStatus.Mounts,
BandwidthUsage: rc.bw.GetUsage(),
}
// Update our node entry in the database.
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&nodeSt)
json.NewEncoder(&buf).Encode(&nodeStatus)
if _, err := rc.client.Set(key, buf.String(), rc.heartbeat); err != nil {
log.Printf("presence: Set(): %s", err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment