Skip to content
Snippets Groups Projects
Commit f5a35ca3 authored by ale's avatar ale
Browse files

made the liquidsoapController more robust

Previously the Controller would be left in a bad state on a master ->
slave transition.
parent d11a3578
Branches
No related tags found
No related merge requests found
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"log" "log"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"sync" "sync"
"text/template" "text/template"
"time" "time"
...@@ -84,55 +83,55 @@ func (p *liquidsoapParams) Render(w io.Writer) error { ...@@ -84,55 +83,55 @@ func (p *liquidsoapParams) Render(w io.Writer) error {
} }
// Controls a single liquidsoap process, transcoding a single stream. // Controls a single liquidsoap process, transcoding a single stream.
// This object will be created once, when the node finds out about a
// transcoded stream for the first time. From then on, the Start() and
// Stop() methods will be called, possibly multiple times, whenever
// the node acquires or loses mastership for the stream. Specifically,
// this means that it must be possible to call Start() again after a
// Stop().
type liquidsoapController struct { type liquidsoapController struct {
params *liquidsoapParams params *liquidsoapParams
tmpdir string tmpdir string
configPath string configPath string
restartDelay time.Duration restartDelay time.Duration
stop chan bool // Information about the running process. The lock protects
wg sync.WaitGroup // the 'process' field, and it is only unlocked when we're
// waiting for the process to exit, or if it is not running at
// all.
stop chan struct{}
wg sync.WaitGroup
lock sync.Mutex lock sync.Mutex
process *os.Process process *os.Process
} }
func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) { func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) {
tmpdir, err := ioutil.TempDir("", "")
if err != nil {
return nil, err
}
configpath := filepath.Join(tmpdir, "transcode.liq")
return &liquidsoapController{ return &liquidsoapController{
params: params, params: params,
tmpdir: tmpdir,
configPath: configpath,
restartDelay: 500 * time.Millisecond, restartDelay: 500 * time.Millisecond,
stop: make(chan bool),
}, nil }, nil
} }
func (l *liquidsoapController) writeConfig() error { // Creates a new temporary file with the Liquidsoap transcoding
file, err := os.Create(l.configPath) // configuration, and returns its name.
func (l *liquidsoapController) writeConfig() (string, error) {
file, err := ioutil.TempFile("", "liquidsoap-")
if err != nil { if err != nil {
return err return "", err
} }
defer file.Close() defer file.Close()
return l.params.Render(file) return file.Name(), l.params.Render(file)
}
func (l *liquidsoapController) setProcess(p *os.Process) {
l.lock.Lock()
l.process = p
defer l.lock.Unlock()
} }
func (l *liquidsoapController) run() { func (l *liquidsoapController) runOnce() {
defer l.wg.Done() config, err := l.writeConfig()
// Always remove the temporary file, if it was created, even
if err := l.writeConfig(); err != nil { // if there was an error.
log.Printf("error starting liquidsoap: %v", err) if config != "" {
defer os.Remove(config)
}
if err != nil {
log.Printf("error writing liquidsoap configuration file: %v", err)
return return
} }
...@@ -140,35 +139,48 @@ func (l *liquidsoapController) run() { ...@@ -140,35 +139,48 @@ func (l *liquidsoapController) run() {
// channel is closed (requesting termination). Keep the lock // channel is closed (requesting termination). Keep the lock
// held until l.process is set, so that we can synchronize the // held until l.process is set, so that we can synchronize the
// process termination properly. // process termination properly.
cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath)
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
log.Printf("error starting liquidsoap: %v", err)
return
}
l.process = cmd.Process
// Release the lock while the process is running.
l.lock.Unlock()
err = cmd.Wait()
l.lock.Lock() l.lock.Lock()
l.process = nil
if err != nil {
log.Printf("liquidsoap exited: %v", err)
}
}
func (l *liquidsoapController) run() {
defer l.wg.Done()
// Called with l.lock being held, release it when done.
defer l.lock.Unlock() defer l.lock.Unlock()
for { for {
// Limit the restart rate in case of errors.
select { select {
case <-l.stop: case <-l.stop:
return return
case <-time.After(l.restartDelay): case <-time.After(l.restartDelay):
} }
cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath) l.runOnce()
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
log.Printf("error starting liquidsoap: %v", err)
continue
}
l.process = cmd.Process
l.lock.Unlock()
err := cmd.Wait()
l.lock.Lock()
l.process = nil
if err != nil {
log.Printf("liquidsoap exited: %v", err)
}
} }
} }
func (l *liquidsoapController) Start() { func (l *liquidsoapController) Start() {
l.lock.Lock()
l.wg.Add(1) l.wg.Add(1)
l.stop = make(chan struct{})
go l.run() go l.run()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment