package node import ( "flag" "fmt" "io" "io/ioutil" "log" "os" "os/exec" "path/filepath" "sync" "text/template" "time" "git.autistici.org/ale/autoradio" ) var ( liquidsoapBin = flag.String("liquidsoap", "/usr/bin/liquidsoap", "Location of the liquidsoap binary") liquidsoapConfigStr = ` set("log.file", false) set("log.stdout", true) upstream = mksafe(input.http("{{.SourceURL}}", buffer=5.0)) output.icecast(%{{.Format}}(samplerate={{.SampleRate}}, {{if gt .BitRate 0}}bitrate={{.BitRate}}, {{end}}{{if gt .Quality 0.0}}quality={{.Quality}}, {{end}}{{if eq .Channels 2}}stereo{{else}}mono{{end}}), mount="{{.TargetMount}}", host="{{.TargetIP}}", port={{.TargetPort}}, password="{{.TargetPassword}}", upstream) ` liquidsoapConfigTpl = template.Must(template.New("liquidsoap").Parse(liquidsoapConfigStr)) ) // Parameters to configure a liquidsoap-based transcoder. type liquidsoapParams struct { // Source (upstream) URL. SourceURL string // Target (downstream) connection parameters. TargetIP string TargetPort int TargetMount string TargetPassword string // Stream encoding parameters. One note on the 'float32' // choice for Quality: text/template can't evaluate the 'gt' // function successfully on a float64 type! Format string BitRate int SampleRate int Channels int Quality float32 } // Create new parameters for liquidsoap for a transcoding mount. If // mount.Transcoding is nil, this function will panic, so the caller // should check mount.HasTranscoder(). func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams { return &liquidsoapParams{ SourceURL: fmt.Sprintf("http://localhost%s", mount.Transcoding.SourceName), TargetIP: "localhost", TargetPort: 80, TargetMount: mount.Name, TargetPassword: mount.Password, Format: mount.Transcoding.Format, BitRate: mount.Transcoding.BitRate, SampleRate: mount.Transcoding.SampleRate, Channels: mount.Transcoding.Channels, Quality: float32(mount.Transcoding.Quality), } } func (p *liquidsoapParams) Equal(other *liquidsoapParams) bool { return *p == *other } func (p *liquidsoapParams) Render(w io.Writer) error { return liquidsoapConfigTpl.Execute(w, p) } // Controls a single liquidsoap process, transcoding a single stream. type liquidsoapController struct { params *liquidsoapParams tmpdir string configPath string restartDelay time.Duration stop chan bool wg sync.WaitGroup lock sync.Mutex process *os.Process } func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) { tmpdir, err := ioutil.TempDir("", "") if err != nil { return nil, err } configpath := filepath.Join(tmpdir, "transcode.liq") return &liquidsoapController{ params: params, tmpdir: tmpdir, configPath: configpath, restartDelay: 500 * time.Millisecond, stop: make(chan bool), }, nil } func (l *liquidsoapController) writeConfig() error { file, err := os.Create(l.configPath) if err != nil { return err } defer file.Close() return l.params.Render(file) } func (l *liquidsoapController) setProcess(p *os.Process) { l.lock.Lock() l.process = p defer l.lock.Unlock() } func (l *liquidsoapController) run() { defer l.wg.Done() if err := l.writeConfig(); err != nil { log.Printf("error starting liquidsoap: %v", err) return } // Keep restarting liquidsoap if it fails, until the stop // channel is closed (requesting termination). Keep the lock // held until l.process is set, so that we can synchronize the // process termination properly. l.lock.Lock() defer l.lock.Unlock() for { select { case <-l.stop: return case <-time.After(l.restartDelay): } cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath) cmd.Stdout = os.Stderr cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { log.Printf("error starting liquidsoap: %v", err) continue } l.process = cmd.Process l.lock.Unlock() err := cmd.Wait() l.lock.Lock() l.process = nil if err != nil { log.Printf("liquidsoap exited: %v", err) } } } func (l *liquidsoapController) Start() { l.wg.Add(1) go l.run() } func (l *liquidsoapController) Stop() { // Close the stop channel to prevent restarts. close(l.stop) // Kill the currently running process, if any. l.lock.Lock() if l.process != nil { l.process.Kill() } l.lock.Unlock() l.wg.Wait() os.RemoveAll(l.tmpdir) }