Commit b49f402c authored by ale's avatar ale

implement stream transcoding

Adds optional transcoding parameters to a Mount. Each node will then
start a master election for every transcoded stream, and the master will
run liquidsoap to re-encode the data. Transcoding mounts are identical
in other respects to normal mounts, except that liquidsoap acts as a
source for them.
parent 98800767
......@@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"sync"
......@@ -23,9 +24,10 @@ const (
// rolling restart of the cluster will then seamlessly cause a
// transition to the new consensus (the cluster will be
// partitioned in the meantime).
ABIVersion = "2"
MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
NodePrefix = "/icecast/" + ABIVersion + "/nodes/"
ABIVersion = "3"
MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master"
TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode"
NodePrefix = "/icecast/" + ABIVersion + "/nodes/"
IcecastPort = 8000
IcecastMountPrefix = "/_stream"
......@@ -36,6 +38,37 @@ var (
ErrIsFile = errors.New("key is a file")
)
// Encoding parameters used to re-encode a stream.
type EncodingParams struct {
// Path to the source mountpoint.
SourceName string
// Parameters for the transcoded stream. The value format is
// anything that liquidsoap will accept in its 'output'
// directive.
Format string
BitRate int
SampleRate int
Channels int
Quality float64
}
func (p *EncodingParams) Valid() error {
if p.Format == "" {
return errors.New("format not specified")
}
if p.SampleRate == 0 {
return errors.New("sample rate not specified")
}
if p.BitRate == 0 && p.Quality == 0 {
return errors.New("either bitrate or quality must be specified")
}
if p.Channels < 1 {
return errors.New("bad number of channels")
}
return nil
}
// A mountpoint for a stream.
type Mount struct {
// Name (path to the mountpoint).
......@@ -55,16 +88,45 @@ type Mount struct {
// Fallback stream name (optional).
Fallback string
// If Transcoding is non-nil, this mountpoint represents a
// transcoded stream.
Transcoding *EncodingParams
}
func (m *Mount) Valid() error {
if !strings.HasPrefix(m.Name, "/") {
return errors.New("name does not start with a slash")
}
if m.Username != "" && m.Password == "" {
return errors.New("username is set but password is empty")
}
if m.Username == "" && m.Password != "" {
return errors.New("password is set but username is empty")
}
if m.RelayUrl != "" && m.Transcoding != nil {
return errors.New("RelayUrl and Transcoding can't both be set")
}
if m.Transcoding != nil {
if err := m.Transcoding.Valid(); err != nil {
return fmt.Errorf("invalid encoding parameters: %v", err)
}
}
return nil
}
func (m *Mount) Equal(other *Mount) bool {
return *m == *other
return (m.Name == other.Name) && (m.Username == other.Username) && (m.Password == other.Password) && (m.RelayUrl == other.RelayUrl) && (m.Fallback == other.Fallback) && ((m.Transcoding == nil && other.Transcoding == nil) || (m.Transcoding != nil && other.Transcoding != nil && *m.Transcoding == *other.Transcoding))
}
func (m *Mount) IsRelay() bool {
return m.RelayUrl != ""
}
func (m *Mount) HasTranscoder() bool {
return m.Transcoding != nil
}
// Return the path in etcd used to store mountpoint configuration.
func mountEtcdPath(mountName string) string {
return MountPrefix + mountName[1:]
......
This diff is collapsed.
......@@ -197,7 +197,7 @@ func TestLoadBalancer_UtilizationPredictor(t *testing.T) {
newNr := okRequests()
if newNr == 0 && nr < cap {
t.Errorf("iteration %d: no successful requests but utilization is below 100% (avail=%d, ok_requests=%d)", i, cap-nr, newNr)
t.Errorf("iteration %d: no successful requests but utilization is below 100%% (avail=%d, ok_requests=%d)", i, cap-nr, newNr)
}
if newNr > (cap - nr) {
t.Fatalf("iteration %d: over capacity (avail=%d, ok_requests=%d)", i, cap-nr, newNr)
......
......@@ -25,12 +25,6 @@ var (
icecastOk = instrumentation.NewGauge("icecast.ok")
)
type Controller interface {
Update(*clusterConfig, bool, net.IP) error
GetStatus() *IcecastStatus
Run(chan bool)
}
// Icecast returns empty fields in our status handler, which we'll
// need to turn into integers (the xml unmarshaler will return an
// error in this specific case), so we use a separate type for
......
package node
import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"sync"
"text/template"
"time"
"git.autistici.org/ale/autoradio"
)
var (
liquidsoapBin = flag.String("liquidsoap", "/usr/bin/liquidsoap", "Location of the liquidsoap binary")
liquidsoapConfigStr = `
# Automatically generated config.
upstream = mksafe(input.http("{{.SourceURL}}", buffer=5.0))
output.icecast(%{{.Format}}(samplerate={{.SampleRate}}, {{if gt .BitRate 0}}bitrate={{.BitRate}}, {{end}}{{if gt .Quality 0.0}}quality={{.Quality}}, {{end}}{{if eq .Channels 2}}stereo{{else}}mono{{end}},
mount="{{.TargetMount}}",
host="{{.TargetIP}}", port={{.TargetPort}}, password="{{.TargetPassword}}",
upstream)
`
liquidsoapConfigTpl = template.Must(template.New("liquidsoap").Parse(liquidsoapConfigStr))
)
// Parameters to configure a liquidsoap-based transcoder.
type liquidsoapParams struct {
// Source (upstream) URL.
SourceURL string
// Target (downstream) connection parameters.
TargetIP string
TargetPort int
TargetMount string
TargetPassword string
// Stream encoding parameters. One note on the 'float32'
// choice for Quality: text/template can't evaluate the 'gt'
// function successfully on a float64 type!
Format string
BitRate int
SampleRate int
Channels int
Quality float32
}
// Create new parameters for liquidsoap for a transcoding mount. If
// mount.Transcoding is nil, this function will panic, so the caller
// should check mount.HasTranscoder().
func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams {
return &liquidsoapParams{
SourceURL: fmt.Sprintf("http://localhost%s", mount.Transcoding.SourceName),
TargetIP: "localhost",
TargetPort: 80,
TargetMount: mount.Name,
TargetPassword: mount.Password,
Format: mount.Transcoding.Format,
BitRate: mount.Transcoding.BitRate,
SampleRate: mount.Transcoding.SampleRate,
Channels: mount.Transcoding.Channels,
Quality: float32(mount.Transcoding.Quality),
}
}
func (p *liquidsoapParams) Equal(other *liquidsoapParams) bool {
return *p == *other
}
func (p *liquidsoapParams) Render(w io.Writer) error {
return liquidsoapConfigTpl.Execute(w, p)
}
// Controls a single liquidsoap process, transcoding a single stream.
type liquidsoapController struct {
params *liquidsoapParams
tmpdir string
configPath string
restartDelay time.Duration
stop chan bool
wg sync.WaitGroup
lock sync.Mutex
process *os.Process
}
func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) {
tmpdir, err := ioutil.TempDir("", "")
if err != nil {
return nil, err
}
configpath := filepath.Join(tmpdir, "transcode.liq")
return &liquidsoapController{
params: params,
tmpdir: tmpdir,
configPath: configpath,
restartDelay: 500 * time.Millisecond,
stop: make(chan bool),
}, nil
}
func (l *liquidsoapController) writeConfig() error {
file, err := os.Create(l.configPath)
if err != nil {
return err
}
defer file.Close()
return l.params.Render(file)
}
func (l *liquidsoapController) setProcess(p *os.Process) {
l.lock.Lock()
l.process = p
defer l.lock.Unlock()
}
func (l *liquidsoapController) run() {
defer l.wg.Done()
if err := l.writeConfig(); err != nil {
log.Printf("error starting liquidsoap: %v", err)
return
}
// Keep restarting liquidsoap if it fails, until the stop
// channel is closed (requesting termination). Keep the lock
// held until l.process is set, so that we can synchronize the
// process termination properly.
l.lock.Lock()
defer l.lock.Unlock()
for {
select {
case <-l.stop:
return
case <-time.After(l.restartDelay):
}
cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath)
if err := cmd.Start(); err != nil {
log.Printf("error starting liquidsoap: %v", err)
continue
}
l.process = cmd.Process
l.lock.Unlock()
err := cmd.Wait()
l.lock.Lock()
l.process = nil
if err != nil {
log.Printf("liquidsoap exited: %v", err)
}
}
}
func (l *liquidsoapController) Start() {
l.wg.Add(1)
go l.run()
}
func (l *liquidsoapController) Stop() {
// Close the stop channel to prevent restarts.
close(l.stop)
// Kill the currently running process, if any.
l.lock.Lock()
if l.process != nil {
l.process.Kill()
}
l.lock.Unlock()
l.wg.Wait()
os.RemoveAll(l.tmpdir)
}
package node
import (
"bytes"
"io"
"io/ioutil"
"os"
"testing"
"time"
"git.autistici.org/ale/autoradio"
)
var testParams = &liquidsoapParams{
SourceURL: "http://localhost/stream.ogg",
TargetIP: "localhost",
TargetPort: 80,
TargetMount: "/stream.mp3",
TargetPassword: "password",
Format: "mp3",
BitRate: 64,
SampleRate: 22050,
Channels: 2,
}
func TestLiquidsoapParams_New(t *testing.T) {
mount := &autoradio.Mount{
Name: "/stream.mp3",
Username: "sourceuser",
Password: "password",
Transcoding: &autoradio.EncodingParams{
SourceName: "/stream.ogg",
Format: "mp3",
BitRate: 64,
SampleRate: 22050,
Channels: 2,
},
}
params := newLiquidsoapParams(mount)
expected := testParams
if !params.Equal(expected) {
t.Fatalf("newLiquidsoapParams(): got %v, want %v", params, expected)
}
}
func TestLiquidsoapParams_Render(t *testing.T) {
var b bytes.Buffer
params := testParams
if err := params.Render(&b); err != nil {
t.Fatal(err)
}
if b.Len() == 0 {
t.Fatal("empty output")
}
}
func TestLiquidsoapController(t *testing.T) {
// Create a fake 'liquidsoap' binary that accepts all
// parameters and waits forever.
tmpf, err := ioutil.TempFile("", "")
if err != nil {
t.Fatal(err)
}
io.WriteString(tmpf, "#!/bin/sh\nsleep 3600\n")
tmpf.Close()
os.Chmod(tmpf.Name(), 0755)
defer os.Remove(tmpf.Name())
*liquidsoapBin = tmpf.Name()
l, err := newLiquidsoap(testParams)
if err != nil {
t.Fatal(err)
}
l.restartDelay = 0
l.Start()
time.Sleep(50 * time.Millisecond)
if l.process == nil {
t.Fatal("program not started")
}
l.Stop()
if l.process != nil {
t.Fatal("program not cleaned up properly")
}
}
......@@ -229,6 +229,22 @@ func (w *configWatcher) Start(stop chan bool) func(chan bool) {
}
}
// Private interfaces for process controllers. These are used to
// replace real processes with mocks while testing.
type controller interface {
Update(*clusterConfig, bool, net.IP) error
GetStatus() *IcecastStatus
Run(chan bool)
}
type transcodingController interface {
Start()
Stop()
}
// Factory for transcodingControllers.
type transcodingControllerFunc func(*liquidsoapParams) (transcodingController, error)
// An active streaming node, managing the local icecast server.
type RadioNode struct {
client autoradio.EtcdClient
......@@ -237,7 +253,8 @@ type RadioNode struct {
ips []net.IP
me *masterelection.MasterElection
watcher *configWatcher
icecast Controller
icecast controller
transcoderFn transcodingControllerFunc
bw *bwmonitor.BandwidthUsageMonitor
maxListeners int
heartbeat uint64
......@@ -293,8 +310,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max
string(minfodata),
uint64(*masterElectionTtl),
mech),
watcher: newConfigWatcher(client, config, upch),
icecast: NewIcecastController(name, maxListeners*2),
watcher: newConfigWatcher(client, config, upch),
icecast: NewIcecastController(name, maxListeners*2),
transcoderFn: func(p *liquidsoapParams) (transcodingController, error) {
return newLiquidsoap(p)
},
reloadDelay: 1000 * time.Millisecond,
heartbeat: uint64(*nodeHeartbeat),
bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit),
......@@ -357,7 +377,16 @@ func (rc *RadioNode) updater(stop chan bool) {
// initialize properly.
time.Sleep(200 * time.Millisecond)
rc.Log.Printf("starting icecast updater")
// Keep track of all the configured transcoders (and clean
// them up at the end).
transcoders := make(map[string]*transcoder)
defer func() {
for _, t := range transcoders {
t.Stop()
}
}()
rc.Log.Printf("starting updater")
for {
select {
case <-rc.upch:
......@@ -368,14 +397,54 @@ func (rc *RadioNode) updater(stop chan bool) {
continue
}
masterAddr := rc.getMasterAddr()
// Reload the Icecast daemon.
icecastReloads.Incr()
rc.Log.Printf("reloading icecast config")
if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil {
if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), masterAddr); err != nil {
icecastReloadErrors.Incr()
rc.Log.Printf("Update(): %v", err)
}
// Limit the rate of icecast reloads.
// Check the configuration for new or removed
// transcoding mounts, and start (or stop) the
// associated transcoder objects. We also need
// to detect changes in the encoding params
// and restart the transcoder if necessary.
tmp := make(map[string]struct{})
for name := range transcoders {
tmp[name] = struct{}{}
}
for _, m := range rc.config.ListMounts() {
if !m.HasTranscoder() {
continue
}
tparams := newLiquidsoapParams(m)
cur, ok := transcoders[m.Name]
if ok {
delete(tmp, m.Name)
if cur.Changed(tparams) {
cur.Stop()
ok = false
}
}
if !ok {
if t, err := newTranscoder(tparams, rc.transcoderFn, rc.name, rc.client); err != nil {
rc.Log.Printf("could not create transcoder: %v", err)
} else {
t.Start()
transcoders[m.Name] = t
}
}
}
for name := range tmp {
transcoders[name].Stop()
delete(transcoders, name)
}
// Limit the rate of reconfigurations.
if rc.reloadDelay > 0 {
time.Sleep(rc.reloadDelay)
}
......@@ -444,3 +513,83 @@ func (rc *RadioNode) Run() {
func (rc *RadioNode) Stop() {
close(rc.stop)
}
// Transcoder just runs a master election protocol and starts
// liquidsoap for a submount whenever this node becomes the master.
// Transcoder instances can be started and stopped individually. The
// transcoding parameters set at creation time can't be changed while
// the transcoder is running (it must be stopped and restarted).
type transcoder struct {
params *liquidsoapParams
nodeName string
client autoradio.EtcdClient
liquidsoap transcodingController
stop chan bool
wg sync.WaitGroup
}
func newTranscoder(params *liquidsoapParams, tfn transcodingControllerFunc, nodeName string, client autoradio.EtcdClient) (*transcoder, error) {
l, err := tfn(params)
if err != nil {
return nil, err
}
return &transcoder{
params: params,
liquidsoap: l,
client: client,
nodeName: nodeName,
stop: make(chan bool),
}, nil
}
// Changed returns true if the stream parameters have changed,
// requiring a transcoder restart.
func (t *transcoder) Changed(newParams *liquidsoapParams) bool {
return !t.params.Equal(newParams)
}
func (t *transcoder) run() {
defer t.wg.Done()
// The master election protocol must be stopped when the
// transcoder terminates, so its lifecycle is fully contained
// within the scope of this function.
update := make(chan masterelection.State)
mestop := make(chan bool)
me := masterelection.NewMasterElection(
t.client,
autoradio.TranscoderMasterElectionBase+t.params.TargetMount,
t.nodeName,
uint64(*masterElectionTtl),
update)
go me.Run(mestop)
defer close(mestop)
running := false
for {
select {
case state := <-update:
if state.Role == masterelection.ROLE_MASTER {
t.liquidsoap.Start()
running = true
} else if running {
t.liquidsoap.Stop()
}
case <-t.stop:
if running {
t.liquidsoap.Stop()
}
return
}
}
}
func (t *transcoder) Start() {
t.wg.Add(1)
go t.run()
}
func (t *transcoder) Stop() {
close(t.stop)
t.wg.Wait()
}
......@@ -34,6 +34,30 @@ func (m *mockController) GetStatus() *IcecastStatus {
return &IcecastStatus{Up: true}
}
type mockTranscoder struct {
startCount int
stopCount int
}
func (t *mockTranscoder) Start() {
t.startCount++
}
func (t *mockTranscoder) Stop() {
t.stopCount++
}
func (t *mockTranscoder) Reset() {
t.startCount = 0
t.stopCount = 0
}
var globalMockTranscoder = &mockTranscoder{}
func newMockTranscoder(params *liquidsoapParams) (transcodingController, error) {
return globalMockTranscoder, nil
}
func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
var nodes []*RadioNode
......@@ -46,6 +70,7 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode {
1000,
etcd)
node.icecast = &mockController{}
node.transcoderFn = newMockTranscoder
node.reloadDelay = time.Duration(0)
node.Start()
node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1))
......@@ -69,6 +94,7 @@ func loadTestData(etcd autoradio.EtcdClient) {
}
func TestRadioNode_MasterElection(t *testing.T) {
globalMockTranscoder.Reset()
etcd := util.NewTestEtcdClient()
loadTestData(etcd)
nodes := startTestNodes(3, etcd)
......@@ -98,6 +124,11 @@ func TestRadioNode_MasterElection(t *testing.T) {
time.Sleep(20 * time.Millisecond)
}
// Transcoders should not have been started.
if globalMockTranscoder.startCount > 0 {
t.Fatal("transcoders were started unexpectedly")
}
}
func TestRadioNode_ConfigChangePropagation(t *testing.T) {
......@@ -154,3 +185,33 @@ func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) {
t.Errorf("node received %d updates (expected 1)", numUpdates)
}
}
func TestRadioNode_TranscoderMasterElection(t *testing.T) {
globalMockTranscoder.Reset()
etcd := util.NewTestEtcdClient()
loadTestData(etcd)
// Load a transcoding mount.
etcd.Set(autoradio.MountPrefix+"test.mp3",
`{"Name": "/test.mp3", "Username": "source2", "Password": "foo",
"Transcoding": {"BitRate": 64, "SampleRate": 22050}}`,
86400)
nodes := startTestNodes(3, etcd)
time.Sleep(500 * time.Millisecond)
if globalMockTranscoder.startCount != 1 {
t.Errorf("transcoder was started more than once (%d)", globalMockTranscoder.startCount)
}
log.Printf("cleanup")
for _, n := range nodes {
n.Stop()
n.Wait()
}
// At the end, the transcoder must have been started and
// stopped the same number of times.
if globalMockTranscoder.startCount != globalMockTranscoder.stopCount {
t.Errorf("transcoder was started/stopped an unequal number of times: start=%d, stop=%d", globalMockTranscoder.startCount, globalMockTranscoder.stopCount)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment