Select Git revision
node_test.go
-
ale authored
Split networking is where nodes communicate among themselves on a private network (for example when behind a NAT infrastructure). This change adds the --internal-ip option to radiod, that advertises a separate IP for internal communication.
ale authoredSplit networking is where nodes communicate among themselves on a private network (for example when behind a NAT infrastructure). This change adds the --internal-ip option to radiod, that advertises a separate IP for internal communication.
node_test.go 5.66 KiB
package node
import (
"fmt"
"log"
"net"
"testing"
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/coordination/etcdtest"
)
type mockController struct {
mounts []*autoradio.Mount
isMaster bool
masterAddr net.IP
numUpdates int
}
func (m *mockController) Run(stop chan bool) {
<-stop
}
func (m *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
m.mounts = conf.ListMounts()
m.isMaster = isMaster
m.masterAddr = masterAddr
m.numUpdates++
return nil
}
func (m *mockController) GetStatus() *icecastStatus {
return &icecastStatus{Up: true}
}
type mockTranscoder struct {
startCount int
stopCount int
}
func (t *mockTranscoder) Start() {
t.startCount++
}
func (t *mockTranscoder) Stop() {
t.stopCount++
}
func (t *mockTranscoder) Reset() {
t.startCount = 0
t.stopCount = 0
}
var globalMockTranscoder = &mockTranscoder{}
func newMockTranscoder(params *liquidsoapParams) (transcodingController, error) {
return globalMockTranscoder, nil
}
func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
var nodes []*RadioNode
for i := 0; i < n; i++ {
node := NewRadioNode(
fmt.Sprintf("node%d", i+1),
[]net.IP{net.ParseIP(fmt.Sprintf("127.0.0.%d", i+1))},
nil,
"eth0",
1000,
1000,
etcd)
node.icecast = &mockController{}
node.transcoderFn = newMockTranscoder
node.reloadDelay = time.Duration(0)
node.Start()
node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1))
node.me.Log.SetPrefix(fmt.Sprintf("node%d: masterelection: ", i+1))
nodes = append(nodes, node)
}
time.Sleep(1100 * time.Millisecond)
return nodes
}
func waitTestNodes(nodes []*RadioNode) {
for _, n := range nodes {
n.Wait()
}
}
func loadTestData(etcd autoradio.EtcdClient) {
etcd.Set(autoradio.MountPrefix+"test.ogg",
`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
86400)
}
func TestRadioNode_MasterElection(t *testing.T) {
globalMockTranscoder.Reset()
etcd := etcdtest.NewClient()
loadTestData(etcd)
nodes := startTestNodes(3, etcd)
// Force master transitions by shutting down the nodes one by
// one as the become the master, and verify that there is a
// single master among the remaining ones.
curNodes := nodes
for len(curNodes) > 0 {
var tmp []*RadioNode
masterIdx := -1
numMasters := 0
for i, n := range curNodes {
if n.me.IsMaster() {
numMasters++
masterIdx = i
} else {
tmp = append(tmp, n)
}
}
if numMasters != 1 {
t.Fatalf("masters=%d (expected 1): %#v", numMasters, curNodes)
}
curNodes[masterIdx].Stop()
curNodes[masterIdx].Wait()
curNodes = tmp
time.Sleep(20 * time.Millisecond)
}
// Transcoders should not have been started.
if globalMockTranscoder.startCount > 0 {
t.Fatal("transcoders were started unexpectedly")
}
}
func TestRadioNode_ConfigChangePropagation(t *testing.T) {
etcd := etcdtest.NewClient()
loadTestData(etcd)
nodes := startTestNodes(3, etcd)
// Wait a bit and modify the stream. Check that the change
// propagates correctly.
time.Sleep(100 * time.Millisecond)
etcd.Set(autoradio.MountPrefix+"test.ogg",
`{"Name": "/test.ogg", "Username": "source2", "Password": "bar"}`,
86400)
time.Sleep(100 * time.Millisecond)
for i := 0; i < 3; i++ {
username := nodes[i].config.GetMount("/test.ogg").Username
if username != "source2" {
t.Errorf("change did not propagate to node %d", i+1)
}
}
// Verify that the controller has received the updates. There
// should be two of them: the initial config load, and the
// test update above.
for i := 0; i < 3; i++ {
numUpdates := nodes[i].icecast.(*mockController).numUpdates
if numUpdates != 2 {
t.Errorf("node %d received %d updates (expected 2)", i+1, numUpdates)
}
}
log.Printf("cleanup")
for _, n := range nodes {
n.Stop()
n.Wait()
}
}
func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) {
etcd := etcdtest.NewClient()
loadTestData(etcd)
node := startTestNodes(1, etcd)[0]
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
etcd.Set(autoradio.MountPrefix+"test.ogg",
`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
86400)
}
numUpdates := node.icecast.(*mockController).numUpdates
if numUpdates != 1 {
t.Errorf("node received %d updates (expected 1)", numUpdates)
}
}
func TestRadioNode_TranscoderMasterElection(t *testing.T) {
globalMockTranscoder.Reset()
etcd := etcdtest.NewClient()
loadTestData(etcd)
// Load a transcoding mount.
etcd.Set(autoradio.MountPrefix+"test.mp3",
`{"Name": "/test.mp3", "Username": "source2", "Password": "foo",
"Transcoding": {"BitRate": 64, "SampleRate": 22050}}`,
86400)
nodes := startTestNodes(3, etcd)
time.Sleep(500 * time.Millisecond)
if globalMockTranscoder.startCount != 1 {
t.Errorf("transcoder was started more than once (%d)", globalMockTranscoder.startCount)
}
log.Printf("cleanup")
for _, n := range nodes {
n.Stop()
n.Wait()
}
// At the end, the transcoder must have been started and
// stopped the same number of times.
if globalMockTranscoder.startCount != globalMockTranscoder.stopCount {
t.Errorf("transcoder was started/stopped an unequal number of times: start=%d, stop=%d", globalMockTranscoder.startCount, globalMockTranscoder.stopCount)
}
}
func TestRadioNode_Presence(t *testing.T) {
etcd := etcdtest.NewClient()
loadTestData(etcd)
*nodeHeartbeat = 1
nodes := startTestNodes(3, etcd)
client := autoradio.NewClient(etcd)
time.Sleep(2000 * time.Millisecond)
defer func() {
for _, n := range nodes {
n.Stop()
n.Wait()
}
}()
result, err := client.GetNodes()
if err != nil {
t.Fatal(err)
}
if len(result) != 3 {
t.Fatalf("Some nodes did not report presence: %+v", result)
}
}