Commit 4d07f9a7 authored by ale

A few small changes to load balancing

* We can't use a RWMutex because Choose writes to the utilization predictors at the end
* Return and log errors when lbv2 returns no results (see the sketch after this list)
* Generally increase http debug level
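
Callers can now tell the failure modes apart instead of just seeing a nil node. A minimal caller-side sketch, assuming it lives in a package that already imports lbv2; the sentinel errors and the Choose signature come from the diff below, the switch itself is only illustrative:

import (
	"errors"
	"log"

	"git.autistici.org/ale/autoradio/node/lbv2"
)

// pickNode is an illustrative wrapper: it distinguishes the failure
// modes that Choose can now report instead of returning a bare nil Node.
func pickNode(lb *lbv2.LoadBalancer, reqCtx lbv2.RequestContext) lbv2.Node {
	n, err := lb.Choose(reqCtx)
	if err != nil {
		switch {
		case errors.Is(err, lbv2.ErrNoNodes):
			log.Printf("lbv2: the node list is empty")
		case errors.Is(err, lbv2.ErrAllNodesFiltered):
			log.Printf("lbv2: all candidates were filtered out")
		default: // lbv2.ErrPolicy, or anything added later
			log.Printf("lbv2: %v", err)
		}
		return nil
	}
	return n
}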
parent 0f759401
Pipeline #2712 passed with stages in 4 minutes and 59 seconds
......@@ -45,6 +45,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler {
// there). Using command-line flags it is possible to disable
// the default debug pages, or to restrict access to localhost.
if !*disableDebugHandlers {
http.Handle("/debug/lbv2", n.lb.lb)
var h http.Handler = http.DefaultServeMux
if *restrictDebugHandlers {
h = withLocalhost(h)
......@@ -103,6 +104,7 @@ func withMount(n *Node, f func(*pb.Mount, http.ResponseWriter, *http.Request)) h
mountPath := strings.TrimSuffix(r.URL.Path, ".m3u")
mount, ok := n.mounts.GetMount(mountPath)
if !ok {
log.Printf("http: %s: not found", mountPath)
http.NotFound(w, r)
return
}
......@@ -124,6 +126,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques
return
}
log.Printf("connecting source for %s to %s", mount.Path, targetAddr)
doIcecastProxy(w, r, &url.URL{
Scheme: "http",
Host: targetAddr,
......@@ -137,6 +140,7 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *
// 1:1 mapping between Icecasts and frontends.
targetNode := lb.chooseNode(&httpRequestContext{r})
if targetNode == nil {
log.Printf("http: %s: no nodes available", mount.Path)
http.Error(w, "No nodes available", http.StatusServiceUnavailable)
return
}
......@@ -145,6 +149,7 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *
if targetAddr == "" {
// This should not happen if the protocol filter in
// the load balancer evaluation did its job properly.
log.Printf("http: %s: protocol unavailable", mount.Path)
http.Error(w, "No nodes available with the right IP protocol", http.StatusServiceUnavailable)
return
}
......
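
The hunk above registers the /debug/lbv2 page and, when restrictDebugHandlers is set, wraps http.DefaultServeMux with withLocalhost. That wrapper's body is not part of this diff; a sketch of what it typically looks like, as it might appear in http.go (assumed implementation, not the project's actual code):

import (
	"net"
	"net/http"
)

// withLocalhost, sketched: reject any request whose peer address is not
// a loopback IP, otherwise pass it through to the wrapped handler.
func withLocalhost(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		host, _, err := net.SplitHostPort(r.RemoteAddr)
		if err != nil {
			http.Error(w, "Forbidden", http.StatusForbidden)
			return
		}
		if ip := net.ParseIP(host); ip == nil || !ip.IsLoopback() {
			http.Error(w, "Forbidden", http.StatusForbidden)
			return
		}
		h.ServeHTTP(w, r)
	})
}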
......@@ -47,7 +47,7 @@ type costAndUtil struct {
func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Build a view of the cost/utilization data.
l.lock.RLock()
l.lock.Lock()
var nodes []nodeDebugData
for i := 0; i < l.nodes.Len(); i++ {
n := l.nodes.Get(i)
......@@ -67,7 +67,7 @@ func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for dim := range l.predictors {
dimensions = append(dimensions, dim)
}
l.lock.RUnlock()
l.lock.Unlock()
ctx := struct {
Nodes []nodeDebugData
......
......@@ -53,7 +53,7 @@ type nodeMetrics struct {
}
func (lc loadBalancerCollector) Collect(ch chan<- prometheus.Metric) {
lc.LoadBalancer.lock.RLock()
lc.LoadBalancer.lock.Lock()
data := make([]nodeMetrics, 0, lc.LoadBalancer.nodes.Len())
for i := 0; i < lc.LoadBalancer.nodes.Len(); i++ {
n := lc.LoadBalancer.nodes.Get(i)
......@@ -69,7 +69,7 @@ func (lc loadBalancerCollector) Collect(ch chan<- prometheus.Metric) {
})
}
}
lc.LoadBalancer.lock.RUnlock()
lc.LoadBalancer.lock.Unlock()
for _, d := range data {
dimLabel := strconv.Itoa(d.dimension)
......
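
Both the debug ServeHTTP handler and the Prometheus collector follow the same pattern: snapshot the shared state while holding the (now exclusive) lock, release it, then render or emit metrics without it. A self-contained sketch of that pattern; the metric name, labels and the sample struct are illustrative, not autoradio's own:

package example

import (
	"strconv"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// sample is an illustrative snapshot record; the real collector uses
// its own nodeMetrics type.
type sample struct {
	node        string
	dimension   int
	utilization float64
}

// snapshotCollector copies the data under the mutex, then emits const
// metrics with the lock already released.
type snapshotCollector struct {
	mu    sync.Mutex
	state []sample
	desc  *prometheus.Desc
}

func newSnapshotCollector() *snapshotCollector {
	return &snapshotCollector{
		desc: prometheus.NewDesc(
			"example_node_utilization", // illustrative metric name
			"Predicted utilization per node and dimension.",
			[]string{"node", "dimension"}, nil,
		),
	}
}

func (c *snapshotCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *snapshotCollector) Collect(ch chan<- prometheus.Metric) {
	// Keep the critical section short: only the copy happens under the lock.
	c.mu.Lock()
	data := make([]sample, len(c.state))
	copy(data, c.state)
	c.mu.Unlock()

	// Sending on the channel can block, so do it unlocked.
	for _, d := range data {
		ch <- prometheus.MustNewConstMetric(
			c.desc, prometheus.GaugeValue, d.utilization,
			d.node, strconv.Itoa(d.dimension),
		)
	}
}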
......@@ -3,6 +3,8 @@
package lbv2
import (
"errors"
"log"
"math/rand"
"net"
"sync"
......@@ -157,7 +159,7 @@ type NodeList interface {
// receives an equal share of incoming traffic.
//
type LoadBalancer struct {
lock sync.RWMutex
lock sync.Mutex
nodes NodeList
predictors map[int]*costPredictor
filters []NodeFilter
......@@ -227,12 +229,19 @@ func (l *LoadBalancer) Update(nodes NodeList) {
}
}
// Possible error values returned by Choose.
var (
ErrNoNodes = errors.New("no nodes are known")
ErrAllNodesFiltered = errors.New("all nodes were filtered")
ErrPolicy = errors.New("nodes rejected by policy")
)
// Choose a node according to the specified policies.
func (l *LoadBalancer) Choose(ctx RequestContext) Node {
l.lock.RLock()
defer l.lock.RUnlock()
func (l *LoadBalancer) Choose(ctx RequestContext) (Node, error) {
l.lock.Lock()
defer l.lock.Unlock()
if l.nodes == nil || l.nodes.Len() == 0 {
return nil
return nil, ErrNoNodes
}
// Create the candidate list.
......@@ -248,11 +257,14 @@ func (l *LoadBalancer) Choose(ctx RequestContext) Node {
for _, f := range l.filters {
wnodes = f.Filter(ctx, wnodes)
}
if len(wnodes) == 0 {
return nil, ErrAllNodesFiltered
}
// Select a node among the available candidates.
n := l.policy.GetNode(wnodes)
if n == nil {
return nil
return nil, ErrPolicy
}
// Feed back the choice into the utilization predictors.
......@@ -261,7 +273,8 @@ func (l *LoadBalancer) Choose(ctx RequestContext) Node {
p.Incr(n)
}
}
return n
return n, nil
}
type Policy interface {
......@@ -311,19 +324,14 @@ func NodeScorerFilter(s NodeScorer) NodeFilter {
return &nodeScorerFilterWrapper{s}
}
// Give each node a score proportional to the inverse of utilization.
// Give each node a score of 1.0 - utilization.
type capacityAvailableScorer struct {
pred Predictor
}
func (s *capacityAvailableScorer) Score(ctx RequestContext, n Node) float64 {
// Cap utilization low limit at 5%. Anything below that is
// considered equally available.
u := s.pred.Utilization(n)
if u < 0.05 {
u = 0.05
}
return 1.0 / u
return 1.0 - u
}
func NewCapacityAvailableScorer(pred Predictor) NodeFilter {
......@@ -374,16 +382,31 @@ const baseWeight = 1000000
// Return a random item based on a weighted distribution.
func weightedPolicyFunc(wnodes []NodeScore) Node {
// Need to convert this anyway.
// Need to convert this anyway. We keep track of the maximum
// weight because if they are all zero randutil.WeightedChoice
// will return an error. We fall back to random choice in that
// case.
choices := make([]randutil.Choice, len(wnodes))
var maxWeight int
for i, wn := range wnodes {
w := int(float64(baseWeight) * wn.Score)
if w > maxWeight {
maxWeight = w
}
choices[i] = randutil.Choice{
Weight: int(float64(baseWeight) * wn.Score),
Weight: w,
Item: wn.Node,
}
}
if maxWeight == 0 {
return randomPolicyFunc(wnodes)
}
result, err := randutil.WeightedChoice(choices)
if err != nil {
// We log these in order to detect randutil edge cases.
log.Printf("lbv2: randutil error: %v", err)
return nil
}
return result.Item.(Node)
......@@ -391,8 +414,7 @@ func weightedPolicyFunc(wnodes []NodeScore) Node {
var WeightedPolicy = PolicyFunc(weightedPolicyFunc)
// Return a random item regardless of their weight (but it still
// ignores disabled nodes, for which weight=0).
// Return a random item regardless of their score.
func randomPolicyFunc(wnodes []NodeScore) Node {
if len(wnodes) == 0 {
return nil
......@@ -403,20 +425,29 @@ func randomPolicyFunc(wnodes []NodeScore) Node {
var RandomPolicy = PolicyFunc(randomPolicyFunc)
// Always return the item with the highest score (only good for
// testing purposes, really).
// testing purposes, really). Falls back to random selection in case
// of multiple nodes with the same score.
func highestScorePolicyFunc(wnodes []NodeScore) Node {
maxIdx := -1
var maxScore float64
best := make([]int, 0, len(wnodes))
for i, n := range wnodes {
if n.Score > maxScore {
switch {
case n.Score > maxScore:
maxScore = n.Score
maxIdx = i
best = best[:0]
best = append(best, i)
case n.Score == maxScore:
best = append(best, i)
}
}
if maxIdx < 0 || maxScore == 0 {
switch len(best) {
case 0:
return nil
case 1:
return wnodes[best[0]].Node
default:
return wnodes[best[rand.Intn(len(best))]].Node
}
return wnodes[maxIdx].Node
}
var HighestScorePolicy = PolicyFunc(highestScorePolicyFunc)
......
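
The weighted policy scales each score by baseWeight and, if every weight rounds down to zero (which can now happen, since the capacity scorer returns 1.0 - utilization with no lower cap), it falls back to a uniform random pick instead of letting randutil.WeightedChoice fail. A self-contained illustration of the same selection logic using only math/rand:

package example

import "math/rand"

const baseWeight = 1000000

// weightedPick mirrors the behaviour described above: pick an index
// proportionally to its score, falling back to a uniform choice when
// every weight rounds down to zero. Scores are assumed to lie in
// [0, 1], as produced by the new 1.0-utilization scorer.
func weightedPick(scores []float64) int {
	if len(scores) == 0 {
		return -1
	}
	weights := make([]int, len(scores))
	total := 0
	for i, s := range scores {
		w := int(float64(baseWeight) * s)
		weights[i] = w
		total += w
	}
	if total == 0 {
		// Everything scored zero (e.g. all nodes fully utilized or
		// disabled): fall back to a plain random pick.
		return rand.Intn(len(scores))
	}
	// Pick a point in [0, total) and walk the buckets until we land in one.
	r := rand.Intn(total)
	for i, w := range weights {
		if r < w {
			return i
		}
		r -= w
	}
	return len(scores) - 1 // not reached
}

The real code delegates the weighted draw to randutil.WeightedChoice and only implements the zero-weight fallback itself.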
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"net"
"net/http"
......@@ -12,30 +13,34 @@ import (
"sync"
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/coordination/presence"
"git.autistici.org/ale/autoradio/node/lbv2"
pb "git.autistici.org/ale/autoradio/proto"
"git.autistici.org/ale/autoradio/util"
"go.etcd.io/etcd/clientv3"
)
type loadBalancer struct {
lb *lbv2.LoadBalancer
publicPeers *presence.EndpointSet
nodeMx sync.RWMutex
nodes []*nodeInfo
}
func newLoadBalancer(ctx context.Context, nodeID string, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
func newLoadBalancer(ctx context.Context, cli *clientv3.Client, nodeID string, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
lb, err := parseLoadBalancerSpec(nodeID, lbSpec)
if err != nil {
return nil, err
}
// Watch the authoritative list of peer nodes, do not return
// until it has been bootstrapped.
publicPeers, peersReady := presence.WatchEndpointsReady(ctx, cli, autoradio.PublicEndpointPrefix)
<-peersReady
l := &loadBalancer{
lb: lb,
publicPeers: publicPeers,
lb: lb,
}
go util.RunCron(ctx, 500*time.Millisecond, func(_ context.Context) {
......@@ -173,8 +178,9 @@ func (l *loadBalancer) getNodes() []*nodeInfo {
}
func (l *loadBalancer) chooseNode(ctx lbv2.RequestContext) *nodeInfo {
result := l.lb.Choose(ctx)
if result == nil {
result, err := l.lb.Choose(ctx)
if err != nil {
log.Printf("lbv2 failure: %v", err)
return nil
}
return result.(*nodeInfo)
......@@ -200,6 +206,10 @@ func icecastActiveFilter(ctx lbv2.RequestContext, n *nodeInfo) bool {
}
// NodeFilter that disables backends where Icecast is not running.
// Note that "disabled" means that the associated score is set to
// zero: if all nodes are disabled, the load balancing algorithm will
// still pick one (in fact we explicitly do not use any
// lbv2.ActiveNodesFilter).
func newIcecastActiveFilter() lbv2.NodeFilter {
return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(icecastActiveFilter))
}
......@@ -242,7 +252,7 @@ func newIPProtocolFilter() lbv2.NodeFilter {
//
// Some filters will always be included in the resulting LoadBalancer
// and do not need to be specified explicitly (icecastActiveFilter and
// ipProtocolFilter, plus an activeNodesFilter at the end).
// ipProtocolFilter).
func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
lb := lbv2.New(nodeID)
lb.AddFilter(newIcecastActiveFilter())
......@@ -275,8 +285,6 @@ func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
}
lb.SetPolicy(policy)
lb.AddFilter(lbv2.NewActiveNodesFilter())
return lb, nil
}
......
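
In lbv2 "disabling" a node just means scoring it zero; together with the zero-weight fallback in the weighted policy, a cluster where Icecast is down everywhere still picks a node instead of answering 503. A sketch of how a boolean health check plugs into NodeScorerFilter; the types come from the diff, while the adapter and myHealthCheck are illustrative (the project uses its own autoradioNodeFilterFunc), and how the wrapper combines this score with others is up to nodeScorerFilterWrapper:

// boolScorer is an illustrative adapter: it turns a yes/no health check
// into a NodeScorer, so unhealthy nodes end up with a zero score but
// are never removed from the candidate list.
type boolScorer struct {
	ok func(ctx lbv2.RequestContext, n lbv2.Node) bool
}

func (s boolScorer) Score(ctx lbv2.RequestContext, n lbv2.Node) float64 {
	if s.ok(ctx, n) {
		return 1.0
	}
	return 0.0
}

// Hooked up the same way newIcecastActiveFilter does it:
//
//	lb.AddFilter(lbv2.NodeScorerFilter(boolScorer{ok: myHealthCheck}))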
......@@ -34,13 +34,12 @@ type Node struct {
maxBandwidth int
maxListeners int
mounts client.MountConfig
watcher *election.ElectionWatcher
statusMgr *statusManager
updateCh chan struct{}
ice Icecast
publicPeers *presence.EndpointSet
lb *loadBalancer
mounts client.MountConfig
watcher *election.ElectionWatcher
statusMgr *statusManager
updateCh chan struct{}
ice Icecast
lb *loadBalancer
ctx context.Context
}
......@@ -114,9 +113,6 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
return nil, err
}
// Watch the authoritative list of peer nodes.
n.publicPeers = presence.WatchEndpoints(ctx, session.Client(), autoradio.PublicEndpointPrefix)
// Run a leader election protocol advertising the peer address of our
// Icecast daemon. We participate in the leader election, but then do
// nothing special once we're the leader, just wait for
......@@ -158,7 +154,7 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
})
// Create the loadBalancer that runs within the node.
n.lb, err = newLoadBalancer(ctx, nodeID, n.publicPeers, n.statusMgr, lbSpec)
n.lb, err = newLoadBalancer(ctx, session.Client(), nodeID, n.statusMgr, lbSpec)
if err != nil {
cancel()
return nil, err
......
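
newLoadBalancer now blocks on the peersReady channel, so the balancer never starts with an empty peer list. The general shape of such a "watch plus ready signal" helper, sketched under the assumption that readiness is signalled by closing a channel after the first successful sync; presence.WatchEndpointsReady's actual implementation may differ:

import (
	"context"
	"time"
)

// watchReady starts a background watcher and returns a channel that is
// closed once the initial state has been loaded, so callers can block
// on it before serving traffic.
func watchReady(ctx context.Context, load func(context.Context) error) <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		for {
			if err := load(ctx); err == nil {
				break
			}
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second): // retry the initial load
			}
		}
		close(ready)
		// ...then keep watching for updates in the background...
	}()
	return ready
}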