Commit dc732e2c authored by ale's avatar ale
Browse files

monitor bandwidth of the outbound link; add a load-balancing policy based on network utilization

parent ac095f08
......@@ -72,8 +72,11 @@ type NodeStatus struct {
// Is the Icecast server up?
IcecastUp bool
// List of
// List of mount points.
Mounts []IcecastMountStatus `xml:"mount"`
// Bandwidth utilization.
BandwidthUsage float64
}
func (ns *NodeStatus) NumListeners() int {
......
......@@ -13,6 +13,8 @@ import (
var (
publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
)
......@@ -20,7 +22,8 @@ func main() {
flag.Parse()
client := autoradio.NewEtcdClient()
n := node.NewRadioNode(*publicIp, client)
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*publicIp, *netDev, bwLimitBytes, client)
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
......
......@@ -13,9 +13,13 @@ DOMAIN=""
# if your fully-qualified hostname).
#PUBLIC_IP=""
# Name of the outbound network interface (if unset, we'll try to
# figure it out automatically based on the value of PUBLIC_IP).
#INTERFACE="eth0"
# Set the etcd servers to connect to (default: etcd.$DOMAIN).
#ETCD_SERVER=""
# Additional options that are passed to specific daemons.
RADIOD_OPTIONS=""
REDIRECTORD_OPTIONS=""
#RADIOD_OPTIONS="--bwlimit=1000"
#REDIRECTORD_OPTIONS=""
......@@ -35,6 +35,17 @@ set_public_ip() {
fi
}
set_interface() {
# Try to guess the network interface for PUBLIC_IP.
if [ -z "${INTERFACE}" ]; then
INTERFACE=$(/sbin/ip -o addr show to ${PUBLIC_IP} | awk '{print $2}')
fi
if [ -z "${INTERFACE}" ]; then
echo "Warning: could not autodetect network interface for ${PUBLIC_IP}" 1>&2
INTERFACE=eth0
fi
}
set_etcd_params() {
local default_etcd_server="etcd.${DOMAIN}"
ETCD_SERVER="${ETCD_SERVER:-${default_etcd_server}}"
......
......@@ -3,8 +3,9 @@
. /usr/share/autoradio/lib.sh
set_public_ip
set_interface
set_etcd_params
exec chpst -u icecast2 \
radiod --ip=${PUBLIC_IP} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \
radiod --ip=${PUBLIC_IP} --interface=${INTERFACE} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \
2>&1
......@@ -18,15 +18,15 @@ type LoadBalancingPolicy interface {
type leastListenersPolicy struct{}
func (llp leastListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus {
minIdx := 0
var minNode *autoradio.NodeStatus
min := 1000000
for i, n := range nodes {
for _, n := range nodes {
if listeners := n.NumListeners(); listeners < min {
minIdx = i
minNode = n
min = listeners
}
}
return nodes[minIdx]
return minNode
}
// Simple load balancing policy that selects a node randomly,
......@@ -48,12 +48,32 @@ func (wlp weightedListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autor
return result.Item.(*autoradio.NodeStatus)
}
// Load balancing policy that will send requests to the backend whose
// bandwidth utilization is the lowest.
type leastBandwidthPolicy struct{}
func (lbp leastBandwidthPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus {
// Capping usage at 100%, nodes above that won't be returned
// at all.
var minNode *autoradio.NodeStatus
minUtil := float64(1.0)
for _, n := range nodes {
if n.BandwidthUsage < minUtil {
minUtil = n.BandwidthUsage
minNode = n
}
}
return minNode
}
func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy {
switch lbpolicy {
case "leastloaded", "ll":
return &leastListenersPolicy{}
case "weighted", "wl":
return &weightedListenersPolicy{}
case "leastbandwidth", "lb":
return &leastBandwidthPolicy{}
}
log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy)
return nil
......
package bwmonitor
import (
"bufio"
"errors"
"os"
"regexp"
"strconv"
"strings"
"time"
)
var spacesRx = regexp.MustCompile(`\s+`)
type netDevCounts struct {
bytesSent int
}
func getBytesSentForDevice(dev string) (uint64, error) {
file, err := os.Open("/proc/net/dev")
if err != nil {
return 0, err
}
defer file.Close()
input := bufio.NewScanner(file)
for input.Scan() {
line := spacesRx.Split(strings.TrimSpace(input.Text()), -1)
curDev := line[0][0 : len(line[0])-1]
if curDev == dev {
bytes, err := strconv.ParseUint(line[9], 10, 64)
if err != nil {
return 0, err
}
return bytes, nil
}
}
return 0, errors.New("device not found")
}
type BandwidthMonitor struct {
device string
counter uint64
stamp time.Time
period time.Duration
stop chan bool
rate float64
}
func NewBandwidthMonitor(dev string) *BandwidthMonitor {
bw := &BandwidthMonitor{
device: dev,
stamp: time.Now(),
period: 30 * time.Second,
stop: make(chan bool),
}
go bw.run()
return bw
}
func (bw *BandwidthMonitor) Close() {
close(bw.stop)
}
func (bw *BandwidthMonitor) GetRate() float64 {
return bw.rate
}
func (bw *BandwidthMonitor) run() {
t := time.NewTicker(bw.period)
for {
select {
case <-t.C:
if c, err := getBytesSentForDevice(bw.device); err == nil {
now := time.Now()
bw.rate = float64(c - bw.counter) / now.Sub(bw.stamp).Seconds()
bw.counter = c
bw.stamp = now
}
case <-bw.stop:
return
}
}
}
type BandwidthUsageMonitor struct {
*BandwidthMonitor
bwLimit float64
}
func NewBandwidthUsageMonitor(dev string, bwLimit float64) *BandwidthUsageMonitor {
return &BandwidthUsageMonitor{
NewBandwidthMonitor(dev),
bwLimit,
}
}
func (bu *BandwidthUsageMonitor) GetUsage() float64 {
return bu.GetRate() / bu.bwLimit
}
......@@ -10,6 +10,7 @@ import (
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/masterelection"
"git.autistici.org/ale/autoradio/node/bwmonitor"
"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
)
......@@ -191,12 +192,13 @@ type RadioNode struct {
me *masterelection.MasterElection
watcher *ConfigSyncer
icecast *IcecastController
bw *bwmonitor.BandwidthUsageMonitor
livenessTtl uint64
upch chan bool
stop chan bool
}
func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
config := NewClusterConfig()
// Network updates trigger icecast reconfiguration. This
......@@ -229,6 +231,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
watcher: NewConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(ip, stopch),
livenessTtl: 2,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
upch: upch,
stop: stopch,
}
......@@ -245,9 +248,10 @@ func (rc *RadioNode) presence() {
// Build our NodeStatus.
icecastSt := rc.icecast.GetStatus()
nodeSt := autoradio.NodeStatus{
IP: rc.ip,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
IP: rc.ip,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
BandwidthUsage: rc.bw.GetUsage(),
}
// Update our node entry in the database.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment