package node

import (
	"bytes"
	"encoding/json"
	"flag"
	"log"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"git.autistici.org/ale/autoradio"
	"git.autistici.org/ale/autoradio/coordination/masterelection"
	"git.autistici.org/ale/autoradio/coordination/presence"
	"git.autistici.org/ale/autoradio/coordination/watcher"
	"git.autistici.org/ale/autoradio/instrumentation"
	"git.autistici.org/ale/autoradio/node/bwmonitor"
)

var (
	masterElectionTTL = flag.Int("master-election-ttl", 5, "TTL for the master election protocol (s)")
	nodeHeartbeat     = flag.Int("heartbeat", 3, "Period for the node presence heartbeat (s)")

	icecastReloadErrors = instrumentation.NewCounter("icecast.reload_errors")
	icecastReloads      = instrumentation.NewCounter("icecast.reload")
	configIndex         = instrumentation.NewGauge("config.etcd_index")
)

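// trigger performs a non-blocking send on an event channel: if a
// notification is already pending, the new event is simply coalesced
// with it.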
func trigger(c chan struct{}) {
	select {
	case c <- struct{}{}:
	default:
	}
}

// keyToMount converts an etcd key (a path) to the Icecast mount
// path, stripping autoradio.MountPrefix from the beginning of the
// path but keeping the leading slash.
func keyToMount(key string) string {
	return key[len(autoradio.MountPrefix)-1:]
}

// In-memory representation of the overall configuration (basically
// just a list of the known mounts).
type clusterConfig struct {
	mounts map[string]*autoradio.Mount
	lock   sync.Mutex
}

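// newClusterConfig creates an empty clusterConfig.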
func newClusterConfig() *clusterConfig {
	return &clusterConfig{
		mounts: make(map[string]*autoradio.Mount),
	}
}

// Delete implements the watcher.Syncable interface. It is a no-op
// here: mount deletions are handled by configWatcher.Delete.
func (c *clusterConfig) Delete(key string) {
}

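// GetMount returns the mount with the given name, or nil if unknown.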
// TODO: remove?
func (c *clusterConfig) GetMount(name string) *autoradio.Mount {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.mounts[name]
}

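// ListMounts returns all the currently known mounts.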
func (c *clusterConfig) ListMounts() []*autoradio.Mount {
	c.lock.Lock()
	defer c.lock.Unlock()
	result := make([]*autoradio.Mount, 0, len(c.mounts))
	for _, m := range c.mounts {
		result = append(result, m)
	}
	return result
}

// Keeps the in-memory service configuration (clusterConfig) in sync
// with the etcd database. An update channel is triggered whenever the
// data changes.
type configWatcher struct {
	*clusterConfig
	client autoradio.EtcdClient
	update chan struct{}
}

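// newConfigWatcher creates a watcher.Syncer that keeps config in sync
// with the mount definitions stored in etcd under
// autoradio.MountPrefix, signaling changes on the update channel.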
func newConfigWatcher(client autoradio.EtcdClient, config *clusterConfig, update chan struct{}) *watcher.Syncer {
	return watcher.NewSyncer(client, autoradio.MountPrefix, &configWatcher{
		clusterConfig: config,
		client:        client,
		update:        update,
	})
}

// Set implements the watcher.Syncable interface. It will update the
// clusterConfig only if the data actually changed.
func (c *configWatcher) Set(key, value string, index uint64) {
	name := keyToMount(key)

	var m autoradio.Mount
	if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil {
		log.Printf("error updating mount %s [@%d]: corrupted data: %v", name, index, err)
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	if prev, ok := c.mounts[name]; ok && prev.Equal(&m) {
		// No changes.
		return
	}

	c.mounts[name] = &m
	log.Printf("updated mount %s [@%d]: %s", name, index, value)
	// Signal the updater that the configuration has changed.
	trigger(c.update)
}

// Delete implements the watcher.Syncable interface. It signals an
// update only if the mount was actually known.
func (c *configWatcher) Delete(key string) {
	name := keyToMount(key)

	c.lock.Lock()
	defer c.lock.Unlock()
	if _, ok := c.mounts[name]; !ok {
		return
	}
	delete(c.mounts, name)
	// Signal the updater that the configuration has changed.
	trigger(c.update)
}

// Private interfaces for process controllers. These are used to
// replace real processes with mocks while testing.
type controller interface {
	Update(*clusterConfig, bool, net.IP) error
	GetStatus() *icecastStatus
	Run(chan bool)
}

type transcodingController interface {
	Start()
	Stop()
}

// Factory for transcodingControllers.
type transcodingControllerFunc func(*liquidsoapParams) (transcodingController, error)

// RadioNode is an active streaming node, managing the local icecast server.
type RadioNode struct {
	wg     sync.WaitGroup
	client autoradio.EtcdClient

	name        string
	ips         []net.IP
	internalIPs []net.IP
	config      *clusterConfig

	me     *masterelection.MasterElection
	syncer *watcher.Syncer
	bw     *bwmonitor.BandwidthUsageMonitor

	icecast      controller
	maxListeners int

	// Node presence heartbeat.
	presence *presence.Presence

	// Rate limiting for Icecast daemon restarts.
	reloadDelay time.Duration

	// Generator for transcodingControllers. Exposed as a member
	// so that it can be stubbed out during tests.
	transcoderFn transcodingControllerFunc

	// All currently active transcoders (locked due to the
	// async debugging handler).
	transcodersMx sync.Mutex
	transcoders   map[string]*transcoder

	// A note on channel types used for signaling: while I
	// personally prefer struct{} chans, the etcd interface for
	// Watch makes it convenient to use bool stop channels
	// throughout the application.
	upch chan struct{}
	stop chan bool

	// Logger for debug messages and state changes.
	Log *log.Logger
}

// NewRadioNode creates and initializes a new autoradio node.
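//
// A minimal usage sketch (argument values are illustrative only):
//
//	node := NewRadioNode("radio1", ips, internalIPs, "eth0", bwLimit, maxListeners, client)
//	go node.Run()
//	defer node.Stop()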
func NewRadioNode(name string, ips, internalIPs []net.IP, netDev string, bwLimit float64, maxListeners int, client autoradio.EtcdClient) *RadioNode {
	// Global 'stop' channel.
	stopch := make(chan bool)

	// Network updates trigger icecast reconfiguration. This
	// channel is used as an event signal.
	upch := make(chan struct{}, 1)

	// MasterElection changes trigger an update.
	mech := make(chan masterelection.State)
	go func() {
		for range mech {
			trigger(upch)
		}
	}()

	// Location information advertised when this node is master.
	minfo := &autoradio.MasterNodeInfo{
		Name:       name,
		IP:         ips,
		InternalIP: internalIPs,
	}
	minfodata, err := json.Marshal(minfo)
	if err != nil {
		log.Fatal(err)
	}

	// Create the RadioNode and all the auxiliary objects it
	// contains. Note that the per-node icecast client limit is
	// actually set to a value greater than maxListeners, to allow
	// for some headroom in the front-end traffic control
	// computations (if everything goes well, connections should
	// never be rejected by Icecast).
	config := newClusterConfig()
	rc := &RadioNode{
		config:      config,
		name:        name,
		ips:         ips,
		internalIPs: internalIPs,
		client:      client,
		me: masterelection.New(
			client,
			autoradio.MasterElectionPath,
			string(minfodata),
			uint64(*masterElectionTTL),
			mech),
		syncer:  newConfigWatcher(client, config, upch),
		icecast: newIcecastController(name, maxListeners*2),
		transcoderFn: func(p *liquidsoapParams) (transcodingController, error) {
			return newLiquidsoap(p)
		},
		transcoders:  make(map[string]*transcoder),
		reloadDelay:  1000 * time.Millisecond,
		bw:           bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
		maxListeners: maxListeners,
		upch:         upch,
		stop:         stopch,
		Log:          log.New(os.Stderr, "node: ", 0),
	}
	rc.presence = presence.New(client, autoradio.NodePrefix, rc.getNodeStatus, uint64(*nodeHeartbeat))
	return rc
}

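// getNodeStatus returns the JSON-encoded status of this node, which
// is advertised via the presence heartbeat.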
func (rc *RadioNode) getNodeStatus() string {
	// Build our NodeStatus.
	icecastStatus := rc.icecast.GetStatus()
	nodeStatus := autoradio.NodeStatus{
		Name:           rc.name,
		IP:             rc.ips,
		InternalIP:     rc.internalIPs,
		IcecastUp:      icecastStatus.Up,
		Mounts:         icecastStatus.Mounts,
		BandwidthUsage: rc.bw.GetUsage(),
		MaxListeners:   rc.maxListeners,
	}

	// Serialize the status; the presence heartbeat will store it
	// in the database.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(&nodeStatus); err != nil {
		rc.Log.Printf("error encoding node status: %v", err)
	}
	return buf.String()
}

// Get a valid internal IP address for the current master node (to be
// passed to Icecast). Since we don't really know much about network
// topology, just pick the first IP address associated with the master
// node.
func (rc *RadioNode) getMasterInternalAddr() net.IP {
	var info autoradio.MasterNodeInfo
	if err := json.NewDecoder(strings.NewReader(rc.me.GetMasterData())).Decode(&info); err != nil || len(info.GetInternalIP()) == 0 {
		return nil
	}
	return info.GetInternalIP()[0]
}

// Reload the icecast configuration when needed.
func (rc *RadioNode) updater(stop chan bool) {
	// Wait an instant to give the services a chance to
	// initialize properly.
	time.Sleep(200 * time.Millisecond)

	// Stop all the configured transcoders when the updater
	// terminates.
	defer func() {
		for _, t := range rc.transcoders {
			t.Stop()
		}
	}()

	rc.Log.Printf("starting updater")
	for {
		select {
		case <-rc.upch:
			// We may have received an update before the
			// masterelection had time to actually elect a
			// master. In this case, skip.
			if !rc.me.Valid() {
				continue
			}

			rc.Log.Printf("updating configuration")

			masterAddr := rc.getMasterInternalAddr()

			// Reload the Icecast daemon.
			icecastReloads.Incr()
			if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), masterAddr); err != nil {
				icecastReloadErrors.Incr()
				rc.Log.Printf("Update(): %v", err)
			}

			// Check the configuration for new or removed
			// transcoding mounts, and start (or stop) the
			// associated transcoder objects. We also need
			// to detect changes in the encoding params
			// and restart the transcoder if necessary.
			rc.transcodersMx.Lock()
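			// Mark all running transcoders; entries still
			// present in the configuration are unmarked
			// below, and whatever remains is stopped.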
			tmp := make(map[string]struct{})
			for name := range rc.transcoders {
				tmp[name] = struct{}{}
			}
			for _, m := range rc.config.ListMounts() {
				if !m.HasTranscoder() {
					continue
				}

				tparams := newLiquidsoapParams(m)
				cur, ok := rc.transcoders[m.Name]
				if ok {
					delete(tmp, m.Name)
					if cur.Changed(tparams) {
						cur.Stop()
						ok = false
					}
				}
				if !ok {
					if t, err := newTranscoder(tparams, rc.transcoderFn, rc.name, rc.client); err != nil {
						rc.Log.Printf("could not create transcoder: %v", err)
					} else {
						t.Start()
						rc.transcoders[m.Name] = t
					}
				}
			}
			for name := range tmp {
				rc.transcoders[name].Stop()
				delete(rc.transcoders, name)
			}
			rc.transcodersMx.Unlock()

			// Limit the rate of reconfigurations.
			if rc.reloadDelay > 0 {
				time.Sleep(rc.reloadDelay)
			}

		case <-stop:
			return
		}
	}
}

// Start the node. Returns immediately.
func (rc *RadioNode) Start() {
	// Starting the presence runner might fail; raise a fatal
	// error in that case.
	if err := rc.presence.Start(); err != nil {
		log.Fatal(err)
	}

	// Start auxiliary services and event listeners on their own
	// goroutines. Put them all in a WaitGroup so we can wait for
	// their termination.
	bgfuncs := []func(chan bool){
		// Icecast updater.
		rc.updater,

		// Master election runner.
		rc.me.Run,

		// Icecast status checker.
		rc.icecast.Run,

		// Bandwidth monitor.
		rc.bw.Run,
	}

	for _, fn := range bgfuncs {
		rc.wg.Add(1)
		go func(fn func(stop chan bool)) {
			fn(rc.stop)
			rc.wg.Done()
		}(fn)
	}
}

// Wait until all processing associated with this node has terminated.
func (rc *RadioNode) Wait() {
	rc.wg.Wait()
	// Let's leave this around for garbage collection, otherwise
	// the in-memory etcd mock server might try to send values to
	// it due to a Watch-trigger desynchronization...
	//close(rc.upch)
}

// Run the node, waiting for termination.
func (rc *RadioNode) Run() {
	rc.Start()
	rc.Wait()
}

// Stop everything.
func (rc *RadioNode) Stop() {
	rc.presence.Stop()
	close(rc.stop)
}

// A transcoder runs a master election protocol and starts liquidsoap
// for a submount whenever this node becomes the master. Transcoder
// instances can be started and stopped individually. The transcoding
// parameters set at creation time can't be changed while the
// transcoder is running (it must be stopped and restarted).
type transcoder struct {
	params     *liquidsoapParams
	nodeName   string
	client     autoradio.EtcdClient
	liquidsoap transcodingController
	stop       chan bool
	wg         sync.WaitGroup
}

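// newTranscoder creates a transcoder for the given parameters, using
// tfn to instantiate the transcoding process controller.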
func newTranscoder(params *liquidsoapParams, tfn transcodingControllerFunc, nodeName string, client autoradio.EtcdClient) (*transcoder, error) {
	l, err := tfn(params)
	if err != nil {
		return nil, err
	}
	return &transcoder{
		params:     params,
		liquidsoap: l,
		client:     client,
		nodeName:   nodeName,
		stop:       make(chan bool),
	}, nil
}

// Changed returns true if the stream parameters have changed,
// requiring a transcoder restart.
func (t *transcoder) Changed(newParams *liquidsoapParams) bool {
	return !t.params.Equal(newParams)
}

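// run is the transcoder main loop: it joins a per-mount master
// election and starts liquidsoap while this node is the master,
// stopping it on demotion or shutdown.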
func (t *transcoder) run() {
	defer t.wg.Done()

	// The master election protocol must be stopped when the
	// transcoder terminates, so its lifecycle is fully contained
	// within the scope of this function.
	update := make(chan masterelection.State)
	mestop := make(chan bool)
	me := masterelection.New(
		t.client,
		autoradio.TranscoderMasterElectionBase+t.params.TargetMount,
		t.nodeName,
		uint64(*masterElectionTTL),
		update)
	go me.Run(mestop)
	defer close(mestop)

	running := false
	for {
		select {
		case state := <-update:
			if state.Role == masterelection.RoleMaster {
				t.liquidsoap.Start()
				running = true
			} else if running {
				t.liquidsoap.Stop()
				running = false
			}
		case <-t.stop:
			if running {
				t.liquidsoap.Stop()
			}
			return
		}
	}
}

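// Start runs the transcoder in a background goroutine.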
func (t *transcoder) Start() {
	t.wg.Add(1)
	go t.run()
}

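// Stop terminates the transcoder and waits for it to exit.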
func (t *transcoder) Stop() {
	close(t.stop)
	t.wg.Wait()
}