Skip to content
Snippets Groups Projects
Commit 64203771 authored by ale's avatar ale
Browse files

Merge branch 'sources-metrics' into 'master'

Add a metric to count the number of sources per stream

See merge request !40
parents 28a256ea 7867cc71
Branches
No related tags found
1 merge request!40Add a metric to count the number of sources per stream
...@@ -120,7 +120,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler { ...@@ -120,7 +120,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler {
Scheme: "http", Scheme: "http",
Host: fmt.Sprintf("127.0.0.1:%d", icecastPort), Host: fmt.Sprintf("127.0.0.1:%d", icecastPort),
Path: autoradio.MountPathToIcecastPath(m.Path), Path: autoradio.MountPathToIcecastPath(m.Path),
}, m.Path) }, m.Path, false)
})) }))
// redirectHandler serves different kinds of redirects, either // redirectHandler serves different kinds of redirects, either
...@@ -200,7 +200,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques ...@@ -200,7 +200,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques
Scheme: "http", Scheme: "http",
Host: targetAddr, Host: targetAddr,
Path: autoradio.MountPathToIcecastPath(mount.Path), Path: autoradio.MountPathToIcecastPath(mount.Path),
}, mount.Path) }, mount.Path, true)
} }
func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) {
......
...@@ -30,6 +30,13 @@ var ( ...@@ -30,6 +30,13 @@ var (
}, },
[]string{"stream"}, []string{"stream"},
) )
sourcesConnected = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "sources_connected",
Help: "Number of connected sources per stream.",
},
[]string{"stream"},
)
proxyConnectErrs = prometheus.NewCounterVec( proxyConnectErrs = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "proxy_connect_errors", Name: "proxy_connect_errors",
...@@ -152,6 +159,7 @@ func init() { ...@@ -152,6 +159,7 @@ func init() {
streamSentBytes, streamSentBytes,
streamRcvdBytes, streamRcvdBytes,
streamListeners, streamListeners,
sourcesConnected,
proxyConnectErrs, proxyConnectErrs,
icecastUpdateFailed, icecastUpdateFailed,
icecastIsLeader, icecastIsLeader,
......
...@@ -63,7 +63,7 @@ type wrappedWriter interface { ...@@ -63,7 +63,7 @@ type wrappedWriter interface {
// query string parameters and headers passed on from the original // query string parameters and headers passed on from the original
// request. The additional streamName parameter is used for // request. The additional streamName parameter is used for
// instrumentation. // instrumentation.
func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) { func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string, isSource bool) {
//log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName) //log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName)
outreq := new(http.Request) outreq := new(http.Request)
...@@ -170,7 +170,7 @@ func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, ...@@ -170,7 +170,7 @@ func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL,
} }
// Run two-way proxying. // Run two-way proxying.
handleProxy(conn, upstream, streamName) handleProxy(conn, upstream, streamName, isSource)
} }
// Copy data between two network connections. On recent Go versions // Copy data between two network connections. On recent Go versions
...@@ -206,9 +206,15 @@ func isInterestingError(err error) bool { ...@@ -206,9 +206,15 @@ func isInterestingError(err error) bool {
// Simple two-way TCP proxy that copies data in both directions and // Simple two-way TCP proxy that copies data in both directions and
// can shutdown each direction of the connection independently. // can shutdown each direction of the connection independently.
func handleProxy(conn, upstream io.ReadWriteCloser, streamName string) { func handleProxy(conn, upstream io.ReadWriteCloser, streamName string, isSource bool) {
l := streamListeners.WithLabelValues(streamName) var l prometheus.Gauge
if isSource {
l = sourcesConnected.WithLabelValues(streamName)
} else {
l = streamListeners.WithLabelValues(streamName)
}
l.Inc() l.Inc()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
......
...@@ -98,7 +98,7 @@ func TestProxy(t *testing.T) { ...@@ -98,7 +98,7 @@ func TestProxy(t *testing.T) {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
doIcecastProxy(w, r, u, r.URL.Path) doIcecastProxy(w, r, u, r.URL.Path, false)
})) }))
// Start a large number of clients, talking to our proxy node. // Start a large number of clients, talking to our proxy node.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment