Skip to content
Snippets Groups Projects
liquidsoap.go 4.38 KiB
package node

import (
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"text/template"
	"time"

	"git.autistici.org/ale/autoradio"
)

var (
	// liquidsoapBin is the command-line flag holding the path of the
	// liquidsoap executable that the controller launches.
	liquidsoapBin = flag.String("liquidsoap", "/usr/bin/liquidsoap", "Location of the liquidsoap binary")

	// liquidsoapConfigStr is the text/template source of the
	// liquidsoap script. The generated script disables file logging
	// and logs to stdout, reads the upstream over HTTP (wrapped in
	// mksafe, with a 5.0 buffer) and sends the re-encoded stream to
	// an Icecast mount. The bitrate and quality encoder arguments
	// are only emitted when the corresponding value is greater than
	// zero; the channel layout is "stereo" when Channels is 2 and
	// "mono" otherwise.
	//
	// NOTE(review): text/template applies no escaping, so values are
	// inserted verbatim; a double quote inside a string parameter
	// would presumably break the generated script.
	liquidsoapConfigStr = `
set("log.file", false)
set("log.stdout", true)

upstream = mksafe(input.http("{{.SourceURL}}", buffer=5.0))
output.icecast(%{{.Format}}(samplerate={{.SampleRate}}, {{if gt .BitRate 0}}bitrate={{.BitRate}}, {{end}}{{if gt .Quality 0.0}}quality={{.Quality}}, {{end}}{{if eq .Channels 2}}stereo{{else}}mono{{end}}),
  mount="{{.TargetMount}}", host="{{.TargetIP}}", port={{.TargetPort}}, password="{{.TargetPassword}}",
  upstream)
`

	// liquidsoapConfigTpl is liquidsoapConfigStr parsed once at
	// package initialization; template.Must panics if the template
	// source does not parse.
	liquidsoapConfigTpl = template.Must(template.New("liquidsoap").Parse(liquidsoapConfigStr))
)

// liquidsoapParams holds the parameters used to configure a
// liquidsoap-based transcoder. The struct is the data the
// configuration template is executed with (see Render), and it is
// compared with == by Equal, so every field must stay comparable.
type liquidsoapParams struct {
	// Source (upstream) URL that liquidsoap reads the stream from.
	SourceURL string

	// Target (downstream) connection parameters: host, port, mount
	// point and password passed to liquidsoap's Icecast output.
	TargetIP       string
	TargetPort     int
	TargetMount    string
	TargetPassword string

	// Stream encoding parameters. Format is emitted right after
	// the '%' of the encoder expression; BitRate and Quality are
	// only emitted when greater than zero; a Channels value of 2
	// selects stereo and any other value mono. One note on the
	// 'float32' choice for Quality: text/template can't evaluate
	// the 'gt' function successfully on a float64 type!
	// NOTE(review): that limitation is the original author's
	// observation; confirm it still holds before changing the type.
	Format     string
	BitRate    int
	SampleRate int
	Channels   int
	Quality    float32
}

// newLiquidsoapParams builds the liquidsoap parameters for a
// transcoding mount. The source URL is "http://localhost" followed by
// the transcoding source name, and the target is the mount itself on
// localhost port 80. The caller must check mount.HasTranscoder()
// first: this function panics when mount.Transcoding is nil.
func newLiquidsoapParams(mount *autoradio.Mount) *liquidsoapParams {
	tc := mount.Transcoding

	params := new(liquidsoapParams)

	// Where the stream is read from.
	params.SourceURL = fmt.Sprintf("http://localhost%s", tc.SourceName)

	// Where the transcoded stream is sent.
	params.TargetIP = "localhost"
	params.TargetPort = 80
	params.TargetMount = mount.Name
	params.TargetPassword = mount.Password

	// How the stream is encoded.
	params.Format = tc.Format
	params.BitRate = tc.BitRate
	params.SampleRate = tc.SampleRate
	params.Channels = tc.Channels
	params.Quality = float32(tc.Quality)

	return params
}

// Equal reports whether p and other describe the same transcoder
// configuration, comparing every field with ==. Nil pointers are
// handled without panicking: two nil values are equal, and a nil
// value never equals a non-nil one.
func (p *liquidsoapParams) Equal(other *liquidsoapParams) bool {
	if p == nil || other == nil {
		return p == other
	}
	return *p == *other
}

// Render executes the liquidsoap configuration template with p as its
// data, writing the resulting script to w. Any error reported by the
// template execution is returned unchanged.
func (p *liquidsoapParams) Render(w io.Writer) error {
	if err := liquidsoapConfigTpl.Execute(w, p); err != nil {
		return err
	}
	return nil
}

// liquidsoapController controls a single liquidsoap process,
// transcoding a single stream. The process is supervised by the run
// goroutine, which is launched by Start and terminated by Stop.
type liquidsoapController struct {
	// Parameters rendered into the configuration file.
	params       *liquidsoapParams
	// Temporary directory created by newLiquidsoap and removed by
	// Stop.
	tmpdir       string
	// Path of the liquidsoap script inside tmpdir.
	configPath   string
	// Time the run loop waits before every attempt to start
	// liquidsoap.
	restartDelay time.Duration

	// stop is closed by Stop to tell the run loop to exit; wg
	// tracks the run goroutine so that Stop can wait for it.
	stop chan bool
	wg   sync.WaitGroup

	// lock guards process, which is the currently running
	// liquidsoap process, or nil when none is running.
	lock    sync.Mutex
	process *os.Process
}

// newLiquidsoap creates a controller for a liquidsoap process
// configured by params. It creates a fresh temporary directory that
// will contain the "transcode.liq" script; the script itself is not
// written here and no process is started. An error is returned when
// the temporary directory cannot be created.
func newLiquidsoap(params *liquidsoapParams) (*liquidsoapController, error) {
	dir, err := ioutil.TempDir("", "")
	if err != nil {
		return nil, err
	}

	ctl := &liquidsoapController{
		params:       params,
		tmpdir:       dir,
		restartDelay: 500 * time.Millisecond,
		stop:         make(chan bool),
	}
	ctl.configPath = filepath.Join(dir, "transcode.liq")
	return ctl, nil
}

// writeConfig renders the controller's parameters into the liquidsoap
// script at l.configPath, creating or truncating the file. It returns
// the first error encountered while creating, writing or closing the
// file.
func (l *liquidsoapController) writeConfig() error {
	file, err := os.Create(l.configPath)
	if err != nil {
		return err
	}
	if err := l.params.Render(file); err != nil {
		// The render error is the one worth reporting; closing is
		// only done here to release the descriptor.
		file.Close()
		return err
	}
	// A failure to close a file that was just written can mean the
	// data never reached the disk, so it must not be ignored.
	return file.Close()
}

// setProcess records p as the controller's current process, taking
// the lock for the duration of the update. Passing nil clears it.
func (l *liquidsoapController) setProcess(p *os.Process) {
	l.lock.Lock()
	defer l.lock.Unlock()

	l.process = p
}

// run is the supervision loop of the controller, executed in the
// goroutine launched by Start. It writes the configuration file once
// and then starts liquidsoap over and over, waiting restartDelay
// before every attempt (including the first one), until the stop
// channel is closed.
func (l *liquidsoapController) run() {
	// Matches the wg.Add(1) done by Start, so that Stop can wait
	// for this goroutine to finish.
	defer l.wg.Done()

	// Without a configuration file there is nothing to run: log the
	// failure and give up without retrying.
	if err := l.writeConfig(); err != nil {
		log.Printf("error starting liquidsoap: %v", err)
		return
	}

	// Keep restarting liquidsoap if it fails, until the stop
	// channel is closed (requesting termination). Keep the lock
	// held until l.process is set, so that we can synchronize the
	// process termination properly. The lock is only released while
	// waiting for the process to exit, so every return below happens
	// with the lock held and the deferred Unlock is balanced.
	l.lock.Lock()
	defer l.lock.Unlock()
	for {
		// Either stop was requested, or the restart delay elapsed
		// and another start attempt is made.
		select {
		case <-l.stop:
			return
		case <-time.After(l.restartDelay):
		}

		// Both output streams of liquidsoap go to our stderr.
		cmd := exec.Command(*liquidsoapBin, "-T", "-U", "-v", l.configPath)
		cmd.Stdout = os.Stderr
		cmd.Stderr = os.Stderr
		if err := cmd.Start(); err != nil {
			// Could not start: try again after the delay.
			log.Printf("error starting liquidsoap: %v", err)
			continue
		}
		// Publish the process, then drop the lock while it runs so
		// that Stop can see it and kill it.
		l.process = cmd.Process
		l.lock.Unlock()
		err := cmd.Wait()
		// The process is gone: retake the lock and clear the
		// reference before looping.
		l.lock.Lock()
		l.process = nil
		// Only a non-nil Wait error is logged; liquidsoap is
		// restarted in either case unless stop was requested.
		if err != nil {
			log.Printf("liquidsoap exited: %v", err)
		}
	}
}

// Start launches the goroutine that supervises the liquidsoap
// process and returns immediately. The goroutine is registered with
// the controller's wait group before it is started, so that Stop can
// wait for it to finish.
func (l *liquidsoapController) Start() {
	l.wg.Add(1)
	go func() {
		l.run()
	}()
}

// Stop terminates the controller: it prevents further restarts, kills
// the running liquidsoap process (if any), waits for the supervising
// goroutine to exit and finally removes the temporary directory.
// Failures to kill the process or to remove the directory are logged.
// Stop must be called at most once, since it closes the stop channel.
func (l *liquidsoapController) Stop() {
	// Close the stop channel to prevent restarts.
	close(l.stop)

	// Kill the currently running process, if any. The lock is
	// released before waiting, because the supervising goroutine
	// needs it to notice that the process has exited.
	l.lock.Lock()
	if l.process != nil {
		if err := l.process.Kill(); err != nil {
			log.Printf("error killing liquidsoap: %v", err)
		}
	}
	l.lock.Unlock()

	l.wg.Wait()

	if err := os.RemoveAll(l.tmpdir); err != nil {
		log.Printf("error removing %s: %v", l.tmpdir, err)
	}
}