Skip to content
Snippets Groups Projects
Select Git revision
  • renovate/github.com-miekg-dns-1.x
  • renovate/google.golang.org-protobuf-1.x
  • renovate/go.etcd.io-etcd-server-v3-3.x
  • renovate/go.etcd.io-etcd-client-v3-3.x
  • renovate/golang.org-x-crypto-0.x
  • renovate/github.com-prometheus-common-0.x
  • renovate/google.golang.org-grpc-1.x
  • renovate/github.com-prometheus-client_golang-1.x
  • renovate/golang.org-x-sync-0.x
  • renovate/github.com-lpar-gzipped-v2-2.x
  • master default
  • httplog
12 results

node_test.go

Blame
    • ale's avatar
      ed350cdb
      allow split networking · ed350cdb
      ale authored
      Split networking is where nodes communicate among themselves on a
      private network (for example when behind a NAT infrastructure). This
      change adds the --internal-ip option to radiod, that advertises a
      separate IP for internal communication.
      ed350cdb
      History
      allow split networking
      ale authored
      Split networking is where nodes communicate among themselves on a
      private network (for example when behind a NAT infrastructure). This
      change adds the --internal-ip option to radiod, that advertises a
      separate IP for internal communication.
    node_test.go 5.66 KiB
    package node
    
    import (
    	"fmt"
    	"log"
    	"net"
    	"testing"
    	"time"
    
    	"git.autistici.org/ale/autoradio"
    	"git.autistici.org/ale/autoradio/coordination/etcdtest"
    )
    
    // mockController is a fake icecast controller for tests. It keeps
    // the arguments of the latest Update call and counts how many times
    // Update has been invoked.
    //
    // NOTE(review): the tests in this file read these fields directly
    // and no locking is done here. If the node calls Update from another
    // goroutine that is a data race — confirm with -race.
    type mockController struct {
    	mounts     []*autoradio.Mount // conf.ListMounts() as of the last Update
    	isMaster   bool               // isMaster argument of the last Update
    	masterAddr net.IP             // masterAddr argument of the last Update
    	numUpdates int                // number of Update calls so far
    }
    
    // Run does no work: it simply blocks until something is received on
    // stop (or stop is closed).
    func (c *mockController) Run(stop chan bool) {
    	<-stop
    }
    
    // Update stores the mount list taken from conf together with the
    // master information, bumps the update counter, and always reports
    // success.
    func (c *mockController) Update(conf *clusterConfig, isMaster bool, masterAddr net.IP) error {
    	c.mounts, c.isMaster, c.masterAddr = conf.ListMounts(), isMaster, masterAddr
    	c.numUpdates++
    	return nil
    }
    
    // GetStatus always returns a freshly allocated status with Up set.
    func (c *mockController) GetStatus() *icecastStatus {
    	status := new(icecastStatus)
    	status.Up = true
    	return status
    }
    
    // mockTranscoder is a fake transcoder that only counts how many
    // times it has been started and stopped.
    type mockTranscoder struct {
    	startCount, stopCount int
    }
    
    // Start records one more start request.
    func (t *mockTranscoder) Start() {
    	t.startCount += 1
    }
    
    // Stop records one more stop request.
    func (t *mockTranscoder) Stop() {
    	t.stopCount += 1
    }
    
    // Reset brings both counters back to zero.
    func (t *mockTranscoder) Reset() {
    	*t = mockTranscoder{}
    }
    
    // globalMockTranscoder is the single mock instance handed out by
    // newMockTranscoder, so its counters aggregate over every caller.
    var globalMockTranscoder = new(mockTranscoder)
    
    // newMockTranscoder ignores params and always returns the shared
    // globalMockTranscoder with a nil error.
    func newMockTranscoder(params *liquidsoapParams) (transcodingController, error) {
    	var shared transcodingController = globalMockTranscoder
    	return shared, nil
    }
    
    // startTestNodes builds and starts n RadioNodes that all use the
    // given etcd client. Node k (counting from 1) is named "nodek" and
    // gets the address 127.0.0.k. Each node has its icecast controller
    // replaced by a fresh mockController, uses newMockTranscoder as its
    // transcoder factory, and has a zero reload delay. The function
    // sleeps for 1.1 seconds before returning the started nodes,
    // presumably to let the cluster settle (TODO confirm).
    func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
    	var started []*RadioNode
    
    	for idx := 1; idx <= n; idx++ {
    		name := fmt.Sprintf("node%d", idx)
    		addr := net.ParseIP(fmt.Sprintf("127.0.0.%d", idx))
    
    		// NOTE(review): the meaning of the nil, "eth0" and the two
    		// 1000 arguments is defined by NewRadioNode, which is not
    		// visible from this file.
    		rn := NewRadioNode(name, []net.IP{addr}, nil, "eth0", 1000, 1000, etcd)
    		rn.icecast = &mockController{}
    		rn.transcoderFn = newMockTranscoder
    		rn.reloadDelay = time.Duration(0)
    		rn.Start()
    
    		// Tag log output with the node name so interleaved logs from
    		// several nodes can be told apart.
    		rn.Log.SetPrefix(name + ": ")
    		rn.me.Log.SetPrefix(name + ": masterelection: ")
    
    		started = append(started, rn)
    	}
    
    	time.Sleep(1100 * time.Millisecond)
    	return started
    }
    
    // waitTestNodes calls Wait on every node, in slice order.
    func waitTestNodes(nodes []*RadioNode) {
    	for i := 0; i < len(nodes); i++ {
    		nodes[i].Wait()
    	}
    }
    
    // loadTestData stores the definition of the "/test.ogg" mount
    // (source user "source1") under the mount prefix in etcd, passing
    // 86400 as the third argument to Set.
    func loadTestData(etcd autoradio.EtcdClient) {
    	const mountJSON = `{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`
    
    	mountKey := autoradio.MountPrefix + "test.ogg"
    	etcd.Set(mountKey, mountJSON, 86400)
    }
    
    // TestRadioNode_MasterElection starts three nodes and then shuts
    // them down one at a time, always picking the current master. Before
    // each shutdown it verifies that exactly one of the remaining nodes
    // reports itself as master.
    func TestRadioNode_MasterElection(t *testing.T) {
    	globalMockTranscoder.Reset()
    	etcd := etcdtest.NewClient()
    	loadTestData(etcd)
    
    	remaining := startTestNodes(3, etcd)
    	for len(remaining) > 0 {
    		// Split the live nodes into the master(s) and everyone else.
    		var (
    			master    *RadioNode
    			followers []*RadioNode
    			masters   int
    		)
    		for _, n := range remaining {
    			if !n.me.IsMaster() {
    				followers = append(followers, n)
    				continue
    			}
    			masters++
    			master = n
    		}
    		if masters != 1 {
    			t.Fatalf("masters=%d (expected 1): %#v", masters, remaining)
    		}
    
    		// Take the master down to force a new election among the
    		// followers, then pause briefly before checking again.
    		master.Stop()
    		master.Wait()
    		remaining = followers
    
    		time.Sleep(20 * time.Millisecond)
    	}
    
    	// The test data contains no transcoding mount, so no transcoder
    	// should ever have been started.
    	if globalMockTranscoder.startCount > 0 {
    		t.Fatal("transcoders were started unexpectedly")
    	}
    }
    
    // TestRadioNode_ConfigChangePropagation rewrites the "/test.ogg"
    // mount in etcd while three nodes are running, and checks that every
    // node sees the new username and that each node's controller was
    // updated exactly twice.
    func TestRadioNode_ConfigChangePropagation(t *testing.T) {
    	// Pause used both before and after the configuration change.
    	const settle = 100 * time.Millisecond
    
    	etcd := etcdtest.NewClient()
    	loadTestData(etcd)
    	nodes := startTestNodes(3, etcd)
    
    	// Let the nodes run for a moment, change the stream, then give
    	// the change time to reach them.
    	time.Sleep(settle)
    	etcd.Set(autoradio.MountPrefix+"test.ogg",
    		`{"Name": "/test.ogg", "Username": "source2", "Password": "bar"}`,
    		86400)
    	time.Sleep(settle)
    
    	for idx, n := range nodes {
    		if got := n.config.GetMount("/test.ogg").Username; got != "source2" {
    			t.Errorf("change did not propagate to node %d", idx+1)
    		}
    	}
    
    	// Each controller should have been updated twice: once for the
    	// initial configuration load and once for the change made above.
    	for idx, n := range nodes {
    		got := n.icecast.(*mockController).numUpdates
    		if got != 2 {
    			t.Errorf("node %d received %d updates (expected 2)", idx+1, got)
    		}
    	}
    
    	log.Printf("cleanup")
    	for i := range nodes {
    		nodes[i].Stop()
    		nodes[i].Wait()
    	}
    }
    
    // TestRadioNode_UpdatesDoNotTriggerIfNothingChanged rewrites the
    // "/test.ogg" mount ten times with content identical to what
    // loadTestData stored, and checks that the node's controller was
    // updated only once (the initial configuration load).
    func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) {
    	etcd := etcdtest.NewClient()
    	loadTestData(etcd)
    	node := startTestNodes(1, etcd)[0]
    
    	// Shut the node down when the test finishes, as the other tests
    	// in this file do, so it does not keep running in the background.
    	defer func() {
    		node.Stop()
    		node.Wait()
    	}()
    
    	for i := 0; i < 10; i++ {
    		etcd.Set(autoradio.MountPrefix+"test.ogg",
    			`{"Name": "/test.ogg", "Username": "source1", "Password": "foo"}`,
    			86400)
    		// Sleep after each write rather than before it, so that even
    		// the last write has had time to reach the node before the
    		// update counter is inspected below.
    		time.Sleep(100 * time.Millisecond)
    	}
    
    	numUpdates := node.icecast.(*mockController).numUpdates
    	if numUpdates != 1 {
    		t.Errorf("node received %d updates (expected 1)", numUpdates)
    	}
    }
    
    // TestRadioNode_TranscoderMasterElection checks that, with a
    // transcoding mount configured on a three-node cluster, the shared
    // mock transcoder is started exactly once, and that once all nodes
    // have shut down every start has been matched by a stop.
    func TestRadioNode_TranscoderMasterElection(t *testing.T) {
    	globalMockTranscoder.Reset()
    	etcd := etcdtest.NewClient()
    	loadTestData(etcd)
    
    	// Load a transcoding mount.
    	etcd.Set(autoradio.MountPrefix+"test.mp3",
    		`{"Name": "/test.mp3", "Username": "source2", "Password": "foo",
                      "Transcoding": {"BitRate": 64, "SampleRate": 22050}}`,
    		86400)
    
    	nodes := startTestNodes(3, etcd)
    	time.Sleep(500 * time.Millisecond)
    
    	// All nodes share globalMockTranscoder, so this counts starts
    	// across the whole cluster. The message reports the actual count
    	// because the check also fails when the transcoder never started.
    	if globalMockTranscoder.startCount != 1 {
    		t.Errorf("transcoder was started %d times (expected 1)", globalMockTranscoder.startCount)
    	}
    
    	log.Printf("cleanup")
    	for _, n := range nodes {
    		n.Stop()
    		n.Wait()
    	}
    
    	// At the end, the transcoder must have been started and
    	// stopped the same number of times.
    	if globalMockTranscoder.startCount != globalMockTranscoder.stopCount {
    		t.Errorf("transcoder was started/stopped an unequal number of times: start=%d, stop=%d", globalMockTranscoder.startCount, globalMockTranscoder.stopCount)
    	}
    }
    
    // TestRadioNode_Presence starts three nodes with the heartbeat
    // setting forced to 1, waits two seconds, and checks that the client
    // then lists exactly three nodes.
    func TestRadioNode_Presence(t *testing.T) {
    	etcd := etcdtest.NewClient()
    	loadTestData(etcd)
    
    	// nodeHeartbeat is package-level state: put the previous value
    	// back on exit so the override does not leak into other tests.
    	// NOTE(review): the unit of the value (presumably seconds) is not
    	// visible from this file.
    	prevHeartbeat := *nodeHeartbeat
    	*nodeHeartbeat = 1
    	defer func() { *nodeHeartbeat = prevHeartbeat }()
    
    	nodes := startTestNodes(3, etcd)
    	// Registered right after the nodes start so they are shut down on
    	// every exit path. Deferred calls run last-in first-out, so the
    	// nodes are stopped before the heartbeat value is restored.
    	defer func() {
    		for _, n := range nodes {
    			n.Stop()
    			n.Wait()
    		}
    	}()
    
    	client := autoradio.NewClient(etcd)
    
    	// Give the nodes time to announce themselves.
    	time.Sleep(2000 * time.Millisecond)
    
    	result, err := client.GetNodes()
    	if err != nil {
    		t.Fatal(err)
    	}
    	if len(result) != 3 {
    		t.Fatalf("Some nodes did not report presence: %+v", result)
    	}
    }