diff --git a/api.go b/api.go index 0c5c58eab5ccf76aaae9cfbd275fe004ff41e94f..69a3910ce1de6b3c6b3f1000dddc3e8a695ac2c1 100644 --- a/api.go +++ b/api.go @@ -72,8 +72,11 @@ type NodeStatus struct { // Is the Icecast server up? IcecastUp bool - // List of + // List of mount points. Mounts []IcecastMountStatus `xml:"mount"` + + // Bandwidth utilization. + BandwidthUsage float64 } func (ns *NodeStatus) NumListeners() int { diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 8e783a5a0c97dc0510004e8df85af6890ca3d099..0012327099db43cee2870aa17ac6c9809bc669c4 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -13,6 +13,8 @@ import ( var ( publicIp = flag.String("ip", "127.0.0.1", "Public IP for this machine") + netDev = flag.String("interface", "eth0", "Network interface to monitor for utilization") + bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)") ) @@ -20,7 +22,8 @@ func main() { flag.Parse() client := autoradio.NewEtcdClient() - n := node.NewRadioNode(*publicIp, client) + bwLimitBytes := float64(*bwLimit * 1000000 / 8) + n := node.NewRadioNode(*publicIp, *netDev, bwLimitBytes, client) // Set up a clean shutdown function on SIGTERM. stopch := make(chan os.Signal) diff --git a/debian/autoradio.default b/debian/autoradio.default index 530e1f05589a09f5bc4ae18357fa6c7629e60747..2d79a488fd64cba41d459f3214ee63adfcc9f4c1 100644 --- a/debian/autoradio.default +++ b/debian/autoradio.default @@ -13,9 +13,13 @@ DOMAIN="" # if your fully-qualified hostname). #PUBLIC_IP="" +# Name of the outbound network interface (if unset, we'll try to +# figure it out automatically based on the value of PUBLIC_IP). +#INTERFACE="eth0" + # Set the etcd servers to connect to (default: etcd.$DOMAIN). #ETCD_SERVER="" # Additional options that are passed to specific daemons. -RADIOD_OPTIONS="" -REDIRECTORD_OPTIONS="" +#RADIOD_OPTIONS="--bwlimit=1000" +#REDIRECTORD_OPTIONS="" diff --git a/debian/services/lib.sh b/debian/services/lib.sh index 7db958c7ffd1818ef051aefc6a9d4756ee5fd9f9..9d24bdade3599cde9f981504ad4091bc26f5870e 100644 --- a/debian/services/lib.sh +++ b/debian/services/lib.sh @@ -35,6 +35,17 @@ set_public_ip() { fi } +set_interface() { + # Try to guess the network interface for PUBLIC_IP. + if [ -z "${INTERFACE}" ]; then + INTERFACE=$(/sbin/ip -o addr show to ${PUBLIC_IP} | awk '{print $2}') + fi + if [ -z "${INTERFACE}" ]; then + echo "Warning: could not autodetect network interface for ${PUBLIC_IP}" 1>&2 + INTERFACE=eth0 + fi +} + set_etcd_params() { local default_etcd_server="etcd.${DOMAIN}" ETCD_SERVER="${ETCD_SERVER:-${default_etcd_server}}" diff --git a/debian/services/radiod b/debian/services/radiod index a407f2e146d2896c8224dbd1c5ef6bafa4083eea..8ad92cc99c07294423e1b8a7b8ac4581dbd25560 100755 --- a/debian/services/radiod +++ b/debian/services/radiod @@ -3,8 +3,9 @@ . /usr/share/autoradio/lib.sh set_public_ip +set_interface set_etcd_params exec chpst -u icecast2 \ - radiod --ip=${PUBLIC_IP} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \ + radiod --ip=${PUBLIC_IP} --interface=${INTERFACE} ${ETCD_OPTIONS} ${RADIOD_OPTIONS} \ 2>&1 diff --git a/fe/loadbalancing.go b/fe/loadbalancing.go index 9ecca94013df2a2f7f231b7e05c46310e14907f6..a6676947de4153610c899f9dc6e05c87cb74dfcb 100644 --- a/fe/loadbalancing.go +++ b/fe/loadbalancing.go @@ -18,15 +18,15 @@ type LoadBalancingPolicy interface { type leastListenersPolicy struct{} func (llp leastListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus { - minIdx := 0 + var minNode *autoradio.NodeStatus min := 1000000 - for i, n := range nodes { + for _, n := range nodes { if listeners := n.NumListeners(); listeners < min { - minIdx = i + minNode = n min = listeners } } - return nodes[minIdx] + return minNode } // Simple load balancing policy that selects a node randomly, @@ -48,12 +48,32 @@ func (wlp weightedListenersPolicy) GetNode(nodes []*autoradio.NodeStatus) *autor return result.Item.(*autoradio.NodeStatus) } +// Load balancing policy that will send requests to the backend whose +// bandwidth utilization is the lowest. +type leastBandwidthPolicy struct{} + +func (lbp leastBandwidthPolicy) GetNode(nodes []*autoradio.NodeStatus) *autoradio.NodeStatus { + // Capping usage at 100%, nodes above that won't be returned + // at all. + var minNode *autoradio.NodeStatus + minUtil := float64(1.0) + for _, n := range nodes { + if n.BandwidthUsage < minUtil { + minUtil = n.BandwidthUsage + minNode = n + } + } + return minNode +} + func getNamedLoadBalancingPolicy(lbpolicy string) LoadBalancingPolicy { switch lbpolicy { case "leastloaded", "ll": return &leastListenersPolicy{} case "weighted", "wl": return &weightedListenersPolicy{} + case "leastbandwidth", "lb": + return &leastBandwidthPolicy{} } log.Fatalf("Unknown load-balancing policy '%s'", lbpolicy) return nil diff --git a/node/bwmonitor/bwmonitor.go b/node/bwmonitor/bwmonitor.go new file mode 100644 index 0000000000000000000000000000000000000000..be401144d34fd313e4ed236f3a191476095b6c81 --- /dev/null +++ b/node/bwmonitor/bwmonitor.go @@ -0,0 +1,101 @@ +package bwmonitor + +import ( + "bufio" + "errors" + "os" + "regexp" + "strconv" + "strings" + "time" +) + +var spacesRx = regexp.MustCompile(`\s+`) + +type netDevCounts struct { + bytesSent int +} + +func getBytesSentForDevice(dev string) (uint64, error) { + file, err := os.Open("/proc/net/dev") + if err != nil { + return 0, err + } + defer file.Close() + + input := bufio.NewScanner(file) + for input.Scan() { + line := spacesRx.Split(strings.TrimSpace(input.Text()), -1) + curDev := line[0][0 : len(line[0])-1] + if curDev == dev { + bytes, err := strconv.ParseUint(line[9], 10, 64) + if err != nil { + return 0, err + } + return bytes, nil + } + } + return 0, errors.New("device not found") +} + +type BandwidthMonitor struct { + device string + counter uint64 + stamp time.Time + period time.Duration + stop chan bool + rate float64 +} + +func NewBandwidthMonitor(dev string) *BandwidthMonitor { + bw := &BandwidthMonitor{ + device: dev, + stamp: time.Now(), + period: 30 * time.Second, + stop: make(chan bool), + } + go bw.run() + return bw +} + +func (bw *BandwidthMonitor) Close() { + close(bw.stop) +} + +func (bw *BandwidthMonitor) GetRate() float64 { + return bw.rate +} + +func (bw *BandwidthMonitor) run() { + t := time.NewTicker(bw.period) + for { + select { + case <-t.C: + if c, err := getBytesSentForDevice(bw.device); err == nil { + now := time.Now() + bw.rate = float64(c - bw.counter) / now.Sub(bw.stamp).Seconds() + bw.counter = c + bw.stamp = now + } + case <-bw.stop: + return + } + } +} + +type BandwidthUsageMonitor struct { + *BandwidthMonitor + + bwLimit float64 +} + +func NewBandwidthUsageMonitor(dev string, bwLimit float64) *BandwidthUsageMonitor { + return &BandwidthUsageMonitor{ + NewBandwidthMonitor(dev), + bwLimit, + } +} + +func (bu *BandwidthUsageMonitor) GetUsage() float64 { + return bu.GetRate() / bu.bwLimit +} diff --git a/node/node.go b/node/node.go index 8fa36f7f71ada4189cadaa25d67b112e3f1bbd06..6ba8ba398f666ae6be38654aeb17b644163b67cf 100644 --- a/node/node.go +++ b/node/node.go @@ -10,6 +10,7 @@ import ( "git.autistici.org/ale/autoradio" "git.autistici.org/ale/autoradio/masterelection" + "git.autistici.org/ale/autoradio/node/bwmonitor" "git.autistici.org/ale/autoradio/third_party/github.com/coreos/go-etcd/etcd" ) @@ -191,12 +192,13 @@ type RadioNode struct { me *masterelection.MasterElection watcher *ConfigSyncer icecast *IcecastController + bw *bwmonitor.BandwidthUsageMonitor livenessTtl uint64 upch chan bool stop chan bool } -func NewRadioNode(ip string, client *etcd.Client) *RadioNode { +func NewRadioNode(ip, netDev string, bwLimit float64, client *etcd.Client) *RadioNode { config := NewClusterConfig() // Network updates trigger icecast reconfiguration. This @@ -229,6 +231,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { watcher: NewConfigSyncer(client, config, upch, stopch), icecast: NewIcecastController(ip, stopch), livenessTtl: 2, + bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), upch: upch, stop: stopch, } @@ -245,9 +248,10 @@ func (rc *RadioNode) presence() { // Build our NodeStatus. icecastSt := rc.icecast.GetStatus() nodeSt := autoradio.NodeStatus{ - IP: rc.ip, - IcecastUp: icecastSt.Up, - Mounts: icecastSt.Mounts, + IP: rc.ip, + IcecastUp: icecastSt.Up, + Mounts: icecastSt.Mounts, + BandwidthUsage: rc.bw.GetUsage(), } // Update our node entry in the database.