From 7867cc718812fc573ad198f9d59bfaedfb90bd0d Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Wed, 26 May 2021 17:11:15 +0100 Subject: [PATCH] Add a metric to count the number of sources per stream --- node/http.go | 4 ++-- node/instrumentation.go | 8 ++++++++ node/proxy.go | 14 ++++++++++---- node/proxy_test.go | 2 +- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/node/http.go b/node/http.go index 428c854f..dcb03566 100644 --- a/node/http.go +++ b/node/http.go @@ -120,7 +120,7 @@ func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler { Scheme: "http", Host: fmt.Sprintf("127.0.0.1:%d", icecastPort), Path: autoradio.MountPathToIcecastPath(m.Path), - }, m.Path) + }, m.Path, false) })) // redirectHandler serves different kinds of redirects, either @@ -200,7 +200,7 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques Scheme: "http", Host: targetAddr, Path: autoradio.MountPathToIcecastPath(mount.Path), - }, mount.Path) + }, mount.Path, true) } func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { diff --git a/node/instrumentation.go b/node/instrumentation.go index 6b57e7a4..ef8ba4bc 100644 --- a/node/instrumentation.go +++ b/node/instrumentation.go @@ -30,6 +30,13 @@ var ( }, []string{"stream"}, ) + sourcesConnected = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "sources_connected", + Help: "Number of connected sources per stream.", + }, + []string{"stream"}, + ) proxyConnectErrs = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "proxy_connect_errors", @@ -152,6 +159,7 @@ func init() { streamSentBytes, streamRcvdBytes, streamListeners, + sourcesConnected, proxyConnectErrs, icecastUpdateFailed, icecastIsLeader, diff --git a/node/proxy.go b/node/proxy.go index 593077c6..0b28d3fa 100644 --- a/node/proxy.go +++ b/node/proxy.go @@ -63,7 +63,7 @@ type wrappedWriter interface { // query string parameters and headers passed on from the original // request. The additional streamName parameter is used for // instrumentation. -func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) { +func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string, isSource bool) { //log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName) outreq := new(http.Request) @@ -170,7 +170,7 @@ func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, } // Run two-way proxying. - handleProxy(conn, upstream, streamName) + handleProxy(conn, upstream, streamName, isSource) } // Copy data between two network connections. On recent Go versions @@ -206,9 +206,15 @@ func isInterestingError(err error) bool { // Simple two-way TCP proxy that copies data in both directions and // can shutdown each direction of the connection independently. -func handleProxy(conn, upstream io.ReadWriteCloser, streamName string) { - l := streamListeners.WithLabelValues(streamName) +func handleProxy(conn, upstream io.ReadWriteCloser, streamName string, isSource bool) { + var l prometheus.Gauge + if isSource { + l = sourcesConnected.WithLabelValues(streamName) + } else { + l = streamListeners.WithLabelValues(streamName) + } l.Inc() + var wg sync.WaitGroup wg.Add(2) diff --git a/node/proxy_test.go b/node/proxy_test.go index e0a31915..241a75bd 100644 --- a/node/proxy_test.go +++ b/node/proxy_test.go @@ -98,7 +98,7 @@ func TestProxy(t *testing.T) { http.NotFound(w, r) return } - doIcecastProxy(w, r, u, r.URL.Path) + doIcecastProxy(w, r, u, r.URL.Path, false) })) // Start a large number of clients, talking to our proxy node. -- GitLab