Skip to content
Snippets Groups Projects
Select Git revision
  • renovate/github.com-miekg-dns-1.x
  • renovate/google.golang.org-protobuf-1.x
  • renovate/go.etcd.io-etcd-server-v3-3.x
  • renovate/go.etcd.io-etcd-client-v3-3.x
  • renovate/golang.org-x-crypto-0.x
  • renovate/github.com-prometheus-common-0.x
  • renovate/google.golang.org-grpc-1.x
  • renovate/github.com-prometheus-client_golang-1.x
  • renovate/golang.org-x-sync-0.x
  • renovate/github.com-lpar-gzipped-v2-2.x
  • master default
  • httplog
12 results

loadbalancing.go

Blame
  • loadbalancing.go 3.86 KiB
    package fe
    
    import (
    	"errors"
    	"fmt"
    	"net"
    	"strings"
    
    	"git.autistici.org/ale/autoradio"
    	"git.autistici.org/ale/autoradio/fe/lbv2"
    )
    
    const (
    	UTIL_BANDWIDTH = iota
    	UTIL_LISTENERS
    )
    
    // Wrapper type to make a NodeStatus match the lbv2.Node interface.
    type lbNode struct {
    	*autoradio.NodeStatus
    }
    
    func (n *lbNode) Utilization(dimension int) lbv2.NodeUtilization {
    	var u float64
    	switch dimension {
    	case UTIL_BANDWIDTH:
    		u = n.BandwidthUsage
    	case UTIL_LISTENERS:
    		u = float64(n.NumListeners()) / float64(n.MaxListeners)
    	}
    	return lbv2.NodeUtilization{
    		Utilization: u,
    		Requests:    n.NumListeners(),
    	}
    }
    
    func (n *lbNode) Name() string {
    	return n.NodeStatus.Name
    }
    
    type autoradioNodeFilterFunc func(lbv2.RequestContext, *lbNode) bool
    
    func (f autoradioNodeFilterFunc) Score(ctx lbv2.RequestContext, n lbv2.Node) float64 {
    	if f(ctx, n.(*lbNode)) {
    		return 1
    	}
    	return 0
    }
    
    func icecastActiveFilter(ctx lbv2.RequestContext, n *lbNode) bool {
    	return n.IcecastUp
    }
    
    // NodeFilter that disables backends where Icecast is not running.
    func newIcecastActiveFilter() lbv2.NodeFilter {
    	return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(icecastActiveFilter))
    }
    
    func getIpProtos(ips []net.IP) (bool, bool) {
    	hasV4 := false
    	hasV6 := false
    	for _, ip := range ips {
    		if ip.To4() == nil {
    			hasV6 = true
    		} else {
    			hasV4 = true
    		}
    	}
    	return hasV4, hasV6
    }
    
    func ipProtocolFilter(ctx lbv2.RequestContext, n *lbNode) bool {
    	if ctx == nil {
    		return true
    	}
    	addr := ctx.RemoteAddr()
    	if addr == nil {
    		return true
    	}
    	remoteV6 := addr.To4() == nil
    	hasV4, hasV6 := getIpProtos(n.IP)
    	return (remoteV6 && hasV6) || (!remoteV6 && hasV4)
    }
    
    // NodeFilter that selects those backends having at least an IP
    // address matching the request protocol (IPv4/IPv6).
    func newIpProtocolFilter() lbv2.NodeFilter {
    	return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(ipProtocolFilter))
    }
    
    // Wrapper class for lbv2.LoadBalancer that uses NodeStatus objects.
    type autoradioLoadBalancer struct {
    	*lbv2.LoadBalancer
    }
    
    func (l *autoradioLoadBalancer) Update(nodes []*autoradio.NodeStatus) {
    	wrapped := make([]lbv2.Node, 0, len(nodes))
    	for _, n := range nodes {
    		wrapped = append(wrapped, &lbNode{n})
    	}
    	l.LoadBalancer.Update(wrapped)
    }
    
    func (l *autoradioLoadBalancer) Choose(ctx lbv2.RequestContext) *autoradio.NodeStatus {
    	result := l.LoadBalancer.Choose(ctx)
    	if result == nil {
    		return nil
    	}
    	return result.(*lbNode).NodeStatus
    }
    
    // Parse a string that specifies how to build a LoadBalancer. The
    // string should consist of a list of comma-separated tokens, each
    // identifying a specific filter or policy.
    //
    // Some filters will always be included in the resulting LoadBalancer
    // and do not need to be specified explicitly (icecastActiveFilter and
    // ipProtocolFilter, plus an activeNodesFilter at the end).
    func parseLoadBalancerSpec(specstr string) (*autoradioLoadBalancer, error) {
    	lb := lbv2.New()
    	lb.AddFilter(newIcecastActiveFilter())
    	lb.AddFilter(newIpProtocolFilter())
    
    	var policy lbv2.Policy
    	for _, spec := range strings.Split(specstr, ",") {
    		switch spec {
    		case "bandwidth_available":
    			lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_BANDWIDTH)))
    		case "listeners_available":
    			lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_LISTENERS)))
    		case "bandwidth_score":
    			lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_BANDWIDTH)))
    		case "listeners_score":
    			lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_LISTENERS)))
    		case "random":
    			policy = lbv2.RandomPolicy
    		case "weighted":
    			policy = lbv2.WeightedPolicy
    		case "best":
    			policy = lbv2.HighestScorePolicy
    		default:
    			return nil, fmt.Errorf("unknown lb filter spec \"%s\"", spec)
    		}
    	}
    
    	if policy == nil {
    		return nil, errors.New("no lb policy specified")
    	}
    	lb.SetPolicy(policy)
    
    	lb.AddFilter(lbv2.NewActiveNodesFilter())
    
    	return &autoradioLoadBalancer{lb}, nil
    }