Skip to content
Snippets Groups Projects
Commit 01d010c9 authored by ale's avatar ale
Browse files

support arbitrary public IPs

Introduce node names, allowing a node to have multiple IP addresses.
Also, make the front-end code IPv6-aware and support AAAA records.
parent dc5e2c0b
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"net"
"strings"
"sync"
"time"
......@@ -78,8 +79,11 @@ type IcecastMountStatus struct {
// Status of a node. This is used to report load and stream status.
type NodeStatus struct {
// Public IP of this server.
IP string
// Short name of this node.
Name string
// Public IP addresses of this server.
IP []net.IP
// Is the Icecast server up?
IcecastUp bool
......@@ -205,16 +209,28 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
return result, nil
}
// Location data for the master node. Having the IP address here saves
// another round-trip to etcd to retrieve the node info in the most
// common case.
type MasterNodeInfo struct {
Name string
IP []net.IP
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
func (r *RadioAPI) GetMasterInfo() (*MasterNodeInfo, error) {
response, err := r.client.Get(MasterElectionPath, false, false)
if err != nil || response.Node == nil {
return "", err
return nil, err
}
if response.Node.Dir {
return "", ErrIsDirectory
return nil, ErrIsDirectory
}
return response.Node.Value, nil
var m MasterNodeInfo
if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil {
return nil, err
}
return &m, nil
}
// GetNodes returns the list of active cluster nodes.
......
......@@ -5,19 +5,30 @@ import (
"log"
"os"
"os/signal"
"strings"
"syscall"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"git.autistici.org/ale/autoradio/node"
"git.autistici.org/ale/autoradio/util"
)
var (
publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
name = flag.String("name", shortHostname(), "Name for this node")
publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
)
func shortHostname() string {
hostname, _ := os.Hostname()
if r := strings.Index(hostname, "."); r >= 0 {
return hostname[:r]
}
return hostname
}
func main() {
flag.Parse()
......@@ -25,7 +36,7 @@ func main() {
client := autoradio.NewEtcdClient()
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*publicIp, *netDev, bwLimitBytes, client)
n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, client)
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
......
......@@ -8,13 +8,14 @@ import (
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/fe"
"git.autistici.org/ale/autoradio/instrumentation"
"git.autistici.org/ale/autoradio/util"
)
var (
domain = flag.String("domain", "", "DNS domain to serve")
dnsPort = flag.Int("dns-port", 53, "DNS port")
httpPort = flag.Int("http-port", 80, "HTTP port")
publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine")
domain = flag.String("domain", "", "DNS domain to serve")
publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)")
dnsPort = flag.Int("dns-port", 53, "DNS port")
httpPort = flag.Int("http-port", 80, "HTTP port")
staticDir = flag.String("static-dir", "/usr/share/autoradio/htdocs/static", "Static content directory")
templateDir = flag.String("template-dir", "/usr/share/autoradio/htdocs/templates", "HTML templates directory")
......@@ -37,7 +38,7 @@ func main() {
client := autoradio.NewEtcdClient()
api := autoradio.NewRadioAPI(client)
dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl)
dnsRed := fe.NewDnsRedirector(api, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl)
dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
red := fe.NewHttpRedirector(api, *domain, *lbPolicy)
......
package fe
import (
"math/rand"
"net"
"strings"
)
// Filter a list of IP addresses by protocol.
func filterIpByProto(ips []net.IP, v6 bool) []net.IP {
var candidates []net.IP
for _, ip := range ips {
isIPv6 := (ip.To4() == nil)
if (isIPv6 && v6) || (!isIPv6 && !v6) {
candidates = append(candidates, ip)
}
}
return candidates
}
// Pick a random IP for the specified proto.
func randomIpByProto(ips []net.IP, v6 bool) net.IP {
candidates := filterIpByProto(ips, v6)
if len(candidates) > 0 {
return candidates[rand.Intn(len(candidates))]
}
return nil
}
// Return an appropriate name for a metric, given an IP address.
func ipToMetric(ip net.IP) string {
return strings.Replace(ip.String(), ".", "_", -1)
}
......@@ -35,14 +35,14 @@ type DnsRedirector struct {
client *autoradio.RadioAPI
origin string
originNumParts int
publicIp string
publicIps []net.IP
ttl int
soa dns.RR
}
// NewDnsRedirector returns a DNS server for the given origin and
// publicIp. The A records served will have the specified ttl.
func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl int) *DnsRedirector {
func NewDnsRedirector(client *autoradio.RadioAPI, origin string, publicIps []net.IP, ttl int) *DnsRedirector {
if !strings.HasSuffix(origin, ".") {
origin += "."
}
......@@ -68,15 +68,15 @@ func NewDnsRedirector(client *autoradio.RadioAPI, origin, publicIp string, ttl i
client: client,
origin: origin,
originNumParts: len(dns.SplitDomainName(origin)),
publicIp: publicIp,
publicIps: publicIps,
ttl: ttl,
soa: soa,
}
}
// Randomly shuffle a list of strings.
func shuffle(list []string) []string {
out := make([]string, len(list))
// Randomly shuffle a list of addresses.
func shuffle(list []net.IP) []net.IP {
out := make([]net.IP, len(list))
for dst, src := range rand.Perm(len(list)) {
out[dst] = list[src]
}
......@@ -104,22 +104,34 @@ func ednsFromRequest(req, m *dns.Msg) {
return
}
// Create an A RR for a specific IP.
func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A {
var fqdn string
func (d *DnsRedirector) withOrigin(name string) string {
if name == "" {
fqdn = d.origin
} else {
fqdn = name + "." + d.origin
return d.origin
}
return name + "." + d.origin
}
// Create an A RR for a specific IP.
func (d *DnsRedirector) recordForIp(name string, ip net.IP, v6 bool) dns.RR {
if v6 {
return &dns.AAAA{
Hdr: dns.RR_Header{
Name: d.withOrigin(name),
Rrtype: dns.TypeAAAA,
Class: dns.ClassINET,
Ttl: uint32(d.ttl),
},
AAAA: ip,
}
}
return &dns.A{
Hdr: dns.RR_Header{
Name: fqdn,
Name: d.withOrigin(name),
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: uint32(d.ttl),
},
A: net.ParseIP(ip),
A: ip,
}
}
......@@ -131,10 +143,10 @@ func (d *DnsRedirector) getQuestionName(req *dns.Msg) string {
}
// Flatten IPs from the list of nodes.
func flattenIPs(nodes []*autoradio.NodeStatus) []string {
var ips []string
func flattenIPs(nodes []*autoradio.NodeStatus) []net.IP {
var ips []net.IP
for _, n := range nodes {
ips = append(ips, n.IP)
ips = append(ips, n.IP...)
}
return ips
}
......@@ -161,7 +173,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.Answer = append(m.Answer, d.soa)
responseMsg = "SOA"
case req.Question[0].Qtype == dns.TypeA:
case req.Question[0].Qtype == dns.TypeA || req.Question[0].Qtype == dns.TypeAAAA:
// Return an NXDOMAIN for unknown queries.
if !isValidQuery(query) {
m.SetRcode(req, dns.RcodeNameError)
......@@ -173,7 +185,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
// really care about errors from GetNodes as long as
// some nodes are returned (i.e. stale data from the
// cache is accepted).
var ips []string
var ips []net.IP
nodes, _ := d.client.GetNodes()
if len(nodes) > 0 {
ips = flattenIPs(nodes)
......@@ -182,8 +194,10 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
// active nodes, fall back to serving our
// public IP (just to avoid returning an empty
// reply, which might be cached for longer).
ips = []string{d.publicIp}
ips = d.publicIps
}
isV6 := (req.Question[0].Qtype == dns.TypeAAAA)
ips = filterIpByProto(ips, isV6)
// Shuffle the list in random order, and keep only the
// first N results.
......@@ -195,7 +209,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
for _, ip := range ips {
rec := d.recordForIp(query, ip)
rec := d.recordForIp(query, ip, isV6)
m.Answer = append(m.Answer, rec)
dnsTargetStats.IncrVar(ipToMetric(ip))
}
......@@ -234,7 +248,3 @@ func (d *DnsRedirector) Run(addr string) {
}(proto)
}
}
func ipToMetric(ip string) string {
return strings.Replace(ip, ".", "_", -1)
}
......@@ -67,12 +67,20 @@ func NewHttpRedirector(client *autoradio.RadioAPI, domain string, lbpolicy strin
}
}
// Pick a random IP with a protocol appropriate to the request (based
// on the remote address).
func randomIpForRequest(ips []net.IP, r *http.Request) net.IP {
remoteAddr := net.ParseIP(r.RemoteAddr)
isV6 := (remoteAddr != nil && (remoteAddr.To4() == nil))
return randomIpByProto(ips, isV6)
}
// Return an active node, chosen according to the current load
// balancing policy.
func (h *HttpRedirector) pickActiveNode() string {
func (h *HttpRedirector) pickActiveNode(r *http.Request) net.IP {
nodes, _ := h.client.GetNodes()
if nodes == nil {
return ""
return nil
}
// Filter nodes where Icecast is reported to be up.
......@@ -83,24 +91,24 @@ func (h *HttpRedirector) pickActiveNode() string {
}
}
if len(okNodes) == 0 {
return ""
return nil
}
result := h.lb.GetNode(okNodes)
if result == nil {
return ""
return nil
}
return result.IP
return randomIpForRequest(result.IP, r)
}
func icecastAddr(server string) string {
return net.JoinHostPort(server, strconv.Itoa(autoradio.IcecastPort))
func icecastAddr(server net.IP) string {
return net.JoinHostPort(server.String(), strconv.Itoa(autoradio.IcecastPort))
}
func streamUrl(server, mountName string) string {
func streamUrl(server net.IP, mountName string) string {
var serverAddr string
if *proxyStreams {
serverAddr = server
serverAddr = server.String()
} else {
serverAddr = icecastAddr(server)
}
......@@ -173,8 +181,8 @@ func redirect(w http.ResponseWriter, r *http.Request, urlStr string) {
// Serve a response for a client connection to a relay.
func (h *HttpRedirector) serveRelay(mount *autoradio.Mount, w http.ResponseWriter, r *http.Request) {
// Find an active node.
relayAddr := h.pickActiveNode()
if relayAddr == "" {
relayAddr := h.pickActiveNode(r)
if relayAddr == nil {
http.Error(w, "No active nodes", http.StatusServiceUnavailable)
return
}
......@@ -201,7 +209,7 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
sourceConnections.Incr()
// Find the current master node.
masterAddr, err := h.client.GetMasterAddr()
masterInfo, err := h.client.GetMasterInfo()
if err != nil {
log.Printf("source error: no master: %v", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
......@@ -214,7 +222,7 @@ func (h *HttpRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
proxy := &ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = icecastAddr(masterAddr)
req.URL.Host = icecastAddr(masterInfo.IP[0])
req.URL.Path = autoradio.MountNameToIcecastPath(mount.Name)
},
FlushInterval: 500 * time.Millisecond,
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"log"
"net"
"strings"
"sync"
"time"
......@@ -217,7 +218,8 @@ func (w *configWatcher) Start() {
type RadioNode struct {
Config *clusterConfig
ip string
name string
ips []net.IP
client *etcd.Client
me *masterelection.MasterElection
watcher *configWatcher
......@@ -228,7 +230,7 @@ type RadioNode struct {
stop chan bool
}
func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
config := newClusterConfig()
// Network updates trigger icecast reconfiguration. This
......@@ -247,19 +249,30 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi
// Global 'stop' channel.
stopch := make(chan bool)
// Location information advertised when this node is master.
minfo := &autoradio.MasterNodeInfo{
Name: name,
IP: ips,
}
minfodata, err := json.Marshal(minfo)
if err != nil {
log.Fatal(err)
}
return &RadioNode{
Config: config,
ip: ip,
name: name,
ips: ips,
client: client,
me: masterelection.NewMasterElection(
client,
autoradio.MasterElectionPath,
ip,
string(minfodata),
5,
mech,
stopch),
watcher: newConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(ip, stopch),
icecast: NewIcecastController(name, stopch),
livenessTtl: 2,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
upch: upch,
......@@ -272,13 +285,17 @@ func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *Radi
func (rc *RadioNode) presence() {
ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second)
// Register ourselves using the node name.
key := autoradio.NodePrefix + rc.name
for {
select {
case <-ticker.C:
// Build our NodeStatus.
icecastSt := rc.icecast.GetStatus()
nodeSt := autoradio.NodeStatus{
IP: rc.ip,
Name: rc.name,
IP: rc.ips,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
BandwidthUsage: rc.bw.GetUsage(),
......@@ -287,7 +304,7 @@ func (rc *RadioNode) presence() {
// Update our node entry in the database.
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&nodeSt)
if _, err := rc.client.Set(autoradio.NodePrefix+rc.ip, buf.String(), rc.livenessTtl); err != nil {
if _, err := rc.client.Set(key, buf.String(), rc.livenessTtl); err != nil {
log.Printf("presence: Set(): %s", err)
}
......
package util
import (
"flag"
"fmt"
"net"
"strings"
)
type ipList []net.IP
func (l *ipList) String() string {
var sl []string
for _, ip := range *l {
sl = append(sl, ip.String())
}
return strings.Join(sl, ",")
}
func (l *ipList) Set(value string) error {
ip := net.ParseIP(value)
if ip == nil {
return fmt.Errorf("Unable to parse IP address \"%s\"", value)
}
*l = append(*l, ip)
return nil
}
func IPList(name, help string) *[]net.IP {
var l ipList
flag.Var(&l, name, help)
return (*[]net.IP)(&l)
}
func IPListWithDefault(l []net.IP, deflt string) []net.IP {
if len(l) == 0 {
return []net.IP{net.ParseIP(deflt)}
}
return l
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment