From e01222e1abbd52c53cb0ef1d67038db33ee096ae Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Fri, 12 Apr 2019 09:41:11 +0100
Subject: [PATCH] Add more instrumentation to the Node

Export the node Status as metrics.
---
 node/instrumentation.go | 90 +++++++++++++++++++++++++++++++++++++++++
 node/lbv2/lbv2.go       | 17 ++++++--
 node/loadbalancing.go   |  8 ++--
 node/node.go            | 23 ++++++++++-
 4 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/node/instrumentation.go b/node/instrumentation.go
index 8d28762b..3791be9c 100644
--- a/node/instrumentation.go
+++ b/node/instrumentation.go
@@ -8,6 +8,7 @@ import (
 )
 
 var (
+	// Proxy metrics.
 	streamSentBytes = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "bytes_sent",
@@ -36,14 +37,103 @@ var (
 		},
 		[]string{"stream", "upstream"},
 	)
+
+	// Node metrics.
+	icecastUpdateFailed = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "icecast_update_failed",
+			Help: "Status of the last attempt to reload Icecast.",
+		},
+	)
+	icecastIsLeader = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "icecast_is_leader",
+			Help: "Icecast leader state.",
+		},
+	)
+
+	// Descriptors for the nodeCollector below.
+	numListenersDesc = prometheus.NewDesc(
+		"status_num_listeners",
+		"Number of total listeners.",
+		nil, nil,
+	)
+	maxListenersDesc = prometheus.NewDesc(
+		"status_max_listeners",
+		"Maximum number of total listeners (for utilization).",
+		nil, nil,
+	)
+	curBandwidthDesc = prometheus.NewDesc(
+		"status_cur_bandwidth",
+		"Current bandwidth usage (for utilization).",
+		nil, nil,
+	)
+	maxBandwidthDesc = prometheus.NewDesc(
+		"status_max_bandwidth",
+		"Maximum bandwidth usage (for utilization).",
+		nil, nil,
+	)
+	icecastOkDesc = prometheus.NewDesc(
+		"status_icecast_ok",
+		"Status of the node / Icecast connection.",
+		nil, nil,
+	)
 )
 
+// nodeCollector is a Prometheus Collector that exports the node's Status protobuf as metrics.
+type nodeCollector struct {
+	*Node // embedded so that Collect can call getStatus on the wrapped Node
+}
+
+func (nc nodeCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- numListenersDesc // the five descriptors sent here are the same ones Collect builds its metrics from
+	ch <- maxListenersDesc
+	ch <- curBandwidthDesc
+	ch <- maxBandwidthDesc
+	ch <- icecastOkDesc
+}
+
+func (nc nodeCollector) Collect(ch chan<- prometheus.Metric) {
+	status := nc.Node.getStatus() // fetched once; every gauge below is built from this value
+	ch <- prometheus.MustNewConstMetric(
+		numListenersDesc,
+		prometheus.GaugeValue,
+		float64(status.NumListeners),
+	)
+	ch <- prometheus.MustNewConstMetric(
+		maxListenersDesc,
+		prometheus.GaugeValue,
+		float64(status.MaxListeners),
+	)
+	ch <- prometheus.MustNewConstMetric(
+		curBandwidthDesc,
+		prometheus.GaugeValue,
+		float64(status.CurBandwidth),
+	)
+	ch <- prometheus.MustNewConstMetric(
+		maxBandwidthDesc,
+		prometheus.GaugeValue,
+		float64(status.MaxBandwidth),
+	)
+	var okVal float64 // stays 0 unless IcecastOk is true, so the bool is exported as a 0/1 gauge
+	if status.IcecastOk {
+		okVal = 1
+	}
+	ch <- prometheus.MustNewConstMetric(
+		icecastOkDesc,
+		prometheus.GaugeValue,
+		okVal,
+	)
+}
+
 func init() {
 	prometheus.MustRegister(
 		streamSentBytes,
 		streamRcvdBytes,
 		streamListeners,
 		proxyConnectErrs,
+		icecastUpdateFailed, // set by updateIcecastThread after every Icecast reload attempt
+		icecastIsLeader,     // updated by updateIcecastThread only when the reload succeeded
 	)
 }
 
diff --git a/node/lbv2/lbv2.go b/node/lbv2/lbv2.go
index 5dde87d3..873316ce 100644
--- a/node/lbv2/lbv2.go
+++ b/node/lbv2/lbv2.go
@@ -167,14 +167,25 @@ type LoadBalancer struct {
 	disablePredictors bool
 }
 
-// New returns a new LoadBalancer with no filters or policy set.
-func New() *LoadBalancer {
+// New returns a new LoadBalancer with no filters or policy set. The
+// name is meant to become a label on the associated Prometheus
+// collector (in case multiple LoadBalancers are created within the
+// same process), but it is not used yet: see the TODO below.
+func New(name string) *LoadBalancer {
 	lb := &LoadBalancer{
 		predictors: make(map[int]*costPredictor),
 	}
 
 	lc := loadBalancerCollector{lb}
-	prometheus.MustRegister(lc)
+	// TODO: we're currently using (due to etcd) an older
+	// Prometheus client, which does not have WrapRegistererWith.
+	// prometheus.WrapRegistererWith(prometheus.Labels{"name":
+	// name}, prometheus.DefaultRegisterer).MustRegister(lc)
+	if err := prometheus.Register(lc); err != nil {
+		if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { // a duplicate registration is tolerated; any other error is fatal
+			panic(err)
+		}
+	}
 
 	return lb
 }
diff --git a/node/loadbalancing.go b/node/loadbalancing.go
index f3b6d73b..8ce5c35e 100644
--- a/node/loadbalancing.go
+++ b/node/loadbalancing.go
@@ -27,8 +27,8 @@ type loadBalancer struct {
 	nodes  []*nodeInfo
 }
 
-func newLoadBalancer(ctx context.Context, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
-	lb, err := parseLoadBalancerSpec(lbSpec)
+func newLoadBalancer(ctx context.Context, nodeID string, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
+	lb, err := parseLoadBalancerSpec(nodeID, lbSpec)
 	if err != nil {
 		return nil, err
 	}
@@ -243,8 +243,8 @@ func newIPProtocolFilter() lbv2.NodeFilter {
 // Some filters will always be included in the resulting LoadBalancer
 // and do not need to be specified explicitly (icecastActiveFilter and
 // ipProtocolFilter, plus an activeNodesFilter at the end).
-func parseLoadBalancerSpec(specstr string) (*lbv2.LoadBalancer, error) {
-	lb := lbv2.New()
+func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
+	lb := lbv2.New(nodeID)
 	lb.AddFilter(newIcecastActiveFilter())
 	lb.AddFilter(newIPProtocolFilter())
 
diff --git a/node/node.go b/node/node.go
index f45a623a..7805fd52 100644
--- a/node/node.go
+++ b/node/node.go
@@ -12,6 +12,7 @@ import (
 	"git.autistici.org/ale/autoradio/coordination/presence"
 	pb "git.autistici.org/ale/autoradio/proto"
 	"git.autistici.org/ale/autoradio/util"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.etcd.io/etcd/clientv3/concurrency"
 )
 
@@ -137,12 +138,24 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
 	})
 
 	// Create the loadBalancer that runs within the node.
-	n.lb, err = newLoadBalancer(ctx, n.publicPeers, n.statusMgr, lbSpec)
+	n.lb, err = newLoadBalancer(ctx, nodeID, n.publicPeers, n.statusMgr, lbSpec)
 	if err != nil {
 		cancel()
 		return nil, err
 	}
 
+	// Create the Prometheus nodeCollector.
+	nc := nodeCollector{n}
+	// TODO: we're currently using (due to etcd) an older
+	// Prometheus client, which does not have WrapRegistererWith:
+	// prometheus.WrapRegistererWith(prometheus.Labels{"name":
+	// nodeID}, prometheus.DefaultRegisterer).MustRegister(nc)
+	if err := prometheus.Register(nc); err != nil {
+		if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
+			panic(err)
+		}
+	}
+
 	return n, nil
 }
 
@@ -166,6 +179,14 @@ func (n *Node) updateIcecastThread() {
 			elState.Leader.Addrs[0])
 		if err != nil {
 			log.Printf("error reloading Icecast: %v", err)
+			icecastUpdateFailed.Set(1)
+		} else {
+			icecastUpdateFailed.Set(0)
+			if elState.State == election.StateLeader {
+				icecastIsLeader.Set(1)
+			} else {
+				icecastIsLeader.Set(0)
+			}
 		}
 
 		// Sleeping here prevents us from reloading Icecast too
-- 
GitLab