From 48b96fc283627d43fb7737b9b6c561bc90a1965b Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Tue, 3 Mar 2020 12:21:06 +0000
Subject: [PATCH] Reconnect prober streams every two minutes

---
 prober/analysis.go |  4 +++-
 prober/prober.go   | 19 ++++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/prober/analysis.go b/prober/analysis.go
index f472885c..b1d5fbc4 100644
--- a/prober/analysis.go
+++ b/prober/analysis.go
@@ -7,10 +7,12 @@ type stats struct {
 	rms  float64
 }
 
+const noSignal = -120
+
 // Convert a float64 sample value to dB (0 is peak amplitude).
 func toDB(value float64) float64 {
 	if value < 1e-6 {
-		return -120
+		return noSignal
 	}
 	return 20 * math.Log10(value)
 }
diff --git a/prober/prober.go b/prober/prober.go
index 8086f135..6a854413 100644
--- a/prober/prober.go
+++ b/prober/prober.go
@@ -12,6 +12,7 @@ import (
 
 var (
 	reconnectDelay = 3 * time.Second
+	streamTimeout  = 2 * time.Minute
 
 	// Exported metrics.
 	connections = prometheus.NewCounterVec(
@@ -42,6 +43,13 @@ var (
 		},
 		[]string{"stream"},
 	)
+	streamErrors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "stream_errors",
+			Help: "Number of times the connection was broken.",
+		},
+		[]string{"stream"},
+	)
 	streamRMS = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Name: "stream_rms",
@@ -95,7 +103,7 @@ func New(streamURL string) (*Prober, error) {
 
 // Receive a stream content over HTTP. We're using the standard Go
 // client as it can handle this case, no need to hijack the connection.
-func (p *Prober) stream() {
+func (p *Prober) stream(deadline time.Time) {
 	resp, err := http.Get(p.streamURL)
 	if err != nil {
 		log.Printf("connection error: %v", err)
@@ -126,8 +134,9 @@ func (p *Prober) stream() {
 	// second of audio), compute some metrics and export them.
 	fr := &floatReader{dec}
 	buf := make([]float64, bufSize)
-	for {
+	for time.Now().Before(deadline) {
 		if err := fr.ReadFloats(buf); err != nil {
+			streamErrors.WithLabelValues(p.streamName).Inc()
 			break
 		}
 
@@ -140,13 +149,17 @@ func (p *Prober) stream() {
 		log.Printf("decode error: %v", err)
 	}
 
+	// Reset stream analysis values for convenience, so we don't
+	// have to join with 'stream_connected' every time.
 	connected.WithLabelValues(p.streamName).Set(0)
+	streamRMS.WithLabelValues(p.streamName).Set(noSignal)
+	streamPeak.WithLabelValues(p.streamName).Set(noSignal)
 }
 
 // Run the Prober forever.
 func (p *Prober) Run() {
 	for {
-		p.stream()
+		p.stream(time.Now().Add(streamTimeout))
 
 		// Limit the number of outgoing connections by
 		// sleeping a bit before retrying.
-- 
GitLab