From a2dbebc85bbc416a767b8912af70ba7b8d7b4866 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Fri, 2 Jan 2015 11:38:44 +0000
Subject: [PATCH] make max listener count configurable per-node

Adds a new metric to NodeStatus (MaxListeners) so that it is available
to the traffic control logic.
---
 api.go                      |  9 ++++--
 node/icecast.go             |  4 +--
 node/icecast_config.go      |  9 +-----
 node/icecast_config_test.go |  4 +--
 node/node.go                | 55 +++++++++++++++++++++----------------
 node/node_test.go           |  1 +
 6 files changed, 45 insertions(+), 37 deletions(-)

diff --git a/api.go b/api.go
index 0d63f4c6..7f028732 100644
--- a/api.go
+++ b/api.go
@@ -107,8 +107,13 @@ type NodeStatus struct {
 
 	// Bandwidth utilization.
 	BandwidthUsage float64
+
+	// Maximum number of allowed listeners.
+	MaxListeners int
 }
 
+// NumListeners returns the total number of listeners across all
+// mounts on this node.
 func (ns *NodeStatus) NumListeners() int {
 	listeners := 0
 	for _, m := range ns.Mounts {
@@ -117,8 +122,8 @@ func (ns *NodeStatus) NumListeners() int {
 	return listeners
 }
 
-// Cache the list of active nodes (the front-ends that need to
-// retrieve this information continuously, so we limit them to 2qps).
+// Cache the list of active nodes (the front-ends need to retrieve
+// this information continuously, so we limit them to 2qps).
 type nodesCache struct {
 	ttl      time.Duration
 	nodes    []*NodeStatus
diff --git a/node/icecast.go b/node/icecast.go
index f123d3bc..0ebc5d43 100644
--- a/node/icecast.go
+++ b/node/icecast.go
@@ -63,9 +63,9 @@ type icecastController struct {
 	stop   chan bool
 }
 
-func NewIcecastController(publicIp string) *icecastController {
+func NewIcecastController(publicIp string, maxClients int) *icecastController {
 	return &icecastController{
-		config: newIcecastConfig(publicIp),
+		config: newIcecastConfig(publicIp, maxClients),
 		status: &IcecastStatus{},
 	}
 }
diff --git a/node/icecast_config.go b/node/icecast_config.go
index a63e748f..608fcf02 100644
--- a/node/icecast_config.go
+++ b/node/icecast_config.go
@@ -19,13 +19,6 @@ var (
 	icecastAdminPwFile = "/etc/icecast/.admin_pw"
 	icecastAdminPw     string
 
-	// The per-node icecast client limit is set to a very high
-	// value in order to disable the enforcement at the icecast
-	// level (the front-end traffic management code should know
-	// better).
-	//icecastMaxClients    = flag.Int("icecast-max-clients", 10000, "maximum number of Icecast clients")
-	maxClients = 10000
-
 	// Icecast tunables.
 	icecastQueueSize     = flag.Int("icecast-queue-size", 1<<20, "Icecast queue size (bytes)")
 	icecastClientTimeout = flag.Int("icecast-client-timeout", 30, "Icecast client timeout (s)")
@@ -143,7 +136,7 @@ type icecastConfig struct {
 // to a file for persistence. It is not really meant to be used by the
 // operator.
 //
-func newIcecastConfig(publicIp string) *icecastConfig {
+func newIcecastConfig(publicIp string, maxClients int) *icecastConfig {
 	// We don't use the global source password, but icecast is
 	// happier if it's set, so we just use a random password every
 	// time.
diff --git a/node/icecast_config_test.go b/node/icecast_config_test.go
index ede5529c..b3e1f2b2 100644
--- a/node/icecast_config_test.go
+++ b/node/icecast_config_test.go
@@ -17,7 +17,7 @@ func TestIcecastConfig(t *testing.T) {
 	c.setMountIfChanged(mount)
 
 	// Test a relay config.
-	ice := newIcecastConfig("1.2.3.4")
+	ice := newIcecastConfig("1.2.3.4", 1000)
 	ice.Update(c, false, "2.3.4.5")
 	output, err := ice.Encode()
 	if err != nil {
@@ -32,7 +32,7 @@ func TestIcecastConfig(t *testing.T) {
 	}
 
 	// Test a master config.
-	ice = newIcecastConfig("1.2.3.4")
+	ice = newIcecastConfig("1.2.3.4", 1000)
 	ice.Update(c, true, "2.3.4.5")
 	output, err = ice.Encode()
 	if err != nil {
diff --git a/node/node.go b/node/node.go
index f44d1a6b..938b1be0 100644
--- a/node/node.go
+++ b/node/node.go
@@ -231,23 +231,24 @@ func (w *configWatcher) Start(stop chan bool) func(chan bool) {
 
 // An active streaming node, managing the local icecast server.
 type RadioNode struct {
-	client      autoradio.EtcdClient
-	config      *clusterConfig
-	name        string
-	ips         []net.IP
-	me          *masterelection.MasterElection
-	watcher     *configWatcher
-	icecast     Controller
-	bw          *bwmonitor.BandwidthUsageMonitor
-	heartbeat   uint64
-	reloadDelay time.Duration
-	upch        chan bool
-	stop        chan bool
-	wg          sync.WaitGroup
-	Log         *log.Logger
+	client       autoradio.EtcdClient
+	config       *clusterConfig
+	name         string
+	ips          []net.IP
+	me           *masterelection.MasterElection
+	watcher      *configWatcher
+	icecast      Controller
+	bw           *bwmonitor.BandwidthUsageMonitor
+	maxListeners int
+	heartbeat    uint64
+	reloadDelay  time.Duration
+	upch         chan bool
+	stop         chan bool
+	wg           sync.WaitGroup
+	Log          *log.Logger
 }
 
-func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, client autoradio.EtcdClient) *RadioNode {
+func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode {
 	config := newClusterConfig()
 
 	// Global 'stop' channel.
@@ -275,6 +276,12 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 		log.Fatal(err)
 	}
 
+	// Create the RadioNode and all the auxiliary objects it
+	// contains. Note that the per-node icecast client limit is
+	// actually set to a value greater than maxListeners, to allow
+	// for some headroom in the front-end traffic control
+	// computations (if everything goes well, connections should
+	// never be rejected by Icecast).
 	return &RadioNode{
 		config: config,
 		name:   name,
@@ -286,14 +293,15 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, cli
 			string(minfodata),
 			uint64(*masterElectionTtl),
 			mech),
-		watcher:     newConfigWatcher(client, config, upch),
-		icecast:     NewIcecastController(name),
-		reloadDelay: 1000 * time.Millisecond,
-		heartbeat:   uint64(*nodeHeartbeat),
-		bw:          bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
-		upch:        upch,
-		stop:        stopch,
-		Log:         log.New(os.Stderr, "node: ", 0),
+		watcher:      newConfigWatcher(client, config, upch),
+		icecast:      NewIcecastController(name, maxListeners*2),
+		reloadDelay:  1000 * time.Millisecond,
+		heartbeat:    uint64(*nodeHeartbeat),
+		bw:           bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
+		maxListeners: maxListeners,
+		upch:         upch,
+		stop:         stopch,
+		Log:          log.New(os.Stderr, "node: ", 0),
 	}
 }
 
@@ -316,6 +324,7 @@ func (rc *RadioNode) presence(stop chan bool) {
 				IcecastUp:      icecastStatus.Up,
 				Mounts:         icecastStatus.Mounts,
 				BandwidthUsage: rc.bw.GetUsage(),
+				MaxListeners:   rc.maxListeners,
 			}
 
 			// Update our node entry in the database.
diff --git a/node/node_test.go b/node/node_test.go
index ed01d1b4..143981fc 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -43,6 +43,7 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
 			[]net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))},
 			"eth0",
 			1000,
+			1000,
 			etcd)
 		node.icecast = &mockController{}
 		node.reloadDelay = time.Duration(0)
-- 
GitLab