Skip to content
Snippets Groups Projects
Commit 48b96fc2 authored by ale's avatar ale
Browse files

Reconnect prober streams every two minutes

parent 90feac95
Branches
Tags
No related merge requests found
......@@ -7,10 +7,12 @@ type stats struct {
rms float64
}
const noSignal = -120
// Convert a float64 sample value to dB (0 is peak amplitude).
func toDB(value float64) float64 {
if value < 1e-6 {
return -120
return noSignal
}
return 20 * math.Log10(value)
}
......
......@@ -12,6 +12,7 @@ import (
var (
reconnectDelay = 3 * time.Second
streamTimeout = 2 * time.Minute
// Exported metrics.
connections = prometheus.NewCounterVec(
......@@ -42,6 +43,13 @@ var (
},
[]string{"stream"},
)
streamErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "stream_errors",
Help: "Number of times the connection was broken.",
},
[]string{"stream"},
)
streamRMS = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "stream_rms",
......@@ -95,7 +103,7 @@ func New(streamURL string) (*Prober, error) {
// Receive a stream content over HTTP. We're using the standard Go
// client as it can handle this case, no need to hijack the connection.
func (p *Prober) stream() {
func (p *Prober) stream(deadline time.Time) {
resp, err := http.Get(p.streamURL)
if err != nil {
log.Printf("connection error: %v", err)
......@@ -126,8 +134,9 @@ func (p *Prober) stream() {
// second of audio), compute some metrics and export them.
fr := &floatReader{dec}
buf := make([]float64, bufSize)
for {
for time.Now().Before(deadline) {
if err := fr.ReadFloats(buf); err != nil {
streamErrors.WithLabelValues(p.streamName).Inc()
break
}
......@@ -140,13 +149,17 @@ func (p *Prober) stream() {
log.Printf("decode error: %v", err)
}
// Reset stream analysis values for convenience, so we don't
// have to join with 'stream_connected' every time.
connected.WithLabelValues(p.streamName).Set(0)
streamRMS.WithLabelValues(p.streamName).Set(noSignal)
streamPeak.WithLabelValues(p.streamName).Set(noSignal)
}
// Run the Prober forever.
func (p *Prober) Run() {
for {
p.stream()
p.stream(time.Now().Add(streamTimeout))
// Limit the number of outgoing connections by
// sleeping a bit before retrying.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment