Skip to content
Snippets Groups Projects
Commit cae42f84 authored by ale's avatar ale
Browse files

Merge branch 'bwmonitor'

parents 7a26590d dc732e2c
No related branches found
No related tags found
No related merge requests found
......@@ -72,8 +72,11 @@ type NodeStatus struct {
// Is the Icecast server up?
IcecastUp bool
// List of
// List of mount points.
Mounts []IcecastMountStatus `xml:"mount"`
// Bandwidth utilization.
BandwidthUsage float64
}
func (ns *NodeStatus) NumListeners() int {
......
......@@ -13,6 +13,8 @@ import (
var (
publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine")
netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
)
......@@ -20,7 +22,8 @@ func main() {
flag.Parse()
client := autoradio.NewEtcdClient()
n := node.NewRadioNode(*publicIp, client)
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*publicIp, *netDev, bwLimitBytes, client)
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
......
......@@ -13,9 +13,13 @@ DOMAIN=""
# if your fully-qualified hostname).
#PUBLIC_IP=""
# Name of the outbound network interface (if unset, we'll try to
# figure it out automatically based on the value of PUBLIC_IP).
#INTERFACE="eth0"
# Set the etcd servers to connect to (default: etcd.$DOMAIN).
#ETCD_SERVER=""
# Additional options that are passed to specific daemons.
RADIOD_OPTIONS=""
REDIRECTORD_OPTIONS=""
#RADIOD_OPTIONS="--bwlimit=1000"
#REDIRECTORD_OPTIONS=""
......@@ -35,6 +35,17 @@ set_public_ip() {
fi
}
set_interface() {
# Try to guess the network interface for PUBLIC_IP.
if [ -z "${INTERFACE}" ]; then
INTERFACE=$(/sbin/ip -o addr show to ${PUBLIC_IP} | awk '{print $2}')
fi
if [ -z "${INTERFACE}" ]; then
echo "Warning: could not autodetect network interface for ${PUBLIC_IP}" 1>&2
INTERFACE=eth0
fi
}
set_etcd_params() {
local default_etcd_server="etcd.${DOMAIN}"
ETCD_SERVER="${ETCD_SERVER:-${default_etcd_server}}"
......
......@@ -3,8 +3,9 @@
. /usr/share/autoradio/lib.sh
set_public_ip
set_interface
set_etcd_params
exec chpst -u icecast2 \
radiod --ip=${PUBLIC_IP} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \
radiod --ip=${PUBLIC_IP} --interface=${INTERFACE} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \
2>&1
......@@ -18,15 +18,15 @@ type LoadBalancingPolicy interface {
type leastListenersPolicy struct{}
func (llp leastListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus {
minIdx := 0
var minNode *autoradio.NodeStatus
min := 1000000
for i, n := range nodes {
for _, n := range nodes {
if listeners := n.NumListeners(); listeners < min {
minIdx = i
minNode = n
min = listeners
}
}
return nodes[minIdx]
return minNode
}
// Simple load balancing policy that selects a node randomly,
......@@ -48,12 +48,32 @@ func (wlp weightedListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autor
return result.Item.(*autoradio.NodeStatus)
}
// Load balancing policy that will send requests to the backend whose
// bandwidth utilization is the lowest.
type leastBandwidthPolicy struct{}
func (lbp leastBandwidthPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus {
// Capping usage at 100%, nodes above that won't be returned
// at all.
var minNode *autoradio.NodeStatus
minUtil := float64(1.0)
for _, n := range nodes {
if n.BandwidthUsage < minUtil {
minUtil = n.BandwidthUsage
minNode = n
}
}
return minNode
}
func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy {
switch lbpolicy {
case "leastloaded", "ll":
return &leastListenersPolicy{}
case "weighted", "wl":
return &weightedListenersPolicy{}
case "leastbandwidth", "lb":
return &leastBandwidthPolicy{}
}
log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy)
return nil
......
package bwmonitor
import (
"bufio"
"errors"
"os"
"regexp"
"strconv"
"strings"
"time"
)
var spacesRx = regexp.MustCompile(`\s+`)
type netDevCounts struct {
bytesSent int
}
func getBytesSentForDevice(dev string) (uint64, error) {
file, err := os.Open("/proc/net/dev")
if err != nil {
return 0, err
}
defer file.Close()
input := bufio.NewScanner(file)
for input.Scan() {
line := spacesRx.Split(strings.TrimSpace(input.Text()), -1)
curDev := line[0][0 : len(line[0])-1]
if curDev == dev {
bytes, err := strconv.ParseUint(line[9], 10, 64)
if err != nil {
return 0, err
}
return bytes, nil
}
}
return 0, errors.New("device not found")
}
type BandwidthMonitor struct {
device string
counter uint64
stamp time.Time
period time.Duration
stop chan bool
rate float64
}
func NewBandwidthMonitor(dev string) *BandwidthMonitor {
bw := &BandwidthMonitor{
device: dev,
stamp: time.Now(),
period: 30 * time.Second,
stop: make(chan bool),
}
go bw.run()
return bw
}
func (bw *BandwidthMonitor) Close() {
close(bw.stop)
}
func (bw *BandwidthMonitor) GetRate() float64 {
return bw.rate
}
func (bw *BandwidthMonitor) run() {
t := time.NewTicker(bw.period)
for {
select {
case <-t.C:
if c, err := getBytesSentForDevice(bw.device); err == nil {
now := time.Now()
bw.rate = float64(c - bw.counter) / now.Sub(bw.stamp).Seconds()
bw.counter = c
bw.stamp = now
}
case <-bw.stop:
return
}
}
}
type BandwidthUsageMonitor struct {
*BandwidthMonitor
bwLimit float64
}
func NewBandwidthUsageMonitor(dev string, bwLimit float64) *BandwidthUsageMonitor {
return &BandwidthUsageMonitor{
NewBandwidthMonitor(dev),
bwLimit,
}
}
func (bu *BandwidthUsageMonitor) GetUsage() float64 {
return bu.GetRate() / bu.bwLimit
}
......@@ -10,6 +10,7 @@ import (
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/masterelection"
"git.autistici.org/ale/autoradio/node/bwmonitor"
"git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd"
)
......@@ -191,12 +192,13 @@ type RadioNode struct {
me *masterelection.MasterElection
watcher *ConfigSyncer
icecast *IcecastController
bw *bwmonitor.BandwidthUsageMonitor
livenessTtl uint64
upch chan bool
stop chan bool
}
func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode {
config := NewClusterConfig()
// Network updates trigger icecast reconfiguration. This
......@@ -229,6 +231,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode {
watcher: NewConfigSyncer(client, config, upch, stopch),
icecast: NewIcecastController(ip, stopch),
livenessTtl: 2,
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
upch: upch,
stop: stopch,
}
......@@ -245,9 +248,10 @@ func (rc *RadioNode) presence() {
// Build our NodeStatus.
icecastSt := rc.icecast.GetStatus()
nodeSt := autoradio.NodeStatus{
IP: rc.ip,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
IP: rc.ip,
IcecastUp: icecastSt.Up,
Mounts: icecastSt.Mounts,
BandwidthUsage: rc.bw.GetUsage(),
}
// Update our node entry in the database.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment