Skip to content
Snippets Groups Projects
Commit ed350cdb authored by ale's avatar ale
Browse files

allow split networking

Split networking is where nodes communicate among themselves on a
private network (for example when behind a NAT infrastructure). This
change adds the --internal-ip option to radiod, that advertises a
separate IP for internal communication.
parent fa1a504e
No related branches found
No related tags found
No related merge requests found
......@@ -226,6 +226,11 @@ type NodeStatus struct {
// Public IP addresses of this server.
IP []net.IP
// Internal IP addresses of this server (i.e. reachable from
// the other peers of the cluster), if different from IP
// above. Can be nil, in which case the value of IP is used.
InternalIP []net.IP
// Is the Icecast server up?
IcecastUp bool
......@@ -239,6 +244,13 @@ type NodeStatus struct {
MaxListeners int
}
func (ns *NodeStatus) GetInternalIP() []net.IP {
if len(ns.InternalIP) != 0 {
return ns.InternalIP
}
return ns.IP
}
// NumListeners returns the total number of listeners across all
// mounts on this node.
func (ns *NodeStatus) NumListeners() int {
......@@ -352,8 +364,21 @@ func (r *Client) ListMounts() ([]*Mount, error) {
// IP address here saves another round-trip to etcd to retrieve the
// node info in the most common case.
type MasterNodeInfo struct {
// Name of the node.
Name string
IP []net.IP
// Public IPs of the node.
IP []net.IP
// Internal IPs of the node (possibly the same as IP).
InternalIP []net.IP
}
func (m MasterNodeInfo) GetInternalIP() []net.IP {
if len(m.InternalIP) != 0 {
return m.InternalIP
}
return m.IP
}
// GetMasterInfo returns the address of the current master server.
......
......@@ -15,11 +15,12 @@ import (
)
var (
name = flag.String("name", shortHostname(), "Name for this node")
publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients")
name = flag.String("name", shortHostname(), "Name for this node")
publicIPs = util.IPList("ip", "Public IP for this machine (may be specified more than once)")
internalIPs = util.IPList("internal-ip", "Internal IP for this machine (within the cluster), if different than --ip")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients")
)
func shortHostname() string {
......@@ -38,7 +39,7 @@ func main() {
client := autoradio.NewEtcdClient(true)
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, *maxClients, client)
n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIPs, "127.0.0.1"), *internalIPs, *netDev, bwLimitBytes, *maxClients, client)
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
......
......@@ -252,8 +252,8 @@ func (h *HTTPRedirector) serveSource(mount *autoradio.Mount, w http.ResponseWrit
// Create a ReverseProxy on the fly with the right backend
// address. Blindly assume that the master has at least one
// public IP, and that we can reach it.
target := icecastAddr(masterInfo.IP[0])
// internal IP, and that we can reach it.
target := icecastAddr(masterInfo.GetInternalIP()[0])
tracker.setConnState(r, StateSource, target)
proxy := &ReverseProxy{
Director: func(req *http.Request) {
......
......@@ -57,6 +57,11 @@ type liquidsoapParams struct {
// Create new parameters for liquidsoap for a transcoding mount. If
// mount.Transcoding is nil, this function will panic, so the caller
// should check mount.HasTranscoder().
//
// The source URL points at localhost, relying on the front-end
// proxying to actually send the traffic to the master. This has the
// advantage that we don't have to reconfigure liquidsoap on
// mastership changes.
func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams {
return &liquidsoapParams{
SourceURL: fmt.Sprintf("http://localhost%s", mount.Transcoding.SourceName),
......
......@@ -151,9 +151,10 @@ type RadioNode struct {
wg sync.WaitGroup
client autoradio.EtcdClient
name string
ips []net.IP
config *clusterConfig
name string
ips []net.IP
internalIPs []net.IP
config *clusterConfig
me *masterelection.MasterElection
syncer *watcher.Syncer
......@@ -184,7 +185,7 @@ type RadioNode struct {
}
// NewRadioNode creates and initializes a new autoradio node.
func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode {
func NewRadioNode(name string, ips, internalIPs []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode {
// Global 'stop' channel.
stopch := make(chan bool)
......@@ -202,8 +203,9 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max
// Location information advertised when this node is master.
minfo := &autoradio.MasterNodeInfo{
Name: name,
IP: ips,
Name: name,
IP: ips,
InternalIP: internalIPs,
}
minfodata, err := json.Marshal(minfo)
if err != nil {
......@@ -218,10 +220,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max
// never be rejected by Icecast).
config := newClusterConfig()
rc := &RadioNode{
config: config,
name: name,
ips: ips,
client: client,
config: config,
name: name,
ips: ips,
internalIPs: internalIPs,
client: client,
me: masterelection.New(
client,
autoradio.MasterElectionPath,
......@@ -250,6 +253,7 @@ func (rc *RadioNode) getNodeStatus() string {
nodeStatus := autoradio.NodeStatus{
Name: rc.name,
IP: rc.ips,
InternalIP: rc.internalIPs,
IcecastUp: icecastStatus.Up,
Mounts: icecastStatus.Mounts,
BandwidthUsage: rc.bw.GetUsage(),
......@@ -262,15 +266,16 @@ func (rc *RadioNode) getNodeStatus() string {
return buf.String()
}
// Get a valid IP address for the current master node (to be passed to
// Icecast). Since we don't really know much about network topology,
// just pick the first IP address associated with the master node.
func (rc *RadioNode) getMasterAddr() net.IP {
// Get a valid internal IP address for the current master node (to be
// passed to Icecast). Since we don't really know much about network
// topology, just pick the first IP address associated with the master
// node.
func (rc *RadioNode) getMasterInternalAddr() net.IP {
var info autoradio.MasterNodeInfo
if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.IP) == 0 {
if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.GetInternalIP()) == 0 {
return nil
}
return info.IP[0]
return info.GetInternalIP()[0]
}
// Reload the icecast configuration when needed.
......@@ -301,7 +306,7 @@ func (rc *RadioNode) updater(stop chan bool) {
rc.Log.Printf("updating configuration")
masterAddr := rc.getMasterAddr()
masterAddr := rc.getMasterInternalAddr()
// Reload the Icecast daemon.
icecastReloads.Incr()
......
......@@ -65,6 +65,7 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
node := NewRadioNode(
fmt.Sprintf("node%d", i+1),
[]net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))},
nil,
"eth0",
1000,
1000,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment