diff --git a/node/http.go b/node/http.go index 428c854f7619461b9962bc3a50a53764442d1498..dcb03566e14b43e9b1803546acc244459774f407 100644 --- a/node/http.go +++ b/node/http.go @@ -120,7 +120,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler { Scheme: "http", Host: fmt.Sprintf("127.0.0.1:%d", icecastPort), Path: autoradio.MountPathToIcecastPath(m.Path), - }, m.Path) + }, m.Path, false) })) // redirectHandler serves different kinds of redirects, either @@ -200,7 +200,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques Scheme: "http", Host: targetAddr, Path: autoradio.MountPathToIcecastPath(mount.Path), - }, mount.Path) + }, mount.Path, true) } func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { diff --git a/node/instrumentation.go b/node/instrumentation.go index 6b57e7a4070e56812fc817d21c144c33e996f2a1..ef8ba4bce459106d1fa6ab830f103385b55a4590 100644 --- a/node/instrumentation.go +++ b/node/instrumentation.go @@ -30,6 +30,13 @@ var ( }, []string{"stream"}, ) + sourcesConnected = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "sources_connected", + Help: "Number of connected sources per stream.", + }, + []string{"stream"}, + ) proxyConnectErrs = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "proxy_connect_errors", @@ -152,6 +159,7 @@ func init() { streamSentBytes, streamRcvdBytes, streamListeners, + sourcesConnected, proxyConnectErrs, icecastUpdateFailed, icecastIsLeader, diff --git a/node/proxy.go b/node/proxy.go index 593077c6a99021827af14816c30a7913e98486d4..0b28d3fa3fca957b1b0b8516f59d42e294003ab5 100644 --- a/node/proxy.go +++ b/node/proxy.go @@ -63,7 +63,7 @@ type wrappedWriter interface { // query string parameters and headers passed on from the original // request. The additional streamName parameter is used for // instrumentation. -func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) { +func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string, isSource bool) { //log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName) outreq := new(http.Request) @@ -170,7 +170,7 @@ func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, } // Run two-way proxying. - handleProxy(conn, upstream, streamName) + handleProxy(conn, upstream, streamName, isSource) } // Copy data between two network connections. On recent Go versions @@ -206,9 +206,15 @@ func isInterestingError(err error) bool { // Simple two-way TCP proxy that copies data in both directions and // can shutdown each direction of the connection independently. -func handleProxy(conn, upstream io.ReadWriteCloser, streamName string) { - l := streamListeners.WithLabelValues(streamName) +func handleProxy(conn, upstream io.ReadWriteCloser, streamName string, isSource bool) { + var l prometheus.Gauge + if isSource { + l = sourcesConnected.WithLabelValues(streamName) + } else { + l = streamListeners.WithLabelValues(streamName) + } l.Inc() + var wg sync.WaitGroup wg.Add(2) diff --git a/node/proxy_test.go b/node/proxy_test.go index e0a319155482969081ddfd8214eba4ae873b1a6b..241a75bd93f65f0d0551cd857cbfde7e3949443b 100644 --- a/node/proxy_test.go +++ b/node/proxy_test.go @@ -98,7 +98,7 @@ func TestProxy(t *testing.T) { http.NotFound(w, r) return } - doIcecastProxy(w, r, u, r.URL.Path) + doIcecastProxy(w, r, u, r.URL.Path, false) })) // Start a large number of clients, talking to our proxy node.