diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 5a701b944a723d84117091601400b05d246b6089..85ef0e4b6df6bb4a282b698377d8ff44a63d54e8 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -15,10 +15,11 @@ import ( ) var ( - name = flag.String("name", shortHostname(), "Name for this node") - publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)") - netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") - bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") + name = flag.String("name", shortHostname(), "Name for this node") + publicIps = util.IPList("ip", "Public IP for this machine (may be specified more than once)") + netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") + bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") + maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients") ) func shortHostname() string { @@ -37,7 +38,7 @@ func main() { client := autoradio.NewEtcdClient(true) bwLimitBytes := float64(*bwLimit * 1000000 / 8) - n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, client) + n := node.NewRadioNode(*name, util.IPListWithDefault(*publicIps, "127.0.0.1"), *netDev, bwLimitBytes, *maxClients, client) // Set up a clean shutdown function on SIGTERM. stopch := make(chan os.Signal) diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index 6e4e09b22cc99d5c1e3a3e85027be6db0d45e30b..bb1cd8baf5fe5b9837545540e7114793f22210a5 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -39,6 +39,9 @@ func main() { dnsRed := fe.NewDnsRedirector(client, *domain, util.IPListWithDefault(*publicIps, "127.0.0.1"), dnsTtl) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) - red := fe.NewHttpRedirector(client, *domain, *lbPolicy) + red, err := fe.NewHttpRedirector(client, *domain, *lbPolicy) + if err != nil { + log.Fatal(err) + } red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir) } diff --git a/fe/http.go b/fe/http.go index b48a79d5ca093fd3d1108630dcd7fd6f01db5187..f73344856f70191ad5cabbe85fdf6d5df319a9c1 100644 --- a/fe/http.go +++ b/fe/http.go @@ -54,17 +54,21 @@ func statsHandler(h http.Handler) http.HandlerFunc { // HTTP redirector. type HttpRedirector struct { domain string - lb LoadBalancingPolicy + lb *autoradioLoadBalancer client *autoradio.Client template *template.Template } -func NewHttpRedirector(client *autoradio.Client, domain string, lbpolicy string) *HttpRedirector { +func NewHttpRedirector(client *autoradio.Client, domain string, lbspec string) (*HttpRedirector, error) { + lb, err := parseLoadBalancerSpec(lbspec) + if err != nil { + return nil, err + } return &HttpRedirector{ client: client, domain: domain, - lb: getNamedLoadBalancingPolicy(lbpolicy), - } + lb: lb, + }, nil } // Pick a random IP with a protocol appropriate to the request (based @@ -75,30 +79,32 @@ func randomIpForRequest(ips []net.IP, r *http.Request) net.IP { return randomIpByProto(ips, isV6) } +type httpRequestContext struct { + req *http.Request +} + +func (r *httpRequestContext) RemoteAddr() net.IP { + return net.ParseIP(r.req.RemoteAddr) +} + // Return an active node, chosen according to the current load // balancing policy. func (h *HttpRedirector) pickActiveNode(r *http.Request) net.IP { - nodes, _ := h.client.GetNodes() - if nodes == nil { + result := h.lb.Choose(&httpRequestContext{r}) + if result == nil { return nil } + return randomIpForRequest(result.IP, r) +} - // Filter nodes where Icecast is reported to be up. - okNodes := make([]*autoradio.NodeStatus, 0, len(nodes)) - for _, n := range nodes { - if n.IcecastUp { - okNodes = append(okNodes, n) +func (h *HttpRedirector) lbUpdater() { + for range time.NewTicker(2 * time.Second).C { + nodes, err := h.client.GetNodes() + if err != nil { + continue } + h.lb.Update(nodes) } - if len(okNodes) == 0 { - return nil - } - - result := h.lb.GetNode(okNodes) - if result == nil { - return nil - } - return randomIpForRequest(result.IP, r) } func icecastAddr(server net.IP) string { diff --git a/fe/lbv2/lbv2.go b/fe/lbv2/lbv2.go new file mode 100644 index 0000000000000000000000000000000000000000..9991e24d2c8ec13f77f2b0a786b5a416679d7106 --- /dev/null +++ b/fe/lbv2/lbv2.go @@ -0,0 +1,391 @@ +// Modular building blocks for a traffic control engine. +// +package lbv2 + +import ( + "math/rand" + "net" + "sync" + + "github.com/jmcvetta/randutil" +) + +const baseWeight = 1000000 + +// Node utilization along a specific dimension. Utilization is treated +// as a vector, the LoadBalancer is opaque to the actual meaning of +// the dimensions used (though it makes sense for Requests to be the +// same across dimensions). +type NodeUtilization struct { + // Utilization is a number between 0 and 1. + Utilization float64 + + // Some request-related metric. It doesn't really matter + // whether it is a counter or a gauge, as long as it is + // roughly proportional to the utilization. + Requests int +} + +type Node interface { + // Name of the node (unique identifier). + Name() string + + // Current utilization for the specified dimension. + Utilization(int) NodeUtilization +} + +type NodeScore struct { + Score float64 + Node Node +} + +// SetScore applies a multiplier to the current score and returns +// the modified object. +func (c NodeScore) SetScore(w float64) NodeScore { + c.Score *= w + return c +} + +// Damping factor for the utilization query cost model. +const costAlpha = 0.9 + +type costEstimate float64 + +func (e costEstimate) Update(util NodeUtilization) costEstimate { + // Compute the new query cost and update the current cost + // (with a damping factor). + newCost := util.Utilization / float64(1+util.Requests) + return costEstimate(costAlpha*float64(e) + (1-costAlpha)*newCost) +} + +// A Predictor estimates utilization along a specific dimension using +// a (relatively simple) cost query model. +type Predictor interface { + Utilization(Node) float64 +} + +type costPredictor struct { + cost map[string]costEstimate + util map[string]float64 + utilDimension int + clusterSize float64 +} + +func newPredictor(utilDimension int) *costPredictor { + return &costPredictor{ + clusterSize: 1, + utilDimension: utilDimension, + cost: make(map[string]costEstimate), + util: make(map[string]float64), + } +} + +// Update the cost function based on the most recent node utilization. +func (p *costPredictor) Update(nodes []Node) { + p.clusterSize = float64(len(nodes)) + for _, n := range nodes { + // Update the current query cost estimate. If the node + // is new, set the cost to 1 which results in a + // gradual ramp-up of connections as the damping + // function subsides. + name := n.Name() + util := n.Utilization(p.utilDimension) + if prev, ok := p.cost[name]; ok { + p.cost[name] = prev.Update(util) + } else { + p.cost[name] = costEstimate(1) + } + + // Update the predicted utilization. + p.util[name] = util.Utilization + } +} + +func (p *costPredictor) Incr(n Node) { + // The zeroth-order approximation is that every redirector + // sees the same data and follows the same query distribution. + p.util[n.Name()] += float64(p.cost[n.Name()]) * p.clusterSize +} + +func (p *costPredictor) Utilization(n Node) float64 { + return p.util[n.Name()] +} + +// Provides contextual information on the incoming request. +type RequestContext interface { + RemoteAddr() net.IP +} + +// The LoadBalancer object makes decisions about where traffic should +// go. It contains the list of active backend nodes (updated +// asynchronously by calling Update), and a list of filters and +// policies used to select a backend for every incoming request. +// +// Filters look at each available backend node and assign them a +// score, possibly depending on the incoming request. The policy then +// picks a backend among the available nodes based on their score. By +// convention, it is possible to remove a node from the list of +// candidates by setting its score to zero. +// +// Combinations of simple filters can implement relatively complex +// traffic control behaviors: from straightforward round-robin to +// geoip latency minimization to capacity-aware, utilization-based +// load balancing. +// +// The LoadBalancer keeps track of how many requests were sent to each +// backend node, to compute an up-to-date estimation of its current +// utilization. This should help mitigate "thundering herd" and "laser +// death ray" scenarios caused by delays in the utilization feedback +// loop. +// +// The computational model makes some generic assumptions about +// incoming traffic, the results will be better the more real traffic +// actually matches these assumptions. First, all requests are assumed +// to be identical, utilization-wise: there is a single query cost +// metric. While this implies that the accuracy of the cost estimation +// for each specific query may be low, the effect evens out with large +// numbers as long as the statistical distribution of request types +// varies little over time. Secondly, since the estimation logic is +// local to each frontend, the model assumes that each frontend +// receives an equal share of incoming traffic. +// +type LoadBalancer struct { + lock sync.Mutex + nodes []Node + predictors map[int]*costPredictor + filters []NodeFilter + policy Policy + + // Only use for testing purposes. + disablePredictors bool +} + +// New returns a new LoadBalancer with no filters or policy set. +func New() *LoadBalancer { + return &LoadBalancer{ + predictors: make(map[int]*costPredictor), + } +} + +// AddFilters appends a filter to the filter list. +func (l *LoadBalancer) AddFilter(f NodeFilter) { + l.filters = append(l.filters, f) +} + +// SetPolicy sets the node selection policy. +func (l *LoadBalancer) SetPolicy(p Policy) { + l.policy = p +} + +// GetPredictor returns an utilization predictor for the specified +// dimension. +func (l *LoadBalancer) GetPredictor(dimension int) Predictor { + p, ok := l.predictors[dimension] + if !ok { + p = newPredictor(dimension) + l.predictors[dimension] = p + } + return p +} + +// Only use for testing purposes. +func (l *LoadBalancer) DisableUtilizationPredictors() { + l.disablePredictors = true +} + +// Update the set of known nodes. The new utilization numbers will be +// used to calibrate the utilization predictors. +func (l *LoadBalancer) Update(nodes []Node) { + l.lock.Lock() + defer l.lock.Unlock() + l.nodes = nodes + for _, p := range l.predictors { + p.Update(nodes) + } +} + +// Choose a node according to the specified policies. +func (l *LoadBalancer) Choose(ctx RequestContext) Node { + l.lock.Lock() + defer l.lock.Unlock() + if len(l.nodes) == 0 { + return nil + } + + // Create the candidate list. + wnodes := make([]NodeScore, 0, len(l.nodes)) + for _, n := range l.nodes { + wnodes = append(wnodes, NodeScore{ + Score: 1.0, + Node: n, + }) + } + + // Apply filters. + for _, f := range l.filters { + wnodes = f.Filter(ctx, wnodes) + } + + // Select a node among the available candidates. + n := l.policy.GetNode(wnodes) + if n == nil { + return nil + } + + // Feed back the choice into the utilization predictors. + if !l.disablePredictors { + for _, p := range l.predictors { + p.Incr(n) + } + } + return n +} + +type Policy interface { + GetNode([]NodeScore) Node +} + +type PolicyFunc func([]NodeScore) Node + +func (f PolicyFunc) GetNode(wnodes []NodeScore) Node { + return f(wnodes) +} + +// Node filters operate on the list of available nodes (and associated +// weights), possibly modifying it in-place. They can be composed out +// of simpler parts, like individual nodeScorers wrapped by an +// nodeScorerFilter (when no request-specific initialization step is +// required). +type NodeFilter interface { + Filter(RequestContext, []NodeScore) []NodeScore +} + +// Score a single node. +type NodeScorer interface { + Score(RequestContext, Node) float64 +} + +type NodeScorerFunc func(RequestContext, Node) float64 + +func (f NodeScorerFunc) Score(ctx RequestContext, n Node) float64 { + return f(ctx, n) +} + +// Run an individual NodeScorer over all available nodes. Satisfies +// the NodeFilter interface. +type nodeScorerFilterWrapper struct { + NodeScorer +} + +func (f *nodeScorerFilterWrapper) Filter(ctx RequestContext, wnodes []NodeScore) []NodeScore { + for i, wn := range wnodes { + wnodes[i] = wnodes[i].SetScore(f.NodeScorer.Score(ctx, wn.Node)) + } + return wnodes +} + +func NodeScorerFilter(s NodeScorer) NodeFilter { + return &nodeScorerFilterWrapper{s} +} + +// Give each node a score proportional to the inverse of utilization. +type capacityAvailableScorer struct { + pred Predictor +} + +func (s *capacityAvailableScorer) Score(ctx RequestContext, n Node) float64 { + return 1.0 / s.pred.Utilization(n) +} + +func NewCapacityAvailableScorer(pred Predictor) NodeFilter { + return NodeScorerFilter(&capacityAvailableScorer{pred}) +} + +// Disable nodes that have no available capacity (utilization greater +// than 1). +type capacityAvailableFilter struct { + pred Predictor +} + +func (f *capacityAvailableFilter) Score(ctx RequestContext, n Node) float64 { + if f.pred.Utilization(n) >= 1 { + return 0 + } + return 1 +} + +func NewCapacityAvailableFilter(pred Predictor) NodeFilter { + return NodeScorerFilter(&capacityAvailableFilter{pred}) +} + +// Remove disabled nodes (weight=0) from the list of candidates. +type activeNodesFilter struct{} + +func (f *activeNodesFilter) Filter(ctx RequestContext, wnodes []NodeScore) []NodeScore { + // Remove from the list of candidates the nodes whose score is + // equal to 0. Modifies the list in-place moving non-zero + // elements to the top of the list. + o := 0 + for i, n := range wnodes { + if n.Score > 0 { + if i != o { + wnodes[o] = wnodes[i] + } + o++ + } + } + return wnodes[:o] +} + +func NewActiveNodesFilter() NodeFilter { + return &activeNodesFilter{} +} + +// Return a random item based on a weighted distribution. +func weightedPolicyFunc(wnodes []NodeScore) Node { + // Need to convert this anyway. + choices := make([]randutil.Choice, len(wnodes)) + for i, wn := range wnodes { + choices[i] = randutil.Choice{ + Weight: int(float64(baseWeight) * wn.Score), + Item: wn.Node, + } + } + result, err := randutil.WeightedChoice(choices) + if err != nil { + return nil + } + return result.Item.(Node) +} + +var WeightedPolicy = PolicyFunc(weightedPolicyFunc) + +// Return a random item regardless of their weight (but it still +// ignores disabled nodes, for which weight=0). +func randomPolicyFunc(wnodes []NodeScore) Node { + if len(wnodes) == 0 { + return nil + } + return wnodes[rand.Intn(len(wnodes))].Node +} + +var RandomPolicy = PolicyFunc(randomPolicyFunc) + +// Always return the item with the highest score (only good for +// testing purposes, really). +func highestScorePolicyFunc(wnodes []NodeScore) Node { + maxIdx := -1 + var maxScore float64 + for i, n := range wnodes { + if n.Score > maxScore { + maxScore = n.Score + maxIdx = i + } + } + if maxIdx < 0 || maxScore == 0 { + return nil + } + return wnodes[maxIdx].Node +} + +var HighestScorePolicy = PolicyFunc(highestScorePolicyFunc) diff --git a/fe/loadbalancing.go b/fe/loadbalancing.go index fdbd10a11fcfcdc85e834524aa4b793d9fe0707b..bdcc041250c3883abec37f1c6d29ae597f341cde 100644 --- a/fe/loadbalancing.go +++ b/fe/loadbalancing.go @@ -1,80 +1,140 @@ package fe import ( - "log" + "fmt" + "net" + "strings" "git.autistici.org/ale/autoradio" - "github.com/jmcvetta/randutil" + "git.autistici.org/ale/autoradio/fe/lbv2" ) -// A load balancing policy selects a single node from the pool of -// currently active ones. -type LoadBalancingPolicy interface { - GetNode([]*autoradio.NodeStatus) *autoradio.NodeStatus +const ( + UTIL_BANDWIDTH = iota + UTIL_LISTENERS +) + +// Wrapper type to make a NodeStatus match the lbv2.Node interface. +type lbNode struct { + *autoradio.NodeStatus +} + +func (n *lbNode) Utilization(dimension int) lbv2.NodeUtilization { + var u float64 + switch dimension { + case UTIL_BANDWIDTH: + u = n.BandwidthUsage + case UTIL_LISTENERS: + u = float64(n.NumListeners()) / float64(n.MaxListeners) + } + return lbv2.NodeUtilization{ + Utilization: u, + Requests: n.NumListeners(), + } } -// Simple load balancing policy that always returns the nodes with the -// least amount of listeners. -type leastListenersPolicy struct{} +func (n *lbNode) Name() string { + return n.NodeStatus.Name +} -func (llp leastListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus { - var minNode *autoradio.NodeStatus - min := 1000000 - for _, n := range nodes { - if listeners := n.NumListeners(); listeners < min { - minNode = n - min = listeners +type autoradioNodeFilterFunc func(lbv2.RequestContext, *lbNode) bool + +func (f autoradioNodeFilterFunc) Score(ctx lbv2.RequestContext, n lbv2.Node) float64 { + if f(ctx, n.(*lbNode)) { + return 1 + } + return 0 +} + +func icecastActiveFilter(ctx lbv2.RequestContext, n *lbNode) bool { + return n.IcecastUp +} + +// NodeFilter that disables backends where Icecast is not running. +func newIcecastActiveFilter() lbv2.NodeFilter { + return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(icecastActiveFilter)) +} + +func getIpProtos(ips []net.IP) (bool, bool) { + hasV4 := false + hasV6 := false + for _, ip := range ips { + if ip.To4() == nil { + hasV6 = true + } else { + hasV4 = true } } - return minNode + return hasV4, hasV6 +} + +func ipProtocolFilter(ctx lbv2.RequestContext, n *lbNode) bool { + if ctx == nil { + return true + } + addr := ctx.RemoteAddr() + if addr == nil { + return true + } + remoteV6 := addr.To4() == nil + hasV4, hasV6 := getIpProtos(n.IP) + return (remoteV6 && hasV6) || (!remoteV6 && hasV4) +} + +// NodeFilter that selects those backends having at least an IP +// address matching the request protocol (IPv4/IPv6). +func newIpProtocolFilter() lbv2.NodeFilter { + return lbv2.NodeScorerFilter(autoradioNodeFilterFunc(ipProtocolFilter)) } -// Simple load balancing policy that selects a node randomly, -// associating a weight to each node inversely proportional to the -// number of active listeners. -type weightedListenersPolicy struct{} +// Wrapper class for lbv2.LoadBalancer that uses NodeStatus objects. +type autoradioLoadBalancer struct { + *lbv2.LoadBalancer +} -func (wlp weightedListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus { - choices := make([]randutil.Choice, 0, len(nodes)) - weightBase := 1000000 +func (l *autoradioLoadBalancer) Update(nodes []*autoradio.NodeStatus) { + wrapped := make([]lbv2.Node, 0, len(nodes)) for _, n := range nodes { - w := weightBase / (n.NumListeners() + 1) - choices = append(choices, randutil.Choice{Weight: w, Item: n}) + wrapped = append(wrapped, &lbNode{n}) } - result, err := randutil.WeightedChoice(choices) - if err != nil { + l.LoadBalancer.Update(wrapped) +} + +func (l *autoradioLoadBalancer) Choose(ctx lbv2.RequestContext) *autoradio.NodeStatus { + result := l.LoadBalancer.Choose(ctx) + if result == nil { return nil } - return result.Item.(*autoradio.NodeStatus) + return result.(*lbNode).NodeStatus } -// Load balancing policy that will send requests to the backend whose -// bandwidth utilization is the lowest. -type leastBandwidthPolicy struct{} +func parseLoadBalancerSpec(specstr string) (*autoradioLoadBalancer, error) { + lb := lbv2.New() + lb.AddFilter(newIcecastActiveFilter()) + lb.AddFilter(newIpProtocolFilter()) -func (lbp leastBandwidthPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus { - // Capping usage at 100%, nodes above that won't be returned - // at all. - var minNode *autoradio.NodeStatus - minUtil := float64(1.0) - for _, n := range nodes { - if n.BandwidthUsage < minUtil { - minUtil = n.BandwidthUsage - minNode = n + for _, spec := range strings.Split(specstr, ",") { + switch spec { + case "ba", "bw_avail", "bandwidth_available": + lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_BANDWIDTH))) + case "la", "listeners_available": + lb.AddFilter(lbv2.NewCapacityAvailableFilter(lb.GetPredictor(UTIL_LISTENERS))) + case "bw", "bandwidth_weight": + lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_BANDWIDTH))) + case "lw", "listeners_weight": + lb.AddFilter(lbv2.NewCapacityAvailableScorer(lb.GetPredictor(UTIL_LISTENERS))) + case "random": + lb.SetPolicy(lbv2.RandomPolicy) + case "weighted": + lb.SetPolicy(lbv2.WeightedPolicy) + case "best": + lb.SetPolicy(lbv2.HighestScorePolicy) + default: + return nil, fmt.Errorf("unknown lb filter spec \"%s\"", spec) } } - return minNode -} -func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy { - switch lbpolicy { - case "leastloaded", "ll": - return &leastListenersPolicy{} - case "weighted", "wl": - return &weightedListenersPolicy{} - case "leastbandwidth", "lb": - return &leastBandwidthPolicy{} - } - log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy) - return nil + lb.AddFilter(lbv2.NewActiveNodesFilter()) + + return &autoradioLoadBalancer{lb}, nil } diff --git a/fe/loadbalancing_test.go b/fe/loadbalancing_test.go new file mode 100644 index 0000000000000000000000000000000000000000..23f5029a11698cb5618af62a8d5de2d3647cee07 --- /dev/null +++ b/fe/loadbalancing_test.go @@ -0,0 +1,133 @@ +package fe + +import ( + "math" + "net" + "testing" + + "git.autistici.org/ale/autoradio" + "git.autistici.org/ale/autoradio/fe/lbv2" +) + +func compareDistribution(a, b map[string]int, n int) bool { + for k, aval := range a { + bval := b[k] + diff := math.Abs(float64(aval-bval) / float64(1+bval)) + if diff >= 0.1 { + return false + } + } + return true +} + +func runLBTest(t *testing.T, nodes []*autoradio.NodeStatus, ctx lbv2.RequestContext, lbspec string, n int, expected map[string]int) { + lb, err := parseLoadBalancerSpec(lbspec) + if err != nil { + t.Fatalf("Error parsing spec \"%s\": %v", lbspec, err) + } + + lb.DisableUtilizationPredictors() + + // Run lb.Choose() n times and accumulate the results. + results := make(map[string]int) + lb.Update(nodes) + for i := 0; i < n; i++ { + n := lb.Choose(ctx) + if n != nil { + results[n.Name] = results[n.Name] + 1 + } + } + + // Compare with expected results. + if !compareDistribution(results, expected, n) { + t.Fatalf("bad result distribution, got=%v, want=%v", results, expected) + } +} + +func TestLoadBalancer_IcecastActiveFilter(t *testing.T) { + nodes := []*autoradio.NodeStatus{ + &autoradio.NodeStatus{ + Name: "node1", + IcecastUp: false, + }, + &autoradio.NodeStatus{ + Name: "node2", + IcecastUp: true, + }, + } + expectedDist := map[string]int{"node2": 1} + runLBTest(t, nodes, nil, "best", 1, expectedDist) +} + +type mockRequestContext struct { + addr net.IP +} + +func (c *mockRequestContext) RemoteAddr() net.IP { + return c.addr +} + +func TestLoadBalancer_IpProtoFilter(t *testing.T) { + nodes := []*autoradio.NodeStatus{ + &autoradio.NodeStatus{ + Name: "node1", + IcecastUp: true, + IP: []net.IP{ + net.ParseIP("1.2.3.4"), + }, + }, + &autoradio.NodeStatus{ + Name: "node2", + IcecastUp: true, + IP: []net.IP{ + net.ParseIP("2001:888:2000:56::19"), + }, + }, + } + + runLBTest(t, nodes, &mockRequestContext{net.ParseIP("127.0.0.1")}, "best", 1, map[string]int{"node1": 1}) + runLBTest(t, nodes, &mockRequestContext{net.ParseIP("::1")}, "best", 1, map[string]int{"node2": 1}) +} + +func TestLoadBalancer_Policies(t *testing.T) { + nodes := []*autoradio.NodeStatus{ + &autoradio.NodeStatus{ + Name: "node1", + IcecastUp: true, + BandwidthUsage: 0.4, + }, + &autoradio.NodeStatus{ + Name: "node2", + IcecastUp: true, + BandwidthUsage: 0.1, + }, + } + + // Weighted should return node1 4 times as often as node2. + runLBTest(t, nodes, nil, "bw,weighted", 1000, map[string]int{"node1": 200, "node2": 800}) + + // The 'random' policy will ignore the weights. + runLBTest(t, nodes, nil, "bw,random", 1000, map[string]int{"node1": 500, "node2": 500}) + + // The 'best' policy will always return node2. + runLBTest(t, nodes, nil, "bw,best", 1000, map[string]int{"node2": 1000}) +} + +func TestLoadBalancer_PoliciesIgnoreDisabledNodes(t *testing.T) { + nodes := []*autoradio.NodeStatus{ + &autoradio.NodeStatus{ + Name: "node1", + IcecastUp: true, + BandwidthUsage: 0.4, + }, + &autoradio.NodeStatus{ + Name: "node2", + IcecastUp: true, + BandwidthUsage: 1.0, + }, + } + + for _, spec := range []string{"bw_avail,weighted", "bw_avail,random", "bw_avail,best"} { + runLBTest(t, nodes, nil, spec, 1000, map[string]int{"node1": 1000}) + } +}