Skip to content
Snippets Groups Projects
Commit cbccb3f9 authored by ale's avatar ale
Browse files

add support for multiple load balancing policies

parent 05f102b5
Branches
No related tags found
No related merge requests found
...@@ -18,6 +18,8 @@ var ( ...@@ -18,6 +18,8 @@ var (
staticDir = flag.String("static-dir", "/usr/share/radioai/htdocs/static", "Static content directory") staticDir = flag.String("static-dir", "/usr/share/radioai/htdocs/static", "Static content directory")
templateDir = flag.String("template-dir", "/usr/share/radioai/htdocs/templates", "HTML templates directory") templateDir = flag.String("template-dir", "/usr/share/radioai/htdocs/templates", "HTML templates directory")
lbPolicy = flag.String("lb-policy", "weighted", "Load balancing policy (weighted, leastloaded)")
// Default DNS TTL (seconds). // Default DNS TTL (seconds).
dnsTtl = 5 dnsTtl = 5
) )
...@@ -35,6 +37,6 @@ func main() { ...@@ -35,6 +37,6 @@ func main() {
dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl) dnsRed := fe.NewDnsRedirector(api, *domain, *publicIp, dnsTtl)
dnsRed.Run(fmt.Sprintf(":%d", *dnsPort)) dnsRed.Run(fmt.Sprintf(":%d", *dnsPort))
red := fe.NewHttpRedirector(api, *domain) red := fe.NewHttpRedirector(api, *domain, *lbPolicy)
red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir) red.Run(fmt.Sprintf(":%d", *httpPort), *staticDir, *templateDir)
} }
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"html/template" "html/template"
"io" "io"
"log" "log"
"math/rand"
"net" "net"
"net/http" "net/http"
"path/filepath" "path/filepath"
...@@ -26,18 +25,19 @@ type HttpRedirector struct { ...@@ -26,18 +25,19 @@ type HttpRedirector struct {
domain string domain string
client *radioai.RadioAPI client *radioai.RadioAPI
template *template.Template template *template.Template
lb LoadBalancingPolicy
} }
func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector { func NewHttpRedirector(client *radioai.RadioAPI, domain string, lbpolicy string) *HttpRedirector {
return &HttpRedirector{ return &HttpRedirector{
client: client, client: client,
domain: domain, domain: domain,
lb: getNamedLoadBalancingPolicy(lbpolicy),
} }
} }
// Return an active node, chosen randomly (this is currently our load // Return an active node, chosen according to the current load
// balancing policy, since there is no status information about the // balancing policy.
// nodes yet).
func (h *HttpRedirector) pickActiveNode() string { func (h *HttpRedirector) pickActiveNode() string {
nodes, _ := h.client.GetNodes() nodes, _ := h.client.GetNodes()
if nodes == nil { if nodes == nil {
...@@ -45,17 +45,21 @@ func (h *HttpRedirector) pickActiveNode() string { ...@@ -45,17 +45,21 @@ func (h *HttpRedirector) pickActiveNode() string {
} }
// Filter nodes where Icecast is reported to be up. // Filter nodes where Icecast is reported to be up.
okNodes := make([]string, 0, len(nodes)) okNodes := make([]*radioai.NodeStatus, 0, len(nodes))
for _, n := range nodes { for _, n := range nodes {
if n.IcecastUp { if n.IcecastUp {
okNodes = append(okNodes, n.IP) okNodes = append(okNodes, n)
} }
} }
if len(okNodes) == 0 {
return ""
}
if len(okNodes) > 0 { result := h.lb.GetNode(okNodes)
return okNodes[rand.Intn(len(okNodes))] if result == nil {
return ""
} }
return "" return result.IP
} }
// Parse the request and extract the mount path. // Parse the request and extract the mount path.
...@@ -184,6 +188,8 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) ...@@ -184,6 +188,8 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request)
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) w.Header().Set("Content-Length", strconv.Itoa(buf.Len()))
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "private, max-age=0")
w.Write(buf.Bytes()) w.Write(buf.Bytes())
} }
......
package fe
import (
"log"
"git.autistici.org/ale/radioai"
"github.com/jmcvetta/randutil"
)
// A load balancing policy selects a single node from the pool of
// currently active ones.
type LoadBalancingPolicy interface {
GetNode([]*radioai.NodeStatus) *radioai.NodeStatus
}
// Simple load balancing policy that always returns the nodes with the
// least amount of listeners.
type leastListenersPolicy struct{}
func (llp leastListenersPolicy) GetNode(nodes []*radioai.NodeStatus) *radioai.NodeStatus {
minIdx := 0
min := 1000000
for i, n := range nodes {
if listeners := n.NumListeners(); listeners < min {
minIdx = i
min = listeners
}
}
return nodes[minIdx]
}
// Simple load balancing policy that selects a node randomly,
// associating a weight to each node inversely proportional to the
// number of active listeners.
type weightedListenersPolicy struct{}
func (wlp weightedListenersPolicy) GetNode(nodes []*radioai.NodeStatus) *radioai.NodeStatus {
choices := make([]randutil.Choice, 0, len(nodes))
weightBase := 1000000
for _, n := range nodes {
w := weightBase / (n.NumListeners() + 1)
choices = append(choices, randutil.Choice{w, n})
}
result, err := randutil.WeightedChoice(choices)
if err != nil {
return nil
}
return result.Item.(*radioai.NodeStatus)
}
func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy {
switch lbpolicy {
case "leastloaded", "ll":
return &leastListenersPolicy{}
case "weighted", "wl":
return &weightedListenersPolicy{}
}
log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy)
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment