diff --git a/api.go b/api.go index 4723534cee21db5b62362fea726b27c9b4e220d3..c07a67f7f45389870ed29f655edfcf61f17a9926 100644 --- a/api.go +++ b/api.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "net" "strings" "sync" "time" @@ -78,8 +79,11 @@ type IcecastMountStatus struct { // Status of a node. This is used to report load and stream status. type NodeStatus struct { - // Public IP of this server. - IP string + // Short name of this node. + Name string + + // Public IP addresses of this server. + IP []net.IP // Is the Icecast server up? IcecastUp bool @@ -205,16 +209,28 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) { return result, nil } +// Location data for the master node. Having the IP address here saves +// another round-trip to etcd to retrieve the node info in the most +// common case. +type MasterNodeInfo struct { + Name string + IP []net.IP +} + // GetMasterAddr returns the address of the current master server. -func (r *RadioAPI) GetMasterAddr() (string, error) { +func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) { response, err := r.client.Get(MasterElectionPath, false, false) if err != nil || response.Node == nil { - return "", err + return nil, err } if response.Node.Dir { - return "", ErrIsDirectory + return nil, ErrIsDirectory } - return response.Node.Value, nil + var m MasterNodeInfo + if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { + return nil, err + } + return &m, nil } // GetNodes returns the list of active cluster nodes. diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 0298158fe748c2449b542ef04a4b7407d447926d..3f469fbc44bdee40e81d3cf4256ae30158410869 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -5,19 +5,30 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/instrumentation" "git.autistici.org/ale/autoradio/node" + "git.autistici.org/ale/autoradio/util" ) var ( - publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine") - netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") - bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") + name = flag.String("name", shortHostname(), "Name for this node") + publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)") + netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") + bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") ) +func shortHostname() string { + hostname, _ := os.Hostname() + if r := strings.Index(hostname, "."); r >= 0 { + return hostname[:r] + } + return hostname +} + func main() { flag.Parse() @@ -25,7 +36,7 @@ func main() { client := autoradio.NewEtcdClient() bwLimitBytes := float64(*bwLimit * 1000000 / 8) - n := node.NewRadioNode(*publicIp, *netDev, bwLimitBytes, client) + n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, client) // Set up a clean shutdown function on SIGTERM. stopch := make(chan os.Signal) diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index 0d16e8890f9f9b1ab8773cef6688b1f551771c57..ed92ba5c01e568c06140b480fb010fd0a2b2f215 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -8,13 +8,14 @@ import ( "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/fe" "git.autistici.org/ale/autoradio/instrumentation" + "git.autistici.org/ale/autoradio/util" ) var ( - domain = flag.String("domain", "", "DNS domain to serve") - dnsPort = flag.Int("dns-port", 53, "DNS port") - httpPort = flag.Int("http-port", 80, "HTTP port") - publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine") + domain = flag.String("domain", "", "DNS domain to serve") + publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)") + dnsPort = flag.Int("dns-port", 53, "DNS port") + httpPort = flag.Int("http-port", 80, "HTTP port") staticDir = flag.String("static-dir", "/usr/share/autoradio/htdocs/static", "Static content directory") templateDir = flag.String("template-dir", "/usr/share/autoradio/htdocs/templates", "HTML templates directory") @@ -37,7 +38,7 @@ func main() { client := autoradio.NewEtcdClient() api := autoradio.NewRadioAPI(client) - dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl) + dnsRed := fe.NewDnsRedirector(api, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) red := fe.NewHttpRedirector(api, *domain, *lbPolicy) diff --git a/fe/common.go b/fe/common.go new file mode 100644 index 0000000000000000000000000000000000000000..367b5f4f834ae00cd0dd4d0ecf2b36ee7c8093b7 --- /dev/null +++ b/fe/common.go @@ -0,0 +1,33 @@ +package fe + +import ( + "math/rand" + "net" + "strings" +) + +// Filter a list of IP addresses by protocol. +func filterIpByProto(ips []net.IP, v6 bool) []net.IP { + var candidates []net.IP + for _, ip := range ips { + isIPv6 := (ip.To4() == nil) + if (isIPv6 && v6) || (!isIPv6 && !v6) { + candidates = append(candidates, ip) + } + } + return candidates +} + +// Pick a random IP for the specified proto. +func randomIpByProto(ips []net.IP, v6 bool) net.IP { + candidates := filterIpByProto(ips, v6) + if len(candidates) > 0 { + return candidates[rand.Intn(len(candidates))] + } + return nil +} + +// Return an appropriate name for a metric, given an IP address. +func ipToMetric(ip net.IP) string { + return strings.Replace(ip.String(), ".", "_", -1) +} diff --git a/fe/dns.go b/fe/dns.go index 05425950776214b936ddd7c326abe01918155518..f5f9675451dc5c2233f05111992a75e29635d357 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -35,14 +35,14 @@ type DnsRedirector struct { client *autoradio.RadioAPI origin string originNumParts int - publicIp string + publicIps []net.IP ttl int soa dns.RR } // NewDnsRedirector returns a DNS server for the given origin and // publicIp. The A records served will have the specified ttl. -func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl int) *DnsRedirector { +func NewDnsRedirector(client *autoradio.RadioAPI, origin string, publicIps []net.IP, ttl int) *DnsRedirector { if !strings.HasSuffix(origin, ".") { origin += "." } @@ -68,15 +68,15 @@ func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl i client: client, origin: origin, originNumParts: len(dns.SplitDomainName(origin)), - publicIp: publicIp, + publicIps: publicIps, ttl: ttl, soa: soa, } } -// Randomly shuffle a list of strings. -func shuffle(list []string) []string { - out := make([]string, len(list)) +// Randomly shuffle a list of addresses. +func shuffle(list []net.IP) []net.IP { + out := make([]net.IP, len(list)) for dst, src := range rand.Perm(len(list)) { out[dst] = list[src] } @@ -104,22 +104,34 @@ func ednsFromRequest(req, m *dns.Msg) { return } -// Create an A RR for a specific IP. -func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A { - var fqdn string +func (d *DnsRedirector) withOrigin(name string) string { if name == "" { - fqdn = d.origin - } else { - fqdn = name + "." + d.origin + return d.origin + } + return name + "." + d.origin +} + +// Create an A RR for a specific IP. +func (d *DnsRedirector) recordForIp(name string, ip net.IP, v6 bool) dns.RR { + if v6 { + return &dns.AAAA{ + Hdr: dns.RR_Header{ + Name: d.withOrigin(name), + Rrtype: dns.TypeAAAA, + Class: dns.ClassINET, + Ttl: uint32(d.ttl), + }, + AAAA: ip, + } } return &dns.A{ Hdr: dns.RR_Header{ - Name: fqdn, + Name: d.withOrigin(name), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: uint32(d.ttl), }, - A: net.ParseIP(ip), + A: ip, } } @@ -131,10 +143,10 @@ func (d *DnsRedirector) getQuestionName(req *dns.Msg) string { } // Flatten IPs from the list of nodes. -func flattenIPs(nodes []*autoradio.NodeStatus) []string { - var ips []string +func flattenIPs(nodes []*autoradio.NodeStatus) []net.IP { + var ips []net.IP for _, n := range nodes { - ips = append(ips, n.IP) + ips = append(ips, n.IP...) } return ips } @@ -161,7 +173,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m.Answer = append(m.Answer, d.soa) responseMsg = "SOA" - case req.Question[0].Qtype == dns.TypeA: + case req.Question[0].Qtype == dns.TypeA || req.Question[0].Qtype == dns.TypeAAAA: // Return an NXDOMAIN for unknown queries. if !isValidQuery(query) { m.SetRcode(req, dns.RcodeNameError) @@ -173,7 +185,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { // really care about errors from GetNodes as long as // some nodes are returned (i.e. stale data from the // cache is accepted). - var ips []string + var ips []net.IP nodes, _ := d.client.GetNodes() if len(nodes) > 0 { ips = flattenIPs(nodes) @@ -182,8 +194,10 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { // active nodes, fall back to serving our // public IP (just to avoid returning an empty // reply, which might be cached for longer). - ips = []string{d.publicIp} + ips = d.publicIps } + isV6 := (req.Question[0].Qtype == dns.TypeAAAA) + ips = filterIpByProto(ips, isV6) // Shuffle the list in random order, and keep only the // first N results. @@ -195,7 +209,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m.SetReply(req) m.MsgHdr.Authoritative = true for _, ip := range ips { - rec := d.recordForIp(query, ip) + rec := d.recordForIp(query, ip, isV6) m.Answer = append(m.Answer, rec) dnsTargetStats.IncrVar(ipToMetric(ip)) } @@ -234,7 +248,3 @@ func (d *DnsRedirector) Run(addr string) { }(proto) } } - -func ipToMetric(ip string) string { - return strings.Replace(ip, ".", "_", -1) -} diff --git a/fe/http.go b/fe/http.go index ed0f7d82596e0c9bf45efe097488f0e29fcc8532..56247e78abd90658e3f516c8575fb07634c724a6 100644 --- a/fe/http.go +++ b/fe/http.go @@ -67,12 +67,20 @@ func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy strin } } +// Pick a random IP with a protocol appropriate to the request (based +// on the remote address). +func randomIpForRequest(ips []net.IP, r *http.Request) net.IP { + remoteAddr := net.ParseIP(r.RemoteAddr) + isV6 := (remoteAddr != nil && (remoteAddr.To4() == nil)) + return randomIpByProto(ips, isV6) +} + // Return an active node, chosen according to the current load // balancing policy. -func (h *HttpRedirector) pickActiveNode() string { +func (h *HttpRedirector) pickActiveNode(r *http.Request) net.IP { nodes, _ := h.client.GetNodes() if nodes == nil { - return "" + return nil } // Filter nodes where Icecast is reported to be up. @@ -83,24 +91,24 @@ func (h *HttpRedirector) pickActiveNode() string { } } if len(okNodes) == 0 { - return "" + return nil } result := h.lb.GetNode(okNodes) if result == nil { - return "" + return nil } - return result.IP + return randomIpForRequest(result.IP, r) } -func icecastAddr(server string) string { - return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort)) +func icecastAddr(server net.IP) string { + return net.JoinHostPort(server.String(), strconv.Itoa(autoradio.IcecastPort)) } -func streamUrl(server, mountName string) string { +func streamUrl(server net.IP, mountName string) string { var serverAddr string if *proxyStreams { - serverAddr = server + serverAddr = server.String() } else { serverAddr = icecastAddr(server) } @@ -173,8 +181,8 @@ func redirect(w http.ResponseWriter, r *http.Request, urlStr string) { // Serve a response for a client connection to a relay. func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) { // Find an active node. - relayAddr := h.pickActiveNode() - if relayAddr == "" { + relayAddr := h.pickActiveNode(r) + if relayAddr == nil { http.Error(w, "No active nodes", http.StatusServiceUnavailable) return } @@ -201,7 +209,7 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit sourceConnections.Incr() // Find the current master node. - masterAddr, err := h.client.GetMasterAddr() + masterInfo, err := h.client.GetMasterInfo() if err != nil { log.Printf("source error: no master: %v", err) http.Error(w, err.Error(), http.StatusServiceUnavailable) @@ -214,7 +222,7 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit proxy := &ReverseProxy{ Director: func(req *http.Request) { req.URL.Scheme = "http" - req.URL.Host = icecastAddr(masterAddr) + req.URL.Host = icecastAddr(masterInfo.IP[0]) req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name) }, FlushInterval: 500 * time.Millisecond, diff --git a/node/node.go b/node/node.go index a4e9466d823cdce8d23c28acfe6023e744886e57..4d9628d6b4d4af063bb1c6d14bbec30d8b4f0a7a 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "log" + "net" "strings" "sync" "time" @@ -217,7 +218,8 @@ func (w *configWatcher) Start() { type RadioNode struct { Config *clusterConfig - ip string + name string + ips []net.IP client *etcd.Client me *masterelection.MasterElection watcher *configWatcher @@ -228,7 +230,7 @@ type RadioNode struct { stop chan bool } -func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode { +func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client *etcd.Client) *RadioNode { config := newClusterConfig() // Network updates trigger icecast reconfiguration. This @@ -247,19 +249,30 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi // Global 'stop' channel. stopch := make(chan bool) + // Location information advertised when this node is master. + minfo := &autoradio.MasterNodeInfo{ + Name: name, + IP: ips, + } + minfodata, err := json.Marshal(minfo) + if err != nil { + log.Fatal(err) + } + return &RadioNode{ Config: config, - ip: ip, + name: name, + ips: ips, client: client, me: masterelection.NewMasterElection( client, autoradio.MasterElectionPath, - ip, + string(minfodata), 5, mech, stopch), watcher: newConfigSyncer(client, config, upch, stopch), - icecast: NewIcecastController(ip, stopch), + icecast: NewIcecastController(name, stopch), livenessTtl: 2, bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), upch: upch, @@ -272,13 +285,17 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi func (rc *RadioNode) presence() { ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second) + // Register ourselves using the node name. + key := autoradio.NodePrefix + rc.name + for { select { case <-ticker.C: // Build our NodeStatus. icecastSt := rc.icecast.GetStatus() nodeSt := autoradio.NodeStatus{ - IP: rc.ip, + Name: rc.name, + IP: rc.ips, IcecastUp: icecastSt.Up, Mounts: icecastSt.Mounts, BandwidthUsage: rc.bw.GetUsage(), @@ -287,7 +304,7 @@ func (rc *RadioNode) presence() { // Update our node entry in the database. var buf bytes.Buffer json.NewEncoder(&buf).Encode(&nodeSt) - if _, err := rc.client.Set(autoradio.NodePrefix+rc.ip, buf.String(), rc.livenessTtl); err != nil { + if _, err := rc.client.Set(key, buf.String(), rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } diff --git a/util/flag.go b/util/flag.go new file mode 100644 index 0000000000000000000000000000000000000000..0f6823cdfff933718b002616b02a609549f29969 --- /dev/null +++ b/util/flag.go @@ -0,0 +1,40 @@ +package util + +import ( + "flag" + "fmt" + "net" + "strings" +) + +type ipList []net.IP + +func (l *ipList) String() string { + var sl []string + for _, ip := range *l { + sl = append(sl, ip.String()) + } + return strings.Join(sl, ",") +} + +func (l *ipList) Set(value string) error { + ip := net.ParseIP(value) + if ip == nil { + return fmt.Errorf("Unable to parse IP address \"%s\"", value) + } + *l = append(*l, ip) + return nil +} + +func IPList(name, help string) *[]net.IP { + var l ipList + flag.Var(&l, name, help) + return (*[]net.IP)(&l) +} + +func IPListWithDefault(l []net.IP, deflt string) []net.IP { + if len(l) == 0 { + return []net.IP{net.ParseIP(deflt)} + } + return l +}