diff --git a/node/liquidsoap.go b/node/liquidsoap.go index bb6a03f044018e501f8fee61c14067ddca6f4d8f..3876ad17ae49caa7febcc8de3003eca29ef97bac 100644 --- a/node/liquidsoap.go +++ b/node/liquidsoap.go @@ -8,7 +8,6 @@ import ( "log" "os" "os/exec" - "path/filepath" "sync" "text/template" "time" @@ -84,55 +83,55 @@ func (p *liquidsoapParams) Render(w io.Writer) error { } // Controls a single liquidsoap process, transcoding a single stream. +// This object will be created once, when the node finds out about a +// transcoded stream for the first time. From then on, the Start() and +// Stop() methods will be called, possibly multiple times, whenever +// the node acquires or loses mastership for the stream. Specifically, +// this means that it must be possible to call Start() again after a +// Stop(). type liquidsoapController struct { params *liquidsoapParams tmpdir string configPath string restartDelay time.Duration - stop chan bool - wg sync.WaitGroup - + // Information about the running process. The lock protects + // the 'process' field, and it is only unlocked when we're + // waiting for the process to exit, or if it is not running at + // all. + stop chan struct{} + wg sync.WaitGroup lock sync.Mutex process *os.Process } func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) { - tmpdir, err := ioutil.TempDir("", "") - if err != nil { - return nil, err - } - configpath := filepath.Join(tmpdir, "transcode.liq") - return &liquidsoapController{ params: params, - tmpdir: tmpdir, - configPath: configpath, restartDelay: 500 * time.Millisecond, - stop: make(chan bool), }, nil } -func (l *liquidsoapController) writeConfig() error { - file, err := os.Create(l.configPath) +// Creates a new temporary file with the Liquidsoap transcoding +// configuration, and returns its name. +func (l *liquidsoapController) writeConfig() (string, error) { + file, err := ioutil.TempFile("", "liquidsoap-") if err != nil { - return err + return "", err } defer file.Close() - return l.params.Render(file) -} - -func (l *liquidsoapController) setProcess(p *os.Process) { - l.lock.Lock() - l.process = p - defer l.lock.Unlock() + return file.Name(), l.params.Render(file) } -func (l *liquidsoapController) run() { - defer l.wg.Done() - - if err := l.writeConfig(); err != nil { - log.Printf("error starting liquidsoap: %v", err) +func (l *liquidsoapController) runOnce() { + config, err := l.writeConfig() + // Always remove the temporary file, if it was created, even + // if there was an error. + if config != "" { + defer os.Remove(config) + } + if err != nil { + log.Printf("error writing liquidsoap configuration file: %v", err) return } @@ -140,35 +139,48 @@ func (l *liquidsoapController) run() { // channel is closed (requesting termination). Keep the lock // held until l.process is set, so that we can synchronize the // process termination properly. + cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + log.Printf("error starting liquidsoap: %v", err) + return + } + l.process = cmd.Process + + // Release the lock while the process is running. + l.lock.Unlock() + err = cmd.Wait() l.lock.Lock() + + l.process = nil + if err != nil { + log.Printf("liquidsoap exited: %v", err) + } +} + +func (l *liquidsoapController) run() { + defer l.wg.Done() + + // Called with l.lock being held, release it when done. defer l.lock.Unlock() + for { + // Limit the restart rate in case of errors. select { case <-l.stop: return case <-time.After(l.restartDelay): } - cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath) - cmd.Stdout = os.Stderr - cmd.Stderr = os.Stderr - if err := cmd.Start(); err != nil { - log.Printf("error starting liquidsoap: %v", err) - continue - } - l.process = cmd.Process - l.lock.Unlock() - err := cmd.Wait() - l.lock.Lock() - l.process = nil - if err != nil { - log.Printf("liquidsoap exited: %v", err) - } + l.runOnce() } } func (l *liquidsoapController) Start() { + l.lock.Lock() l.wg.Add(1) + l.stop = make(chan struct{}) go l.run() }