From 7bddc6861c566f2115fa0bb319b5cc5a55203da5 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Tue, 16 Apr 2019 22:48:10 +0100
Subject: [PATCH] Add a stream prober

It will listen to a stream and export metrics on connection quality
and audio to Prometheus.
---
 cmd/radioprober/radioprober.go |  55 ++++++++++
 prober/analysis.go             |  37 +++++++
 prober/decoder.go              |  59 +++++++++++
 prober/prober.go               | 178 +++++++++++++++++++++++++++++++++
 4 files changed, 329 insertions(+)
 create mode 100644 cmd/radioprober/radioprober.go
 create mode 100644 prober/analysis.go
 create mode 100644 prober/decoder.go
 create mode 100644 prober/prober.go

diff --git a/cmd/radioprober/radioprober.go b/cmd/radioprober/radioprober.go
new file mode 100644
index 00000000..fd6c57cd
--- /dev/null
+++ b/cmd/radioprober/radioprober.go
@@ -0,0 +1,55 @@
+package main
+
+import (
+	"flag"
+	"log"
+	"net/http"
+	"strings"
+
+	"git.autistici.org/ale/autoradio/prober"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type listFlag []string
+
+func (l listFlag) String() string {
+	return strings.Join(l, ",")
+}
+
+func (l *listFlag) Set(value string) error {
+	*l = append(*l, value)
+	return nil
+}
+
+var (
+	addr = flag.String("addr", ":2525", "Address to listen on")
+	urls listFlag
+)
+
+func init() {
+	flag.Var(&urls, "url", "URL of the remote stream (can be specified multiple times)")
+}
+
+func main() {
+	log.SetFlags(0)
+	flag.Parse()
+
+	if len(urls) == 0 {
+		log.Fatal("--url must be set")
+	}
+
+	// Spawn the stream listeners.
+	for _, u := range urls {
+		p, err := prober.New(u)
+		if err != nil {
+			log.Fatal(err)
+		}
+		go p.Run()
+	}
+
+	// Start the HTTP listener to export Prometheus metrics.
+	http.Handle("/metrics", promhttp.Handler())
+	if err := http.ListenAndServe(*addr, nil); err != http.ErrServerClosed {
+		log.Fatal(err)
+	}
+}
diff --git a/prober/analysis.go b/prober/analysis.go
new file mode 100644
index 00000000..f472885c
--- /dev/null
+++ b/prober/analysis.go
@@ -0,0 +1,37 @@
+package prober
+
+import "math"
+
+type stats struct {
+	peak float64
+	rms  float64
+}
+
+// Convert a float64 sample value to dB (0 is peak amplitude).
+func toDB(value float64) float64 {
+	if value < 1e-6 {
+		return -120
+	}
+	return 20 * math.Log10(value)
+}
+
+// Analyze a buffer of samples and return some statistics.
+func analyze(buf []float64) stats {
+	var peak float64
+	var sumSq float64
+	for _, f := range buf {
+		f = math.Abs(f)
+		if f > peak {
+			peak = f
+		}
+		sumSq += f * f
+	}
+
+	var s stats
+	s.peak = toDB(peak)
+	if len(buf) > 0 {
+		s.rms = toDB(math.Sqrt(sumSq / float64(len(buf))))
+	}
+
+	return s
+}
diff --git a/prober/decoder.go b/prober/decoder.go
new file mode 100644
index 00000000..b961b363
--- /dev/null
+++ b/prober/decoder.go
@@ -0,0 +1,59 @@
+package prober
+
+import (
+	"encoding/binary"
+	"io"
+	"os"
+	"os/exec"
+)
+
+const (
+	analysisSampleRate = "44100"
+	analysisChannels   = "1"
+)
+
+// The decoder uses sox(1) to decode the audio data to mono float64
+// samples at a fixed sample rate (in the same format expected by the
+// floatReader). We shell out to sox due to the lack of simple audio
+// libraries for Go that would do the job...
+type decoder struct {
+	io.ReadCloser
+	cmd *exec.Cmd
+}
+
+func newDecoder(r io.Reader) (*decoder, error) {
+	cmd := exec.Command("sox", "-", "-r", analysisSampleRate, "-c", analysisChannels, "-b", "64", "-e", "float", "--endian", "little", "-t", "raw", "-")
+	cmd.Stdin = r
+	cmd.Stderr = os.Stderr
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	if err := cmd.Start(); err != nil {
+		return nil, err
+	}
+	return &decoder{
+		ReadCloser: stdout,
+		cmd:        cmd,
+	}, nil
+}
+
+func (d *decoder) Close() error {
+	d.ReadCloser.Close()
+	return d.cmd.Wait()
+}
+
+// The floatReader reads float64 arrays from an io.Reader.
+type floatReader struct {
+	io.Reader
+}
+
+func (r floatReader) ReadFloats(buf []float64) (n int, err error) {
+	for n = 0; n < len(buf); n++ {
+		err = binary.Read(r.Reader, binary.LittleEndian, &buf[n])
+		if err != nil {
+			break
+		}
+	}
+	return
+}
diff --git a/prober/prober.go b/prober/prober.go
new file mode 100644
index 00000000..dd8b6f08
--- /dev/null
+++ b/prober/prober.go
@@ -0,0 +1,178 @@
+package prober
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	reconnectDelay = 3 * time.Second
+
+	// Exported metrics.
+	connections = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "connections",
+			Help: "Number of connections",
+		},
+		[]string{"stream", "status"},
+	)
+	connected = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "stream_connected",
+			Help: "Is the prober connected to the stream.",
+		},
+		[]string{"stream"},
+	)
+	bytesReceived = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "bytes_received",
+			Help: "Bytes received",
+		},
+		[]string{"stream"},
+	)
+	decodingErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "stream_decode_errors",
+			Help: "Decoding errors.",
+		},
+		[]string{"stream"},
+	)
+	streamRMS = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "stream_rms",
+			Help: "RMS amplitude of the stream audio (dB).",
+		},
+		[]string{"stream"},
+	)
+	streamPeak = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "stream_peak",
+			Help: "Peak amplitude of the stream audio (dB).",
+		},
+		[]string{"stream"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(
+		connections,
+		connected,
+		bytesReceived,
+		decodingErrors,
+		streamRMS,
+		streamPeak,
+	)
+}
+
+// Analyze one second of audio at a time.
+const bufSize = 44100
+
+// Prober continuously listens to a stream and exports some metrics
+// related to connectivity and audio quality. If the connection fails,
+// it will try to reconnect forever, without a timeout.
+type Prober struct {
+	streamURL  string
+	streamName string
+}
+
+// New returns a new unstarted Prober.
+func New(streamURL string) (*Prober, error) {
+	u, err := url.Parse(streamURL)
+	if err != nil {
+		return nil, err
+	}
+	return &Prober{
+		streamURL:  streamURL,
+		streamName: u.Path,
+	}, nil
+
+}
+
+// Receive a stream content over HTTP. We're using the standard Go
+// client as it can handle this case, no need to hijack the connection.
+func (p *Prober) stream() {
+	resp, err := http.Get(p.streamURL)
+	if err != nil {
+		log.Printf("connection error: %v", err)
+		connections.WithLabelValues(p.streamName, "error").Inc()
+		return
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		log.Printf("HTTP status %d", resp.StatusCode)
+		connections.WithLabelValues(p.streamName, "http_error").Inc()
+		return
+	}
+
+	// Instrument the raw bytes transfered, before decoding happens.
+	r := newInstrumentedReader(resp.Body, bytesReceived.WithLabelValues(p.streamName))
+	dec, err := newDecoder(r)
+	if err != nil {
+		log.Printf("decoder error: %v", err)
+		connections.WithLabelValues(p.streamName, "decoder_error").Inc()
+		return
+	}
+
+	log.Printf("connected to %s", resp.Request.URL.String())
+	connected.WithLabelValues(p.streamName).Set(1)
+	connections.WithLabelValues(p.streamName, "ok").Inc()
+
+	// Read the data one buffer at a time (which corresponds to 1
+	// second of audio), compute some metrics and export them.
+	fr := &floatReader{dec}
+	buf := make([]float64, bufSize)
+	for {
+		_, err := fr.ReadFloats(buf)
+		if err != nil {
+			break
+		}
+
+		stats := analyze(buf)
+		streamRMS.WithLabelValues(p.streamName).Set(stats.rms)
+		streamPeak.WithLabelValues(p.streamName).Set(stats.peak)
+	}
+
+	if err := dec.Close(); err != nil {
+		log.Printf("decode error: %v", err)
+	}
+
+	connected.WithLabelValues(p.streamName).Set(0)
+}
+
+// Run the Prober forever.
+func (p *Prober) Run() {
+	for {
+		p.stream()
+
+		// Limit the number of outgoing connections by
+		// sleeping a bit before retrying.
+		time.Sleep(reconnectDelay)
+	}
+}
+
+// An instrumentedReader wraps an io.Reader and exports the number of
+// bytes read through it to a prometheus.Counter.
+type instrumentedReader struct {
+	io.Reader
+	cntr prometheus.Counter
+}
+
+func newInstrumentedReader(r io.Reader, cntr prometheus.Counter) *instrumentedReader {
+	return &instrumentedReader{
+		Reader: r,
+		cntr:   cntr,
+	}
+}
+
+func (r *instrumentedReader) Read(b []byte) (int, error) {
+	n, err := r.Reader.Read(b)
+	if err == nil {
+		r.cntr.Add(float64(n))
+	}
+	return n, err
+}
-- 
GitLab