diff --git a/api.go b/api.go index 7f02873208b67336536cff6fc75fdd7783626af0..50a130b4c7de512e1baf4edc1816e425d27556ca 100644 --- a/api.go +++ b/api.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "fmt" "net" "strings" "sync" @@ -23,9 +24,10 @@ const ( // rolling restart of the cluster will then seamlessly cause a // transition to the new consensus (the cluster will be // partitioned in the meantime). - ABIVersion = "2" - MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master" - NodePrefix = "/icecast/" + ABIVersion + "/nodes/" + ABIVersion = "3" + MasterElectionPath = "/icecast/" + ABIVersion + "/cluster/master" + TranscoderMasterElectionBase = "/icecast/" + ABIVersion + "/transcode" + NodePrefix = "/icecast/" + ABIVersion + "/nodes/" IcecastPort = 8000 IcecastMountPrefix = "/_stream" @@ -36,6 +38,37 @@ var ( ErrIsFile = errors.New("key is a file") ) +// Encoding parameters used to re-encode a stream. +type EncodingParams struct { + // Path to the source mountpoint. + SourceName string + + // Parameters for the transcoded stream. The value format is + // anything that liquidsoap will accept in its 'output' + // directive. + Format string + BitRate int + SampleRate int + Channels int + Quality float64 +} + +func (p *EncodingParams) Valid() error { + if p.Format == "" { + return errors.New("format not specified") + } + if p.SampleRate == 0 { + return errors.New("sample rate not specified") + } + if p.BitRate == 0 && p.Quality == 0 { + return errors.New("either bitrate or quality must be specified") + } + if p.Channels < 1 { + return errors.New("bad number of channels") + } + return nil +} + // A mountpoint for a stream. type Mount struct { // Name (path to the mountpoint). @@ -55,16 +88,45 @@ type Mount struct { // Fallback stream name (optional). Fallback string + + // If Transcoding is non-nil, this mountpoint represents a + // transcoded stream. + Transcoding *EncodingParams +} + +func (m *Mount) Valid() error { + if !strings.HasPrefix(m.Name, "/") { + return errors.New("name does not start with a slash") + } + if m.Username != "" && m.Password == "" { + return errors.New("username is set but password is empty") + } + if m.Username == "" && m.Password != "" { + return errors.New("password is set but username is empty") + } + if m.RelayUrl != "" && m.Transcoding != nil { + return errors.New("RelayUrl and Transcoding can't both be set") + } + if m.Transcoding != nil { + if err := m.Transcoding.Valid(); err != nil { + return fmt.Errorf("invalid encoding parameters: %v", err) + } + } + return nil } func (m *Mount) Equal(other *Mount) bool { - return *m == *other + return (m.Name == other.Name) && (m.Username == other.Username) && (m.Password == other.Password) && (m.RelayUrl == other.RelayUrl) && (m.Fallback == other.Fallback) && ((m.Transcoding == nil && other.Transcoding == nil) || (m.Transcoding != nil && other.Transcoding != nil && *m.Transcoding == *other.Transcoding)) } func (m *Mount) IsRelay() bool { return m.RelayUrl != "" } +func (m *Mount) HasTranscoder() bool { + return m.Transcoding != nil +} + // Return the path in etcd used to store mountpoint configuration. func mountEtcdPath(mountName string) string { return MountPrefix + mountName[1:] diff --git a/cmd/radioctl/radioctl.go b/cmd/radioctl/radioctl.go index a40999d01614ecfc4dd67fc7e77f5cdb78261bb9..233d274b928fdd02aa8815db9fa91fa840c2b179 100644 --- a/cmd/radioctl/radioctl.go +++ b/cmd/radioctl/radioctl.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "flag" "fmt" "hash/crc32" @@ -14,6 +15,9 @@ import ( gonutsflag "github.com/gonuts/flag" ) +// Format for output of structured data. +var outputFormat = flag.String("format", "txt", "Output format for structured data (json, txt)") + type HasAddFlags interface { AddFlags(*gonutsflag.FlagSet) } @@ -65,65 +69,51 @@ func setRelay(m *autoradio.Mount, relayUrl string) { } } -// Edit a mountpoint. -type editMountCommand struct { - BaseCommand - relay string - fallback string -} +func printMount(m *autoradio.Mount) { + switch *outputFormat { + case "json": + s, _ := json.MarshalIndent(m, "", " ") + os.Stdout.Write(s) -var UNSET = "UNSET" + case "txt": + fmt.Printf("path=%s\n", m.Name) + if m.Username != "" { + fmt.Printf("username=%s\npassword=%s\n", m.Username, m.Password) + } + if m.RelayUrl != "" { + fmt.Printf("relay_url=%s\n", m.RelayUrl) + } + if m.Fallback != "" { + fmt.Printf("fallback_url=%s\n", m.Fallback) + } + if t := m.Transcoding; t != nil { + fmt.Printf("transcode_source_url=%s\n", t.SourceName) + fmt.Printf("transcode_format=%s\n", t.Format) + fmt.Printf("transcode_bitrate=%d\n", t.BitRate) + fmt.Printf("transcode_quality=%f\n", t.Quality) + fmt.Printf("transcode_samplerate=%d\n", t.SampleRate) + } -func newEditMountCommand() *editMountCommand { - return &editMountCommand{ - BaseCommand: BaseCommand{ - UsageLine: "edit-mount <path>", - Short: "Edit an existing mountpoint", - Long: ` -Modify parameters of the specified mountpoint, such as the relay -and the fallback URL. If the relay option is set, the mountpoint -will not accept source connections anymore. To revert to the -default, non-relay behavior, set the relay to the empty string -(with --relay=""). -`, - }, + default: + log.Printf("unsupported output format \"%s\"", *outputFormat) } } -func (cmd *editMountCommand) AddFlags(f *gonutsflag.FlagSet) { - // Note that we use special values to figure out whether a - // flag has been specified or not. There might be better way - // to do this. - f.StringVar(&cmd.relay, "relay", UNSET, "Upstream URL to relay") - f.StringVar(&cmd.fallback, "fallback", UNSET, "Fallback stream URL") +func addEncodingFlags(f *gonutsflag.FlagSet, p *autoradio.EncodingParams) { + f.StringVar(&p.SourceName, "source", "", "Source mountpoint") + f.StringVar(&p.Format, "codec", "", "Encoding format") + f.IntVar(&p.BitRate, "bitrate", 0, "Bitrate (kbps)") + f.IntVar(&p.SampleRate, "samplerate", 0, "Sample rate (Hz)") + f.IntVar(&p.Channels, "channels", 2, "Number of channels") + f.Float64Var(&p.Quality, "quality", 0, "Quality (alternatively to bitrate for some encoders)") } -func (cmd *editMountCommand) Run(args []string) { - if len(args) != 1 { - log.Fatal("Wrong number of arguments") - } - - client := getClient() - mount, err := client.GetMount(args[0]) +func mountExists(name string, client *autoradio.Client) bool { + m, err := client.GetMount(name) if err != nil { log.Fatal(err) } - if mount == nil { - log.Fatal("Mount not found") - } - - if cmd.fallback != UNSET { - mount.Fallback = cmd.fallback - } - if cmd.relay != UNSET { - setRelay(mount, cmd.relay) - } - - if err := client.SetMount(mount); err != nil { - log.Fatal(err) - } - - fmt.Printf("%+v\n", mount) + return m != nil } // Create a new mountpoint. @@ -159,28 +149,166 @@ func (cmd *createMountCommand) Run(args []string) { path := args[0] if !strings.HasPrefix(path, "/") { + log.Printf("Warning: mountpoint %s does not start with a slash, using /%s instead", path, path) path = "/" + path } // Check if the mount already exists. client := getClient() if oldm, _ := client.GetMount(path); oldm != nil { - log.Fatal("A mount with that name already exists!") + log.Fatal("ERROR: A mount with that name already exists!") } // Create the new mount and set the relevant fields (depending // on the options passed to the command). m := &autoradio.Mount{Name: path} setRelay(m, cmd.relay) - if cmd.fallback != "" { + m.Fallback = cmd.fallback + + if err := m.Valid(); err != nil { + log.Fatalf("ERROR: mount configuration is invalid: %v", err) + } + + if err := client.SetMount(m); err != nil { + log.Fatalf("ERROR: creating mount: %v", err) + } + + printMount(m) +} + +// Create a submount (transcoded stream). +type createTranscodingMountCommand struct { + BaseCommand + params *autoradio.EncodingParams + fallback string +} + +func newCreateTranscodingMountCommand() *createTranscodingMountCommand { + return &createTranscodingMountCommand{ + BaseCommand: BaseCommand{ + UsageLine: "create-transcoding-mount <path>", + Short: "Create a transcoded mount", + Long: ` +Create a new stream that will transcode the parent stream with +different encoding parameters. +`, + }, + params: &autoradio.EncodingParams{}, + } +} + +func (cmd *createTranscodingMountCommand) AddFlags(f *gonutsflag.FlagSet) { + addEncodingFlags(f, cmd.params) + f.StringVar(&cmd.fallback, "fallback", "", "Fallback stream URL") +} + +func (cmd *createTranscodingMountCommand) Run(args []string) { + if len(args) != 1 { + log.Fatal("Wrong number of arguments") + } + + path := args[0] + if !strings.HasPrefix(path, "/") { + log.Printf("Warning: mountpoint %s does not start with a slash, using /%s instead", path, path) + path = "/" + path + } + + // The mount path should not exist. + client := getClient() + if mountExists(path, client) { + log.Fatal("ERROR: a mount with that name already exists!") + } + // The source mount should exist. + if !mountExists(cmd.params.SourceName, client) { + log.Fatal("ERROR: the source mount does not exist!") + } + + // Retrieve the parent mount point and add a TranscodingMount. + m := &autoradio.Mount{ + Name: path, + Transcoding: cmd.params, + } + setRelay(m, "") + m.Fallback = cmd.fallback + + if err := m.Valid(); err != nil { + log.Fatalf("ERROR: mount configuration is invalid: %v", err) + } + + if err := client.SetMount(m); err != nil { + log.Fatalf("ERROR: creating mount: %v", err) + } + + printMount(m) +} + +// Edit a mountpoint. +type editMountCommand struct { + BaseCommand + params *autoradio.EncodingParams + relay string + fallback string +} + +var UNSET = "UNSET" + +func newEditMountCommand() *editMountCommand { + return &editMountCommand{ + BaseCommand: BaseCommand{ + UsageLine: "edit-mount <path>", + Short: "Edit an existing mountpoint", + Long: ` +Modify parameters of the specified mountpoint, such as the relay +and the fallback URL. If the relay option is set, the mountpoint +will not accept source connections anymore. To revert to the +default, non-relay behavior, set the relay to the empty string +(with --relay=""). +`, + }, + params: &autoradio.EncodingParams{}, + } +} + +func (cmd *editMountCommand) AddFlags(f *gonutsflag.FlagSet) { + // Note that we use a magic value to figure out whether a flag + // has been specified or not, to make it possible to clear a + // field (by setting it to the empty string). There might be + // better way to do this. + f.StringVar(&cmd.relay, "relay", UNSET, "Upstream URL to relay") + f.StringVar(&cmd.fallback, "fallback", UNSET, "Fallback stream URL") + addEncodingFlags(f, cmd.params) +} + +func (cmd *editMountCommand) Run(args []string) { + if len(args) != 1 { + log.Fatal("Wrong number of arguments") + } + + client := getClient() + m, err := client.GetMount(args[0]) + if err != nil { + log.Fatalf("ERROR: %v", err) + } + if m == nil { + log.Fatal("ERROR: mount not found") + } + + if cmd.fallback != UNSET { m.Fallback = cmd.fallback } + if cmd.relay != UNSET { + setRelay(m, cmd.relay) + } + + if err := m.Valid(); err != nil { + log.Fatalf("ERROR: mount configuration is invalid: %v", err) + } if err := client.SetMount(m); err != nil { - log.Fatal(err) + log.Fatalf("ERROR: updating mount: %v", err) } - fmt.Printf("%+v\n", m) + printMount(m) } // Delete an existing mountpoint. @@ -204,10 +332,31 @@ func (cmd *deleteMountCommand) Run(args []string) { if len(args) != 1 { log.Fatal("Wrong number of arguments") } - if err := getClient().DelMount(args[0]); err != nil { - log.Fatal(err) + path := args[0] + client := getClient() + if !mountExists(path, client) { + log.Fatal("ERROR: mount not found") } - log.Printf("mountpoint %s removed", args[0]) + + if err := client.DelMount(path); err != nil { + log.Fatalf("ERROR: deleting mount: %v", err) + } + + // Delete all the transcoding mounts that have this as a + // source. + mounts, err := client.ListMounts() + if err != nil { + log.Fatalf("ERROR: %v", err) + } + for _, m := range mounts { + if m.HasTranscoder() && m.Transcoding.SourceName == path { + if err := client.DelMount(m.Name); err != nil { + log.Printf("ERROR: deleting transcoded mount %s: %v", m.Name, err) + } + } + } + + log.Printf("mountpoint %s removed", path) } // List known mountpoints. @@ -234,10 +383,23 @@ func (cmd *listMountsCommand) Run(args []string) { mounts, err := getClient().ListMounts() if err != nil { - log.Fatal(err) + log.Fatalf("ERROR: %v", err) } + var names []string for _, m := range mounts { - fmt.Println(m.Name) + names = append(names, m.Name) + } + + switch *outputFormat { + case "json": + s, _ := json.MarshalIndent(names, "", " ") + os.Stdout.Write(s) + + //case "txt": + default: + for _, n := range names { + fmt.Println(n) + } } } @@ -261,28 +423,17 @@ the source credentials). func (cmd *showMountCommand) Run(args []string) { if len(args) != 1 { - log.Fatal("Wrong nubmer of arguments") + log.Fatal("Wrong number of arguments") } mount, err := getClient().GetMount(args[0]) if err != nil { - log.Fatal(err) + log.Fatalf("ERROR: %v", err) } if mount == nil { - log.Fatal("Mount not found") + log.Fatal("ERROR: mount not found") } - // Print out simple key=value pairs. - fmt.Printf("path = %s\n", mount.Name) - if mount.Username != "" { - fmt.Printf("username = %s\n", mount.Username) - fmt.Printf("password = %s\n", mount.Password) - } - if mount.RelayUrl != "" { - fmt.Printf("relay_url = %s\n", mount.RelayUrl) - } - if mount.Fallback != "" { - fmt.Printf("fallback_url = %s\n", mount.Fallback) - } + printMount(mount) } var cmdr = &commander.Command{ @@ -304,6 +455,7 @@ func addCommand(c CommandInterface) { func init() { addCommand(newCreateMountCommand()) + addCommand(newCreateTranscodingMountCommand()) addCommand(newEditMountCommand()) addCommand(newDeleteMountCommand()) addCommand(newListMountsCommand()) diff --git a/fe/loadbalancing_test.go b/fe/loadbalancing_test.go index 2b3c7a49a443db9a8dc100a970b88a068f60b8f2..9f19216bcfc5b2eadcd44a411949216d7257951d 100644 --- a/fe/loadbalancing_test.go +++ b/fe/loadbalancing_test.go @@ -197,7 +197,7 @@ func TestLoadBalancer_UtilizationPredictor(t *testing.T) { newNr := okRequests() if newNr == 0 && nr < cap { - t.Errorf("iteration %d: no successful requests but utilization is below 100% (avail=%d, ok_requests=%d)", i, cap-nr, newNr) + t.Errorf("iteration %d: no successful requests but utilization is below 100%% (avail=%d, ok_requests=%d)", i, cap-nr, newNr) } if newNr > (cap - nr) { t.Fatalf("iteration %d: over capacity (avail=%d, ok_requests=%d)", i, cap-nr, newNr) diff --git a/node/icecast.go b/node/icecast.go index 0ebc5d4348a8aab3f9e4432f7910ddf035f9c26b..e63c8ccb4d6a6a6a57abad3d6d4065b41e87a822 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -25,12 +25,6 @@ var ( icecastOk = instrumentation.NewGauge("icecast.ok") ) -type Controller interface { - Update(*clusterConfig, bool, net.IP) error - GetStatus() *IcecastStatus - Run(chan bool) -} - // Icecast returns empty fields in our status handler, which we'll // need to turn into integers (the xml unmarshaler will return an // error in this specific case), so we use a separate type for diff --git a/node/liquidsoap.go b/node/liquidsoap.go new file mode 100644 index 0000000000000000000000000000000000000000..483ae3beba7f6c9309f77a6fbee5758fa6b6e78b --- /dev/null +++ b/node/liquidsoap.go @@ -0,0 +1,182 @@ +package node + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "os/exec" + "path/filepath" + "sync" + "text/template" + "time" + + "git.autistici.org/ale/autoradio" +) + +var ( + liquidsoapBin = flag.String("liquidsoap", "/usr/bin/liquidsoap", "Location of the liquidsoap binary") + + liquidsoapConfigStr = ` +# Automatically generated config. +upstream = mksafe(input.http("{{.SourceURL}}", buffer=5.0)) +output.icecast(%{{.Format}}(samplerate={{.SampleRate}}, {{if gt .BitRate 0}}bitrate={{.BitRate}}, {{end}}{{if gt .Quality 0.0}}quality={{.Quality}}, {{end}}{{if eq .Channels 2}}stereo{{else}}mono{{end}}, + mount="{{.TargetMount}}", + host="{{.TargetIP}}", port={{.TargetPort}}, password="{{.TargetPassword}}", + upstream) +` + + liquidsoapConfigTpl = template.Must(template.New("liquidsoap").Parse(liquidsoapConfigStr)) +) + +// Parameters to configure a liquidsoap-based transcoder. +type liquidsoapParams struct { + // Source (upstream) URL. + SourceURL string + + // Target (downstream) connection parameters. + TargetIP string + TargetPort int + TargetMount string + TargetPassword string + + // Stream encoding parameters. One note on the 'float32' + // choice for Quality: text/template can't evaluate the 'gt' + // function successfully on a float64 type! + Format string + BitRate int + SampleRate int + Channels int + Quality float32 +} + +// Create new parameters for liquidsoap for a transcoding mount. If +// mount.Transcoding is nil, this function will panic, so the caller +// should check mount.HasTranscoder(). +func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams { + return &liquidsoapParams{ + SourceURL: fmt.Sprintf("http://localhost%s", mount.Transcoding.SourceName), + TargetIP: "localhost", + TargetPort: 80, + TargetMount: mount.Name, + TargetPassword: mount.Password, + Format: mount.Transcoding.Format, + BitRate: mount.Transcoding.BitRate, + SampleRate: mount.Transcoding.SampleRate, + Channels: mount.Transcoding.Channels, + Quality: float32(mount.Transcoding.Quality), + } +} + +func (p *liquidsoapParams) Equal(other *liquidsoapParams) bool { + return *p == *other +} + +func (p *liquidsoapParams) Render(w io.Writer) error { + return liquidsoapConfigTpl.Execute(w, p) +} + +// Controls a single liquidsoap process, transcoding a single stream. +type liquidsoapController struct { + params *liquidsoapParams + tmpdir string + configPath string + restartDelay time.Duration + + stop chan bool + wg sync.WaitGroup + + lock sync.Mutex + process *os.Process +} + +func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) { + tmpdir, err := ioutil.TempDir("", "") + if err != nil { + return nil, err + } + configpath := filepath.Join(tmpdir, "transcode.liq") + + return &liquidsoapController{ + params: params, + tmpdir: tmpdir, + configPath: configpath, + restartDelay: 500 * time.Millisecond, + stop: make(chan bool), + }, nil +} + +func (l *liquidsoapController) writeConfig() error { + file, err := os.Create(l.configPath) + if err != nil { + return err + } + defer file.Close() + return l.params.Render(file) +} + +func (l *liquidsoapController) setProcess(p *os.Process) { + l.lock.Lock() + l.process = p + defer l.lock.Unlock() +} + +func (l *liquidsoapController) run() { + defer l.wg.Done() + + if err := l.writeConfig(); err != nil { + log.Printf("error starting liquidsoap: %v", err) + return + } + + // Keep restarting liquidsoap if it fails, until the stop + // channel is closed (requesting termination). Keep the lock + // held until l.process is set, so that we can synchronize the + // process termination properly. + l.lock.Lock() + defer l.lock.Unlock() + for { + select { + case <-l.stop: + return + case <-time.After(l.restartDelay): + } + + cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath) + if err := cmd.Start(); err != nil { + log.Printf("error starting liquidsoap: %v", err) + continue + } + l.process = cmd.Process + l.lock.Unlock() + err := cmd.Wait() + l.lock.Lock() + l.process = nil + if err != nil { + log.Printf("liquidsoap exited: %v", err) + } + } +} + +func (l *liquidsoapController) Start() { + l.wg.Add(1) + go l.run() +} + +func (l *liquidsoapController) Stop() { + // Close the stop channel to prevent restarts. + close(l.stop) + + // Kill the currently running process, if any. + l.lock.Lock() + if l.process != nil { + l.process.Kill() + } + l.lock.Unlock() + + l.wg.Wait() + + os.RemoveAll(l.tmpdir) +} diff --git a/node/liquidsoap_test.go b/node/liquidsoap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..58db3684585c837b1bda4b9155dd5db96dbf4518 --- /dev/null +++ b/node/liquidsoap_test.go @@ -0,0 +1,87 @@ +package node + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "git.autistici.org/ale/autoradio" +) + +var testParams = &liquidsoapParams{ + SourceURL: "http://localhost/stream.ogg", + TargetIP: "localhost", + TargetPort: 80, + TargetMount: "/stream.mp3", + TargetPassword: "password", + Format: "mp3", + BitRate: 64, + SampleRate: 22050, + Channels: 2, +} + +func TestLiquidsoapParams_New(t *testing.T) { + mount := &autoradio.Mount{ + Name: "/stream.mp3", + Username: "sourceuser", + Password: "password", + Transcoding: &autoradio.EncodingParams{ + SourceName: "/stream.ogg", + Format: "mp3", + BitRate: 64, + SampleRate: 22050, + Channels: 2, + }, + } + + params := newLiquidsoapParams(mount) + expected := testParams + if !params.Equal(expected) { + t.Fatalf("newLiquidsoapParams(): got %v, want %v", params, expected) + } +} + +func TestLiquidsoapParams_Render(t *testing.T) { + var b bytes.Buffer + params := testParams + if err := params.Render(&b); err != nil { + t.Fatal(err) + } + if b.Len() == 0 { + t.Fatal("empty output") + } +} + +func TestLiquidsoapController(t *testing.T) { + // Create a fake 'liquidsoap' binary that accepts all + // parameters and waits forever. + tmpf, err := ioutil.TempFile("", "") + if err != nil { + t.Fatal(err) + } + io.WriteString(tmpf, "#!/bin/sh\nsleep 3600\n") + tmpf.Close() + os.Chmod(tmpf.Name(), 0755) + defer os.Remove(tmpf.Name()) + *liquidsoapBin = tmpf.Name() + + l, err := newLiquidsoap(testParams) + if err != nil { + t.Fatal(err) + } + l.restartDelay = 0 + + l.Start() + time.Sleep(50 * time.Millisecond) + if l.process == nil { + t.Fatal("program not started") + } + + l.Stop() + if l.process != nil { + t.Fatal("program not cleaned up properly") + } +} diff --git a/node/node.go b/node/node.go index 938b1be0018498d34ab91aa5fa352434f3e88e5a..036542a02c9f1c4ee0e6f3ff41dbef8a0d76fc22 100644 --- a/node/node.go +++ b/node/node.go @@ -229,6 +229,22 @@ func (w *configWatcher) Start(stop chan bool) func(chan bool) { } } +// Private interfaces for process controllers. These are used to +// replace real processes with mocks while testing. +type controller interface { + Update(*clusterConfig, bool, net.IP) error + GetStatus() *IcecastStatus + Run(chan bool) +} + +type transcodingController interface { + Start() + Stop() +} + +// Factory for transcodingControllers. +type transcodingControllerFunc func(*liquidsoapParams) (transcodingController, error) + // An active streaming node, managing the local icecast server. type RadioNode struct { client autoradio.EtcdClient @@ -237,7 +253,8 @@ type RadioNode struct { ips []net.IP me *masterelection.MasterElection watcher *configWatcher - icecast Controller + icecast controller + transcoderFn transcodingControllerFunc bw *bwmonitor.BandwidthUsageMonitor maxListeners int heartbeat uint64 @@ -293,8 +310,11 @@ func NewRadioNode(name string, ips []net.IP, netDev string, bwLimit float64, max string(minfodata), uint64(*masterElectionTtl), mech), - watcher: newConfigWatcher(client, config, upch), - icecast: NewIcecastController(name, maxListeners*2), + watcher: newConfigWatcher(client, config, upch), + icecast: NewIcecastController(name, maxListeners*2), + transcoderFn: func(p *liquidsoapParams) (transcodingController, error) { + return newLiquidsoap(p) + }, reloadDelay: 1000 * time.Millisecond, heartbeat: uint64(*nodeHeartbeat), bw: bwmonitor.NewBandwidthUsageMonitor(netDev, bwLimit), @@ -357,7 +377,16 @@ func (rc *RadioNode) updater(stop chan bool) { // initialize properly. time.Sleep(200 * time.Millisecond) - rc.Log.Printf("starting icecast updater") + // Keep track of all the configured transcoders (and clean + // them up at the end). + transcoders := make(map[string]*transcoder) + defer func() { + for _, t := range transcoders { + t.Stop() + } + }() + + rc.Log.Printf("starting updater") for { select { case <-rc.upch: @@ -368,14 +397,54 @@ func (rc *RadioNode) updater(stop chan bool) { continue } + masterAddr := rc.getMasterAddr() + + // Reload the Icecast daemon. icecastReloads.Incr() rc.Log.Printf("reloading icecast config") - if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), rc.getMasterAddr()); err != nil { + if err := rc.icecast.Update(rc.config, rc.me.IsMaster(), masterAddr); err != nil { icecastReloadErrors.Incr() rc.Log.Printf("Update(): %v", err) } - // Limit the rate of icecast reloads. + // Check the configuration for new or removed + // transcoding mounts, and start (or stop) the + // associated transcoder objects. We also need + // to detect changes in the encoding params + // and restart the transcoder if necessary. + tmp := make(map[string]struct{}) + for name := range transcoders { + tmp[name] = struct{}{} + } + for _, m := range rc.config.ListMounts() { + if !m.HasTranscoder() { + continue + } + + tparams := newLiquidsoapParams(m) + cur, ok := transcoders[m.Name] + if ok { + delete(tmp, m.Name) + if cur.Changed(tparams) { + cur.Stop() + ok = false + } + } + if !ok { + if t, err := newTranscoder(tparams, rc.transcoderFn, rc.name, rc.client); err != nil { + rc.Log.Printf("could not create transcoder: %v", err) + } else { + t.Start() + transcoders[m.Name] = t + } + } + } + for name := range tmp { + transcoders[name].Stop() + delete(transcoders, name) + } + + // Limit the rate of reconfigurations. if rc.reloadDelay > 0 { time.Sleep(rc.reloadDelay) } @@ -444,3 +513,83 @@ func (rc *RadioNode) Run() { func (rc *RadioNode) Stop() { close(rc.stop) } + +// Transcoder just runs a master election protocol and starts +// liquidsoap for a submount whenever this node becomes the master. +// Transcoder instances can be started and stopped individually. The +// transcoding parameters set at creation time can't be changed while +// the transcoder is running (it must be stopped and restarted). +type transcoder struct { + params *liquidsoapParams + nodeName string + client autoradio.EtcdClient + liquidsoap transcodingController + stop chan bool + wg sync.WaitGroup +} + +func newTranscoder(params *liquidsoapParams, tfn transcodingControllerFunc, nodeName string, client autoradio.EtcdClient) (*transcoder, error) { + l, err := tfn(params) + if err != nil { + return nil, err + } + return &transcoder{ + params: params, + liquidsoap: l, + client: client, + nodeName: nodeName, + stop: make(chan bool), + }, nil +} + +// Changed returns true if the stream parameters have changed, +// requiring a transcoder restart. +func (t *transcoder) Changed(newParams *liquidsoapParams) bool { + return !t.params.Equal(newParams) +} + +func (t *transcoder) run() { + defer t.wg.Done() + + // The master election protocol must be stopped when the + // transcoder terminates, so its lifecycle is fully contained + // within the scope of this function. + update := make(chan masterelection.State) + mestop := make(chan bool) + me := masterelection.NewMasterElection( + t.client, + autoradio.TranscoderMasterElectionBase+t.params.TargetMount, + t.nodeName, + uint64(*masterElectionTtl), + update) + go me.Run(mestop) + defer close(mestop) + + running := false + for { + select { + case state := <-update: + if state.Role == masterelection.ROLE_MASTER { + t.liquidsoap.Start() + running = true + } else if running { + t.liquidsoap.Stop() + } + case <-t.stop: + if running { + t.liquidsoap.Stop() + } + return + } + } +} + +func (t *transcoder) Start() { + t.wg.Add(1) + go t.run() +} + +func (t *transcoder) Stop() { + close(t.stop) + t.wg.Wait() +} diff --git a/node/node_test.go b/node/node_test.go index 143981fc88c8c34f55b2b9403588df3814ecf397..6dcfc9beb990e32e66ece655b5a107bf0201c17f 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -34,6 +34,30 @@ func (m *mockController) GetStatus() *IcecastStatus { return &IcecastStatus{Up: true} } +type mockTranscoder struct { + startCount int + stopCount int +} + +func (t *mockTranscoder) Start() { + t.startCount++ +} + +func (t *mockTranscoder) Stop() { + t.stopCount++ +} + +func (t *mockTranscoder) Reset() { + t.startCount = 0 + t.stopCount = 0 +} + +var globalMockTranscoder = &mockTranscoder{} + +func newMockTranscoder(params *liquidsoapParams) (transcodingController, error) { + return globalMockTranscoder, nil +} + func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode { var nodes []*RadioNode @@ -46,6 +70,7 @@ func startTestNodes(n int, etcd autoradio.EtcdClient) []*RadioNode { 1000, etcd) node.icecast = &mockController{} + node.transcoderFn = newMockTranscoder node.reloadDelay = time.Duration(0) node.Start() node.Log.SetPrefix(fmt.Sprintf("node%d: ", i+1)) @@ -69,6 +94,7 @@ func loadTestData(etcd autoradio.EtcdClient) { } func TestRadioNode_MasterElection(t *testing.T) { + globalMockTranscoder.Reset() etcd := util.NewTestEtcdClient() loadTestData(etcd) nodes := startTestNodes(3, etcd) @@ -98,6 +124,11 @@ func TestRadioNode_MasterElection(t *testing.T) { time.Sleep(20 * time.Millisecond) } + + // Transcoders should not have been started. + if globalMockTranscoder.startCount > 0 { + t.Fatal("transcoders were started unexpectedly") + } } func TestRadioNode_ConfigChangePropagation(t *testing.T) { @@ -154,3 +185,33 @@ func TestRadioNode_UpdatesDoNotTriggerIfNothingChanged(t *testing.T) { t.Errorf("node received %d updates (expected 1)", numUpdates) } } + +func TestRadioNode_TranscoderMasterElection(t *testing.T) { + globalMockTranscoder.Reset() + etcd := util.NewTestEtcdClient() + loadTestData(etcd) + + // Load a transcoding mount. + etcd.Set(autoradio.MountPrefix+"test.mp3", + `{"Name": "/test.mp3", "Username": "source2", "Password": "foo", + "Transcoding": {"BitRate": 64, "SampleRate": 22050}}`, + 86400) + + nodes := startTestNodes(3, etcd) + time.Sleep(500 * time.Millisecond) + if globalMockTranscoder.startCount != 1 { + t.Errorf("transcoder was started more than once (%d)", globalMockTranscoder.startCount) + } + + log.Printf("cleanup") + for _, n := range nodes { + n.Stop() + n.Wait() + } + + // At the end, the transcoder must have been started and + // stopped the same number of times. + if globalMockTranscoder.startCount != globalMockTranscoder.stopCount { + t.Errorf("transcoder was started/stopped an unequal number of times: start=%d, stop=%d", globalMockTranscoder.startCount, globalMockTranscoder.stopCount) + } +}