Skip to content
Snippets Groups Projects
Commit 7bddc686 authored by ale's avatar ale
Browse files

Add a stream prober

It will listen to a stream and export metrics on connection quality
and audio to Prometheus.
parent 32f6bfbe
No related branches found
No related tags found
1 merge request!1v2.0
package main
import (
"flag"
"log"
"net/http"
"strings"
"git.autistici.org/ale/autoradio/prober"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type listFlag []string
func (l listFlag) String() string {
return strings.Join(l, ",")
}
func (l *listFlag) Set(value string) error {
*l = append(*l, value)
return nil
}
var (
addr = flag.String("addr", ":2525", "Address to listen on")
urls listFlag
)
func init() {
flag.Var(&urls, "url", "URL of the remote stream (can be specified multiple times)")
}
func main() {
log.SetFlags(0)
flag.Parse()
if len(urls) == 0 {
log.Fatal("--url must be set")
}
// Spawn the stream listeners.
for _, u := range urls {
p, err := prober.New(u)
if err != nil {
log.Fatal(err)
}
go p.Run()
}
// Start the HTTP listener to export Prometheus metrics.
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(*addr, nil); err != http.ErrServerClosed {
log.Fatal(err)
}
}
package prober
import "math"
type stats struct {
peak float64
rms float64
}
// Convert a float64 sample value to dB (0 is peak amplitude).
func toDB(value float64) float64 {
if value < 1e-6 {
return -120
}
return 20 * math.Log10(value)
}
// Analyze a buffer of samples and return some statistics.
func analyze(buf []float64) stats {
var peak float64
var sumSq float64
for _, f := range buf {
f = math.Abs(f)
if f > peak {
peak = f
}
sumSq += f * f
}
var s stats
s.peak = toDB(peak)
if len(buf) > 0 {
s.rms = toDB(math.Sqrt(sumSq / float64(len(buf))))
}
return s
}
package prober
import (
"encoding/binary"
"io"
"os"
"os/exec"
)
const (
analysisSampleRate = "44100"
analysisChannels = "1"
)
// The decoder uses sox(1) to decode the audio data to mono float64
// samples at a fixed sample rate (in the same format expected by the
// floatReader). We shell out to sox due to the lack of simple audio
// libraries for Go that would do the job...
type decoder struct {
io.ReadCloser
cmd *exec.Cmd
}
func newDecoder(r io.Reader) (*decoder, error) {
cmd := exec.Command("sox", "-", "-r", analysisSampleRate, "-c", analysisChannels, "-b", "64", "-e", "float", "--endian", "little", "-t", "raw", "-")
cmd.Stdin = r
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
return &decoder{
ReadCloser: stdout,
cmd: cmd,
}, nil
}
func (d *decoder) Close() error {
d.ReadCloser.Close()
return d.cmd.Wait()
}
// The floatReader reads float64 arrays from an io.Reader.
type floatReader struct {
io.Reader
}
func (r floatReader) ReadFloats(buf []float64) (n int, err error) {
for n = 0; n < len(buf); n++ {
err = binary.Read(r.Reader, binary.LittleEndian, &buf[n])
if err != nil {
break
}
}
return
}
package prober
import (
"io"
"log"
"net/http"
"net/url"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
reconnectDelay = 3 * time.Second
// Exported metrics.
connections = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "connections",
Help: "Number of connections",
},
[]string{"stream", "status"},
)
connected = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "stream_connected",
Help: "Is the prober connected to the stream.",
},
[]string{"stream"},
)
bytesReceived = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "bytes_received",
Help: "Bytes received",
},
[]string{"stream"},
)
decodingErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "stream_decode_errors",
Help: "Decoding errors.",
},
[]string{"stream"},
)
streamRMS = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "stream_rms",
Help: "RMS amplitude of the stream audio (dB).",
},
[]string{"stream"},
)
streamPeak = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "stream_peak",
Help: "Peak amplitude of the stream audio (dB).",
},
[]string{"stream"},
)
)
func init() {
prometheus.MustRegister(
connections,
connected,
bytesReceived,
decodingErrors,
streamRMS,
streamPeak,
)
}
// Analyze one second of audio at a time.
const bufSize = 44100
// Prober continuously listens to a stream and exports some metrics
// related to connectivity and audio quality. If the connection fails,
// it will try to reconnect forever, without a timeout.
type Prober struct {
streamURL string
streamName string
}
// New returns a new unstarted Prober.
func New(streamURL string) (*Prober, error) {
u, err := url.Parse(streamURL)
if err != nil {
return nil, err
}
return &Prober{
streamURL: streamURL,
streamName: u.Path,
}, nil
}
// Receive a stream content over HTTP. We're using the standard Go
// client as it can handle this case, no need to hijack the connection.
func (p *Prober) stream() {
resp, err := http.Get(p.streamURL)
if err != nil {
log.Printf("connection error: %v", err)
connections.WithLabelValues(p.streamName, "error").Inc()
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Printf("HTTP status %d", resp.StatusCode)
connections.WithLabelValues(p.streamName, "http_error").Inc()
return
}
// Instrument the raw bytes transfered, before decoding happens.
r := newInstrumentedReader(resp.Body, bytesReceived.WithLabelValues(p.streamName))
dec, err := newDecoder(r)
if err != nil {
log.Printf("decoder error: %v", err)
connections.WithLabelValues(p.streamName, "decoder_error").Inc()
return
}
log.Printf("connected to %s", resp.Request.URL.String())
connected.WithLabelValues(p.streamName).Set(1)
connections.WithLabelValues(p.streamName, "ok").Inc()
// Read the data one buffer at a time (which corresponds to 1
// second of audio), compute some metrics and export them.
fr := &floatReader{dec}
buf := make([]float64, bufSize)
for {
_, err := fr.ReadFloats(buf)
if err != nil {
break
}
stats := analyze(buf)
streamRMS.WithLabelValues(p.streamName).Set(stats.rms)
streamPeak.WithLabelValues(p.streamName).Set(stats.peak)
}
if err := dec.Close(); err != nil {
log.Printf("decode error: %v", err)
}
connected.WithLabelValues(p.streamName).Set(0)
}
// Run the Prober forever.
func (p *Prober) Run() {
for {
p.stream()
// Limit the number of outgoing connections by
// sleeping a bit before retrying.
time.Sleep(reconnectDelay)
}
}
// An instrumentedReader wraps an io.Reader and exports the number of
// bytes read through it to a prometheus.Counter.
type instrumentedReader struct {
io.Reader
cntr prometheus.Counter
}
func newInstrumentedReader(r io.Reader, cntr prometheus.Counter) *instrumentedReader {
return &instrumentedReader{
Reader: r,
cntr: cntr,
}
}
func (r *instrumentedReader) Read(b []byte) (int, error) {
n, err := r.Reader.Read(b)
if err == nil {
r.cntr.Add(float64(n))
}
return n, err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment