Skip to content
Snippets Groups Projects
Commit fa34802c authored by ale's avatar ale
Browse files

detect underruns

parent 5811fd61
Branches
No related tags found
No related merge requests found
package main package main
import ( import (
"errors"
"flag" "flag"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"math/rand"
"net/http" "net/http"
"os" "os"
"strings" "strings"
...@@ -22,6 +25,7 @@ type Stats struct { ...@@ -22,6 +25,7 @@ type Stats struct {
HttpStatus map[int]int HttpStatus map[int]int
HttpErrors int HttpErrors int
Errors int Errors int
Underruns int
lock sync.Mutex lock sync.Mutex
} }
...@@ -49,10 +53,20 @@ func (s *Stats) Error() { ...@@ -49,10 +53,20 @@ func (s *Stats) Error() {
s.Errors++ s.Errors++
} }
func (s *Stats) Underrun() {
s.lock.Lock()
defer s.lock.Unlock()
s.Underruns++
}
func (s *Stats) Dump() { func (s *Stats) Dump() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
log.Printf("errs=%d http_errs=%d http_status=%v", s.Errors, s.HttpErrors, s.HttpStatus) log.Printf("errs=%d underruns=%d http_errs=%d http_status=%v", s.Errors, s.Underruns, s.HttpErrors, s.HttpStatus)
}
func randomDuration(max time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(max)))
} }
func readstream(id int, streamUrl string) error { func readstream(id int, streamUrl string) error {
...@@ -67,6 +81,7 @@ func readstream(id int, streamUrl string) error { ...@@ -67,6 +81,7 @@ func readstream(id int, streamUrl string) error {
return fmt.Errorf("http status %s", resp.Status) return fmt.Errorf("http status %s", resp.Status)
} }
// Handle (very roughly) M3U files.
if resp.Header.Get("Content-Type") == "audio/x-mpegurl" { if resp.Header.Get("Content-Type") == "audio/x-mpegurl" {
data, err := ioutil.ReadAll(resp.Body) data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close() resp.Body.Close()
...@@ -92,22 +107,44 @@ func readstream(id int, streamUrl string) error { ...@@ -92,22 +107,44 @@ func readstream(id int, streamUrl string) error {
defer resp.Body.Close() defer resp.Body.Close()
// Just read data and discard it. // Is it actually an audio stream?
switch resp.Header.Get("Content-Type") {
case "application/ogg", "audio/mpeg":
default:
return fmt.Errorf("unknown Content-Type: %s", resp.Header.Get("Content-Type"))
}
// Just read data and discard it. While reading data, attempt
// to detect stalled connections by constantly computing a
// bitrate approximation.
bytes := 0
lastStamp := time.Now()
buf := make([]byte, 16384) buf := make([]byte, 16384)
for { for {
n, err := resp.Body.Read(buf) n, err := io.ReadFull(resp.Body, buf)
if err != nil { if err != nil {
stats.Error()
break break
} }
if n == 0 { if n == 0 {
break break
} }
bytes += n
now := time.Now()
bps := float64(n) / now.Sub(lastStamp).Seconds()
// Ignore the first few seconds.
if bytes > 65535 && bps < 2000 {
stats.Underrun()
return fmt.Errorf("bitrate too low (%g Bps)", bps)
}
lastStamp = now
} }
return fmt.Errorf("connection lost") stats.Error()
return errors.New("connection lost")
} }
func worker(id int, streamUrl string) { func worker(id int, streamUrl string) {
time.Sleep(randomDuration(10 * time.Second))
for { for {
err := readstream(id, streamUrl) err := readstream(id, streamUrl)
log.Printf("worker(%d): %v", id, err) log.Printf("worker(%d): %v", id, err)
...@@ -140,7 +177,7 @@ func main() { ...@@ -140,7 +177,7 @@ func main() {
go func(id int) { go func(id int) {
worker(id, streamUrl) worker(id, streamUrl)
wg.Done() wg.Done()
}(i) }(i + 1)
} }
wg.Wait() wg.Wait()
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment