Select Git revision
loadbalancing.go
loadbalancing.go 3.86 KiB
package fe
import (
"errors"
"fmt"
"net"
"strings"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/fe/lbv2"
)
// Utilization dimensions. These are the values accepted as the
// dimension argument of lbNode.Utilization, and the ones passed to
// the load balancer's GetPredictor when building filters.
const (
// Selects the node's BandwidthUsage value.
UTIL_BANDWIDTH = iota
// Selects the ratio between the node's current number of listeners
// and its MaxListeners.
UTIL_LISTENERS
)
// Wrapper type to make a NodeStatus match the lbv2.Node interface.
// The NodeStatus is embedded by pointer, so its fields and methods
// are reachable directly on the wrapper; the Utilization and Name
// methods are added on top of it.
type lbNode struct {
*autoradio.NodeStatus
}
// Utilization reports how loaded the node is along the given
// dimension (UTIL_BANDWIDTH or UTIL_LISTENERS). The returned value
// also carries the current listener count as the number of requests.
// An unrecognized dimension yields a utilization of zero.
func (n *lbNode) Utilization(dimension int) lbv2.NodeUtilization {
	listeners := n.NumListeners()

	var u float64
	switch dimension {
	case UTIL_BANDWIDTH:
		u = n.BandwidthUsage
	case UTIL_LISTENERS:
		// Guard against a zero (or negative) MaxListeners: the plain
		// division would produce +Inf, or NaN when there are no
		// listeners, and NaN compares false against everything. A
		// node with no listener capacity is reported as full.
		if capacity := float64(n.MaxListeners); capacity > 0 {
			u = float64(listeners) / capacity
		} else {
			u = 1
		}
	}
	return lbv2.NodeUtilization{
		Utilization: u,
		Requests:    listeners,
	}
}
// Name returns the name stored in the wrapped NodeStatus. The
// embedded value is selected explicitly because lbNode's own Name
// method shadows the promoted field.
func (n *lbNode) Name() string {
	status := n.NodeStatus
	return status.Name
}
// autoradioNodeFilterFunc adapts a boolean predicate over lbNode
// values so that it can be used where a score is expected.
type autoradioNodeFilterFunc func(lbv2.RequestContext, *lbNode) bool

// Score returns 1 when the predicate accepts the node and 0 when it
// rejects it. The node must be an *lbNode: any other concrete type
// makes the type assertion panic.
func (f autoradioNodeFilterFunc) Score(ctx lbv2.RequestContext, n lbv2.Node) float64 {
	node := n.(*lbNode)
	if !f(ctx, node) {
		return 0
	}
	return 1
}
// icecastActiveFilter accepts a node only when its status has
// IcecastUp set. The request context is not consulted.
func icecastActiveFilter(_ lbv2.RequestContext, n *lbNode) bool {
	return n.NodeStatus.IcecastUp
}
// newIcecastActiveFilter returns a NodeFilter built from
// icecastActiveFilter, i.e. one that disables backends where Icecast
// is not running.
func newIcecastActiveFilter() lbv2.NodeFilter {
	scorer := autoradioNodeFilterFunc(icecastActiveFilter)
	return lbv2.NodeScorerFilter(scorer)
}
// getIpProtos reports which IP protocol families are present in ips.
// The first result is true if the list holds at least one IPv4
// address, the second if it holds at least one IPv6 address.
//
// Entries that are not valid IP addresses (nil, or of a length that
// is neither 4 nor 16 bytes) are ignored rather than being counted
// as IPv6.
func getIpProtos(ips []net.IP) (bool, bool) {
	hasV4, hasV6 := false, false
	for _, ip := range ips {
		switch {
		case ip.To4() != nil:
			hasV4 = true
		case ip.To16() != nil:
			// Valid address with no 4-byte form: IPv6.
			hasV6 = true
		}
		// Nothing more to learn once both families have been seen.
		if hasV4 && hasV6 {
			break
		}
	}
	return hasV4, hasV6
}
// ipProtocolFilter accepts a node if it has at least one address of
// the same IP family as the request's remote address. When there is
// no request context, or the context has no remote address, every
// node is accepted.
func ipProtocolFilter(ctx lbv2.RequestContext, n *lbNode) bool {
	if ctx == nil {
		return true
	}
	remote := ctx.RemoteAddr()
	if remote == nil {
		return true
	}

	nodeV4, nodeV6 := getIpProtos(n.IP)
	// A remote address with a 4-byte form is IPv4; anything else is
	// treated as IPv6.
	if remote.To4() != nil {
		return nodeV4
	}
	return nodeV6
}
// newIpProtocolFilter returns a NodeFilter built from
// ipProtocolFilter: it keeps the backends that have at least one IP
// address matching the protocol of the request (IPv4/IPv6).
func newIpProtocolFilter() lbv2.NodeFilter {
	scorer := autoradioNodeFilterFunc(ipProtocolFilter)
	return lbv2.NodeScorerFilter(scorer)
}
// Wrapper type for lbv2.LoadBalancer that uses NodeStatus objects.
// Its Update and Choose methods convert between *autoradio.NodeStatus
// and the lbNode values handed to the embedded LoadBalancer; all
// other methods are those of the embedded LoadBalancer.
type autoradioLoadBalancer struct {
*lbv2.LoadBalancer
}
// Update replaces the set of nodes known to the load balancer. Each
// NodeStatus is wrapped in an lbNode before being passed on to the
// embedded LoadBalancer.
func (l *autoradioLoadBalancer) Update(nodes []*autoradio.NodeStatus) {
	wrapped := make([]lbv2.Node, len(nodes))
	for i := range nodes {
		wrapped[i] = &lbNode{NodeStatus: nodes[i]}
	}
	l.LoadBalancer.Update(wrapped)
}
// Choose asks the embedded LoadBalancer to pick a node for the
// request and returns the NodeStatus wrapped by it, or nil when no
// node was picked.
func (l *autoradioLoadBalancer) Choose(ctx lbv2.RequestContext) *autoradio.NodeStatus {
	if picked := l.LoadBalancer.Choose(ctx); picked != nil {
		return picked.(*lbNode).NodeStatus
	}
	return nil
}
// Parse a string that specifies how to build a LoadBalancer. The
// string should consist of a list of comma-separated tokens, each
// identifying a specific filter or policy. Whitespace around a token
// is ignored.
//
// Recognized filter tokens are "bandwidth_available",
// "listeners_available", "bandwidth_score" and "listeners_score";
// filters are added in the order in which they appear. Recognized
// policy tokens are "random", "weighted" and "best"; if more than one
// is given, the last one wins.
//
// Some filters will always be included in the resulting LoadBalancer
// and do not need to be specified explicitly (icecastActiveFilter and
// ipProtocolFilter, plus an activeNodesFilter at the end).
//
// An error is returned if a token is not recognized or if no policy
// token is present.
func parseLoadBalancerSpec(specstr string) (*autoradioLoadBalancer, error) {
	lb := lbv2.New()
	lb.AddFilter(newIcecastActiveFilter())
	lb.AddFilter(newIpProtocolFilter())

	var policy lbv2.Policy
	for _, spec := range strings.Split(specstr, ",") {
		// Accept human-friendly lists such as "best, bandwidth_score".
		spec = strings.TrimSpace(spec)
		switch spec {
		case "bandwidth_available":
			lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_BANDWIDTH)))
		case "listeners_available":
			lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_LISTENERS)))
		case "bandwidth_score":
			lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_BANDWIDTH)))
		case "listeners_score":
			lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_LISTENERS)))
		case "random":
			policy = lbv2.RandomPolicy
		case "weighted":
			policy = lbv2.WeightedPolicy
		case "best":
			policy = lbv2.HighestScorePolicy
		default:
			return nil, fmt.Errorf("unknown lb filter spec \"%s\"", spec)
		}
	}
	if policy == nil {
		return nil, errors.New("no lb policy specified")
	}
	lb.SetPolicy(policy)

	// Always applied last, after any user-specified filter.
	lb.AddFilter(lbv2.NewActiveNodesFilter())
	return &autoradioLoadBalancer{lb}, nil
}