From 4d07f9a79a1dc9b34679a6da6e61f827f6c4685b Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 13 Apr 2019 12:05:56 +0100
Subject: [PATCH] A few small changes to load balancing

* We can't use an RWMutex due to the predictor write at the end of Choose()
* Return and log errors when lbv2 fails to pick a node
* Generally increase HTTP debug logging
---
 node/http.go                 |  5 +++
 node/lbv2/debug.go           |  4 +-
 node/lbv2/instrumentation.go |  4 +-
 node/lbv2/lbv2.go            | 79 +++++++++++++++++++++++++-----------
 node/loadbalancing.go        | 28 ++++++++-----
 node/node.go                 | 18 ++++----
 6 files changed, 89 insertions(+), 49 deletions(-)
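
Note (not part of the commit message): the RWMutex bullet refers to
Choose() feeding the selected node back into the utilization
predictors via p.Incr(n), i.e. a write to shared state, so a read
lock is not sufficient. A minimal sketch of the hazard, using a
hypothetical predictor type (the real costPredictor internals are not
shown in this patch):

	type fakePredictor struct {
		counts map[string]int
	}

	// Incr mutates the map. Two goroutines calling it while each
	// holds only an RWMutex read lock would race on the map, which
	// is why LoadBalancer now serializes Choose() with sync.Mutex.
	func (p *fakePredictor) Incr(name string) {
		p.counts[name]++
	}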

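Note: with the new Choose(ctx) (Node, error) signature, callers can
distinguish the failure modes via the exported ErrNoNodes,
ErrAllNodesFiltered and ErrPolicy values. A rough caller-side sketch
(the pickNode wrapper is hypothetical and assumes the errors, log and
net/http imports; only the lbv2 identifiers come from this patch):

	func pickNode(w http.ResponseWriter, lb *lbv2.LoadBalancer, rc lbv2.RequestContext) lbv2.Node {
		n, err := lb.Choose(rc)
		if err == nil {
			// The chosen node has already been counted by the
			// utilization predictors.
			return n
		}
		if errors.Is(err, lbv2.ErrNoNodes) || errors.Is(err, lbv2.ErrAllNodesFiltered) {
			log.Printf("lbv2: no candidates: %v", err)
		} else {
			// lbv2.ErrPolicy, or an unexpected policy failure.
			log.Printf("lbv2: %v", err)
		}
		http.Error(w, "No nodes available", http.StatusServiceUnavailable)
		return nil
	}
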
diff --git a/node/http.go b/node/http.go
index e61ade96..f4dc3866 100644
--- a/node/http.go
+++ b/node/http.go
@@ -45,6 +45,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler {
 	// there). Using command-line flags it is possible to disable
 	// the default debug pages, or to restrict access to localhost.
 	if !*disableDebugHandlers {
+		http.Handle("/debug/lbv2", n.lb.lb)
 		var h http.Handler = http.DefaultServeMux
 		if *restrictDebugHandlers {
 			h = withLocalhost(h)
@@ -103,6 +104,7 @@ func withMount(n *Node, f func(*pb.Mount, http.ResponseWriter, *http.Request)) h
 		mountPath := strings.TrimSuffix(r.URL.Path, ".m3u")
 		mount, ok := n.mounts.GetMount(mountPath)
 		if !ok {
+			log.Printf("http: %s: not found", mountPath)
 			http.NotFound(w, r)
 			return
 		}
@@ -124,6 +126,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques
 		return
 	}
 
+	log.Printf("connecting source for %s to %s", mount.Path, targetAddr)
 	doIcecastProxy(w, r, &url.URL{
 		Scheme: "http",
 		Host:   targetAddr,
@@ -137,6 +140,7 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *
 	// 1:1 mapping between Icecasts and frontends.
 	targetNode := lb.chooseNode(&httpRequestContext{r})
 	if targetNode == nil {
+		log.Printf("http: %s: no nodes available", mount.Path)
 		http.Error(w, "No nodes available", http.StatusServiceUnavailable)
 		return
 	}
@@ -145,6 +149,7 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *
 	if targetAddr == "" {
 		// This should not happen if the protocol filter in
 		// the load balancer evaluation did its job properly.
+		log.Printf("http: %s: protocol unavailable", mount.Path)
 		http.Error(w, "No nodes available with the right IP protocol", http.StatusServiceUnavailable)
 		return
 	}
diff --git a/node/lbv2/debug.go b/node/lbv2/debug.go
index df7cc820..9c693b50 100644
--- a/node/lbv2/debug.go
+++ b/node/lbv2/debug.go
@@ -47,7 +47,7 @@ type costAndUtil struct {
 
 func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	// Build a view of the cost/utilization data.
-	l.lock.RLock()
+	l.lock.Lock()
 	var nodes []nodeDebugData
 	for i := 0; i < l.nodes.Len(); i++ {
 		n := l.nodes.Get(i)
@@ -67,7 +67,7 @@ func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	for dim := range l.predictors {
 		dimensions = append(dimensions, dim)
 	}
-	l.lock.RUnlock()
+	l.lock.Unlock()
 
 	ctx := struct {
 		Nodes         []nodeDebugData
diff --git a/node/lbv2/instrumentation.go b/node/lbv2/instrumentation.go
index 614b6769..662f80bd 100644
--- a/node/lbv2/instrumentation.go
+++ b/node/lbv2/instrumentation.go
@@ -53,7 +53,7 @@ type nodeMetrics struct {
 }
 
 func (lc loadBalancerCollector) Collect(ch chan<- prometheus.Metric) {
-	lc.LoadBalancer.lock.RLock()
+	lc.LoadBalancer.lock.Lock()
 	data := make([]nodeMetrics, 0, lc.LoadBalancer.nodes.Len())
 	for i := 0; i < lc.LoadBalancer.nodes.Len(); i++ {
 		n := lc.LoadBalancer.nodes.Get(i)
@@ -69,7 +69,7 @@ func (lc loadBalancerCollector) Collect(ch chan<- prometheus.Metric) {
 			})
 		}
 	}
-	lc.LoadBalancer.lock.RUnlock()
+	lc.LoadBalancer.lock.Unlock()
 
 	for _, d := range data {
 		dimLabel := strconv.Itoa(d.dimension)
diff --git a/node/lbv2/lbv2.go b/node/lbv2/lbv2.go
index 873316ce..e5a2b11a 100644
--- a/node/lbv2/lbv2.go
+++ b/node/lbv2/lbv2.go
@@ -3,6 +3,8 @@
 package lbv2
 
 import (
+	"errors"
+	"log"
 	"math/rand"
 	"net"
 	"sync"
@@ -157,7 +159,7 @@ type NodeList interface {
 // receives an equal share of incoming traffic.
 //
 type LoadBalancer struct {
-	lock       sync.RWMutex
+	lock       sync.Mutex
 	nodes      NodeList
 	predictors map[int]*costPredictor
 	filters    []NodeFilter
@@ -227,12 +229,19 @@ func (l *LoadBalancer) Update(nodes NodeList) {
 	}
 }
 
+// Possible error values returned by Choose.
+var (
+	ErrNoNodes          = errors.New("no nodes are known")
+	ErrAllNodesFiltered = errors.New("all nodes were filtered")
+	ErrPolicy           = errors.New("nodes rejected by policy")
+)
+
 // Choose a node according to the specified policies.
-func (l *LoadBalancer) Choose(ctx RequestContext) Node {
-	l.lock.RLock()
-	defer l.lock.RUnlock()
+func (l *LoadBalancer) Choose(ctx RequestContext) (Node, error) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
 	if l.nodes == nil || l.nodes.Len() == 0 {
-		return nil
+		return nil, ErrNoNodes
 	}
 
 	// Create the candidate list.
@@ -248,11 +257,14 @@ func (l *LoadBalancer) Choose(ctx RequestContext) Node {
 	for _, f := range l.filters {
 		wnodes = f.Filter(ctx, wnodes)
 	}
+	if len(wnodes) == 0 {
+		return nil, ErrAllNodesFiltered
+	}
 
 	// Select a node among the available candidates.
 	n := l.policy.GetNode(wnodes)
 	if n == nil {
-		return nil
+		return nil, ErrPolicy
 	}
 
 	// Feed back the choice into the utilization predictors.
@@ -261,7 +273,8 @@ func (l *LoadBalancer) Choose(ctx RequestContext) Node {
 			p.Incr(n)
 		}
 	}
-	return n
+
+	return n, nil
 }
 
 type Policy interface {
@@ -311,19 +324,14 @@ func NodeScorerFilter(s NodeScorer) NodeFilter {
 	return &nodeScorerFilterWrapper{s}
 }
 
-// Give each node a score proportional to the inverse of utilization.
+// Give each node a score of 1.0 - utilization.
 type capacityAvailableScorer struct {
 	pred Predictor
 }
 
 func (s *capacityAvailableScorer) Score(ctx RequestContext, n Node) float64 {
-	// Cap utilization low limit at 5%. Anything below that is
-	// considered equally available.
 	u := s.pred.Utilization(n)
-	if u < 0.05 {
-		u = 0.05
-	}
-	return 1.0 / u
+	return 1.0 - u
 }
 
 func NewCapacityAvailableScorer(pred Predictor) NodeFilter {
@@ -374,16 +382,31 @@ const baseWeight = 1000000
 
 // Return a random item based on a weighted distribution.
 func weightedPolicyFunc(wnodes []NodeScore) Node {
-	// Need to convert this anyway.
+	// Convert to randutil.Choice. Keep track of the maximum weight
+	// because randutil.WeightedChoice returns an error when all
+	// weights are zero; in that case we fall back to a plain
+	// random choice.
 	choices := make([]randutil.Choice, len(wnodes))
+	var maxWeight int
 	for i, wn := range wnodes {
+		w := int(float64(baseWeight) * wn.Score)
+		if w > maxWeight {
+			maxWeight = w
+		}
 		choices[i] = randutil.Choice{
-			Weight: int(float64(baseWeight) * wn.Score),
+			Weight: w,
 			Item:   wn.Node,
 		}
 	}
+
+	if maxWeight == 0 {
+		return randomPolicyFunc(wnodes)
+	}
+
 	result, err := randutil.WeightedChoice(choices)
 	if err != nil {
+		// We log these in order to detect randutil edge cases.
+		log.Printf("lbv2: randutil error: %v", err)
 		return nil
 	}
 	return result.Item.(Node)
@@ -391,8 +414,7 @@ func weightedPolicyFunc(wnodes []NodeScore) Node {
 
 var WeightedPolicy = PolicyFunc(weightedPolicyFunc)
 
-// Return a random item regardless of their weight (but it still
-// ignores disabled nodes, for which weight=0).
+// Return a random item regardless of its score.
 func randomPolicyFunc(wnodes []NodeScore) Node {
 	if len(wnodes) == 0 {
 		return nil
@@ -403,20 +425,29 @@ func randomPolicyFunc(wnodes []NodeScore) Node {
 var RandomPolicy = PolicyFunc(randomPolicyFunc)
 
 // Always return the item with the highest score (only good for
-// testing purposes, really).
+// testing purposes, really). If several nodes tie for the highest
+// score, one of them is picked at random.
 func highestScorePolicyFunc(wnodes []NodeScore) Node {
-	maxIdx := -1
 	var maxScore float64
+	best := make([]int, 0, len(wnodes))
 	for i, n := range wnodes {
-		if n.Score > maxScore {
+		switch {
+		case n.Score > maxScore:
 			maxScore = n.Score
-			maxIdx = i
+			best = best[:0]
+			best = append(best, i)
+		case n.Score == maxScore:
+			best = append(best, i)
 		}
 	}
-	if maxIdx < 0 || maxScore == 0 {
+	switch len(best) {
+	case 0:
 		return nil
+	case 1:
+		return wnodes[best[0]].Node
+	default:
+		return wnodes[best[rand.Intn(len(best))]].Node
 	}
-	return wnodes[maxIdx].Node
 }
 
 var HighestScorePolicy = PolicyFunc(highestScorePolicyFunc)
diff --git a/node/loadbalancing.go b/node/loadbalancing.go
index 8ce5c35e..0c4363fe 100644
--- a/node/loadbalancing.go
+++ b/node/loadbalancing.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"log"
 	"math/rand"
 	"net"
 	"net/http"
@@ -12,30 +13,34 @@ import (
 	"sync"
 	"time"
 
+	"git.autistici.org/ale/autoradio"
 	"git.autistici.org/ale/autoradio/coordination/presence"
 	"git.autistici.org/ale/autoradio/node/lbv2"
 	pb "git.autistici.org/ale/autoradio/proto"
 	"git.autistici.org/ale/autoradio/util"
+	"go.etcd.io/etcd/clientv3"
 )
 
 type loadBalancer struct {
 	lb *lbv2.LoadBalancer
 
-	publicPeers *presence.EndpointSet
-
 	nodeMx sync.RWMutex
 	nodes  []*nodeInfo
 }
 
-func newLoadBalancer(ctx context.Context, nodeID string, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
+func newLoadBalancer(ctx context.Context, cli *clientv3.Client, nodeID string, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
 	lb, err := parseLoadBalancerSpec(nodeID, lbSpec)
 	if err != nil {
 		return nil, err
 	}
 
+	// Watch the authoritative list of peer nodes; do not return
+	// until the list has been bootstrapped.
+	publicPeers, peersReady := presence.WatchEndpointsReady(ctx, cli, autoradio.PublicEndpointPrefix)
+	<-peersReady
+
 	l := &loadBalancer{
-		lb:          lb,
-		publicPeers: publicPeers,
+		lb: lb,
 	}
 
 	go util.RunCron(ctx, 500*time.Millisecond, func(_ context.Context) {
@@ -173,8 +178,9 @@ func (l *loadBalancer) getNodes() []*nodeInfo {
 }
 
 func (l *loadBalancer) chooseNode(ctx lbv2.RequestContext) *nodeInfo {
-	result := l.lb.Choose(ctx)
-	if result == nil {
+	result, err := l.lb.Choose(ctx)
+	if err != nil {
+		log.Printf("lbv2 failure: %v", err)
 		return nil
 	}
 	return result.(*nodeInfo)
@@ -200,6 +206,10 @@ func icecastActiveFilter(ctx lbv2.RequestContext, n *nodeInfo) bool {
 }
 
 // NodeFilter that disables backends where Icecast is not running.
+// Note that "disabled" means that the associated score is set to
+// zero: if all nodes are disabled, the load balancing algorithm will
+// still pick one (in fact we explicitly do not use any
+// lbv2.ActiveNodesFilter).
 func newIcecastActiveFilter() lbv2.NodeFilter {
 	return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(icecastActiveFilter))
 }
@@ -242,7 +252,7 @@ func newIPProtocolFilter() lbv2.NodeFilter {
 //
 // Some filters will always be included in the resulting LoadBalancer
 // and do not need to be specified explicitly (icecastActiveFilter and
-// ipProtocolFilter, plus an activeNodesFilter at the end).
+// ipProtocolFilter).
 func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
 	lb := lbv2.New(nodeID)
 	lb.AddFilter(newIcecastActiveFilter())
@@ -275,8 +285,6 @@ func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
 	}
 	lb.SetPolicy(policy)
 
-	lb.AddFilter(lbv2.NewActiveNodesFilter())
-
 	return lb, nil
 }
 
diff --git a/node/node.go b/node/node.go
index 44f48d71..b42d9111 100644
--- a/node/node.go
+++ b/node/node.go
@@ -34,13 +34,12 @@ type Node struct {
 	maxBandwidth int
 	maxListeners int
 
-	mounts      client.MountConfig
-	watcher     *election.ElectionWatcher
-	statusMgr   *statusManager
-	updateCh    chan struct{}
-	ice         Icecast
-	publicPeers *presence.EndpointSet
-	lb          *loadBalancer
+	mounts    client.MountConfig
+	watcher   *election.ElectionWatcher
+	statusMgr *statusManager
+	updateCh  chan struct{}
+	ice       Icecast
+	lb        *loadBalancer
 
 	ctx context.Context
 }
@@ -114,9 +113,6 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
 		return nil, err
 	}
 
-	// Watch the authoritative list of peer nodes.
-	n.publicPeers = presence.WatchEndpoints(ctx, session.Client(), autoradio.PublicEndpointPrefix)
-
 	// Run a leader election protocol advertising the peer address of our
 	// Icecast daemon. We participate in the leader election, but then do
 	// nothing special once we're the leader, just wait for
@@ -158,7 +154,7 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
 	})
 
 	// Create the loadBalancer that runs within the node.
-	n.lb, err = newLoadBalancer(ctx, nodeID, n.publicPeers, n.statusMgr, lbSpec)
+	n.lb, err = newLoadBalancer(ctx, session.Client(), nodeID, n.statusMgr, lbSpec)
 	if err != nil {
 		cancel()
 		return nil, err
-- 
GitLab