diff --git a/api.go b/api.go index a97ba29488f9715b38a215d092906d5ba889d328..68b8596ac47c39ec5f154b99ba71f616a1299517 100644 --- a/api.go +++ b/api.go @@ -226,6 +226,11 @@ type NodeStatus struct { // Public IP addresses of this server. IP []net.IP + // Internal IP addresses of this server (i.e. reachable from + // the other peers of the cluster), if different from IP + // above. Can be nil, in which case the value of IP is used. + InternalIP []net.IP + // Is the Icecast server up? IcecastUp bool @@ -239,6 +244,13 @@ type NodeStatus struct { MaxListeners int } +func (ns *NodeStatus) GetInternalIP() []net.IP { + if len(ns.InternalIP) != 0 { + return ns.InternalIP + } + return ns.IP +} + // NumListeners returns the total number of listeners across all // mounts on this node. func (ns *NodeStatus) NumListeners() int { @@ -352,8 +364,21 @@ func (r *Client) ListMounts() ([]*Mount, error) { // IP address here saves another round-trip to etcd to retrieve the // node info in the most common case. type MasterNodeInfo struct { + // Name of the node. Name string - IP []net.IP + + // Public IPs of the node. + IP []net.IP + + // Internal IPs of the node (possibly the same as IP). + InternalIP []net.IP +} + +func (m MasterNodeInfo) GetInternalIP() []net.IP { + if len(m.InternalIP) != 0 { + return m.InternalIP + } + return m.IP } // GetMasterInfo returns the address of the current master server. diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 85ef0e4b6df6bb4a282b698377d8ff44a63d54e8..ffce1854cb58ad7583a89dc002a24f953b3e8c40 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -15,11 +15,12 @@ import ( ) var ( - name = flag.String("name", shortHostname(), "Name for this node") - publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)") - netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") - bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") - maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients") + name = flag.String("name", shortHostname(), "Name for this node") + publicIPs = util.IPList("ip", "Public IP for this machine (may be specified more than once)") + internalIPs = util.IPList("internal-ip", "Internal IP for this machine (within the cluster), if different than --ip") + netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") + bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") + maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients") ) func shortHostname() string { @@ -38,7 +39,7 @@ func main() { client := autoradio.NewEtcdClient(true) bwLimitBytes := float64(*bwLimit * 1000000 / 8) - n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, *maxClients, client) + n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIPs, "127.0.0.1"), *internalIPs, *netDev, bwLimitBytes, *maxClients, client) // Set up a clean shutdown function on SIGTERM. stopch := make(chan os.Signal) diff --git a/fe/http.go b/fe/http.go index efbbce864a58d2d72fddf32d9b0abeca0c4f09aa..596f1482e09a95de54a2a96694d5be226c662773 100644 --- a/fe/http.go +++ b/fe/http.go @@ -252,8 +252,8 @@ func (h *HTTPRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit // Create a ReverseProxy on the fly with the right backend // address. Blindly assume that the master has at least one - // public IP, and that we can reach it. - target := icecastAddr(masterInfo.IP[0]) + // internal IP, and that we can reach it. + target := icecastAddr(masterInfo.GetInternalIP()[0]) tracker.setConnState(r, StateSource, target) proxy := &ReverseProxy{ Director: func(req *http.Request) { diff --git a/node/liquidsoap.go b/node/liquidsoap.go index 752a1c9e93c1bb45ae932cbdc86e0c07528fbcab..50a4a73866659016c87c9d93f336439f3c378666 100644 --- a/node/liquidsoap.go +++ b/node/liquidsoap.go @@ -57,6 +57,11 @@ type liquidsoapParams struct { // Create new parameters for liquidsoap for a transcoding mount. If // mount.Transcoding is nil, this function will panic, so the caller // should check mount.HasTranscoder(). +// +// The source URL points at localhost, relying on the front-end +// proxying to actually send the traffic to the master. This has the +// advantage that we don't have to reconfigure liquidsoap on +// mastership changes. func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams { return &liquidsoapParams{ SourceURL: fmt.Sprintf("http://localhost%s", mount.Transcoding.SourceName), diff --git a/node/node.go b/node/node.go index 4af9f0ef09de67b1b402907892fce6658966bad9..24122ade00fb314542e5576acf96c25d818d0d89 100644 --- a/node/node.go +++ b/node/node.go @@ -151,9 +151,10 @@ type RadioNode struct { wg sync.WaitGroup client autoradio.EtcdClient - name string - ips []net.IP - config *clusterConfig + name string + ips []net.IP + internalIPs []net.IP + config *clusterConfig me *masterelection.MasterElection syncer *watcher.Syncer @@ -184,7 +185,7 @@ type RadioNode struct { } // NewRadioNode creates and initializes a new autoradio node. -func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode { +func NewRadioNode(name string, ips, internalIPs []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode { // Global 'stop' channel. stopch := make(chan bool) @@ -202,8 +203,9 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max // Location information advertised when this node is master. minfo := &autoradio.MasterNodeInfo{ - Name: name, - IP: ips, + Name: name, + IP: ips, + InternalIP: internalIPs, } minfodata, err := json.Marshal(minfo) if err != nil { @@ -218,10 +220,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max // never be rejected by Icecast). config := newClusterConfig() rc := &RadioNode{ - config: config, - name: name, - ips: ips, - client: client, + config: config, + name: name, + ips: ips, + internalIPs: internalIPs, + client: client, me: masterelection.New( client, autoradio.MasterElectionPath, @@ -250,6 +253,7 @@ func (rc *RadioNode) getNodeStatus() string { nodeStatus := autoradio.NodeStatus{ Name: rc.name, IP: rc.ips, + InternalIP: rc.internalIPs, IcecastUp: icecastStatus.Up, Mounts: icecastStatus.Mounts, BandwidthUsage: rc.bw.GetUsage(), @@ -262,15 +266,16 @@ func (rc *RadioNode) getNodeStatus() string { return buf.String() } -// Get a valid IP address for the current master node (to be passed to -// Icecast). Since we don't really know much about network topology, -// just pick the first IP address associated with the master node. -func (rc *RadioNode) getMasterAddr() net.IP { +// Get a valid internal IP address for the current master node (to be +// passed to Icecast). Since we don't really know much about network +// topology, just pick the first IP address associated with the master +// node. +func (rc *RadioNode) getMasterInternalAddr() net.IP { var info autoradio.MasterNodeInfo - if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.IP) == 0 { + if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.GetInternalIP()) == 0 { return nil } - return info.IP[0] + return info.GetInternalIP()[0] } // Reload the icecast configuration when needed. @@ -301,7 +306,7 @@ func (rc *RadioNode) updater(stop chan bool) { rc.Log.Printf("updating configuration") - masterAddr := rc.getMasterAddr() + masterAddr := rc.getMasterInternalAddr() // Reload the Icecast daemon. icecastReloads.Incr() diff --git a/node/node_test.go b/node/node_test.go index 1d91318b771d5b52ddb13eea9f5243637ace1ced..2c39e0c1f7c1a309b9ba13bc6c0e875cfc456a54 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -65,6 +65,7 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode { node := NewRadioNode( fmt.Sprintf("node%d", i+1), []net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))}, + nil, "eth0", 1000, 1000,