diff --git a/node/instrumentation.go b/node/instrumentation.go index 8d28762b0042a5d60219cb772e38bd620afd68a8..3791be9c5e164f286fab598a52e70f4836ea8be6 100644 --- a/node/instrumentation.go +++ b/node/instrumentation.go @@ -8,6 +8,7 @@ import ( ) var ( + // Proxy metrics. streamSentBytes = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "bytes_sent", @@ -36,14 +37,103 @@ var ( }, []string{"stream", "upstream"}, ) + + // Node metrics. + icecastUpdateFailed = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "icecast_update_failed", + Help: "Status of the last attempt to reload Icecast.", + }, + ) + icecastIsLeader = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "icecast_is_leader", + Help: "Icecast leader state.", + }, + ) + + // Descriptors for the nodeCollector below. + numListenersDesc = prometheus.NewDesc( + "status_num_listeners", + "Number of total listeners.", + nil, nil, + ) + maxListenersDesc = prometheus.NewDesc( + "status_max_listeners", + "Maximum number of total listeners (for utilization).", + nil, nil, + ) + curBandwidthDesc = prometheus.NewDesc( + "status_cur_bandwidth", + "Current bandwidth usage (for utilization).", + nil, nil, + ) + maxBandwidthDesc = prometheus.NewDesc( + "status_max_bandwidth", + "Maximum bandwidth usage (for utilization).", + nil, nil, + ) + icecastOkDesc = prometheus.NewDesc( + "status_icecast_ok", + "Status of the node / Icecast connection.", + nil, nil, + ) ) +// Prometheus Collector that exports the node's Status protobuf as metrics. 
+type nodeCollector struct { + *Node +} + +func (nc nodeCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- numListenersDesc + ch <- maxListenersDesc + ch <- curBandwidthDesc + ch <- maxBandwidthDesc + ch <- icecastOkDesc +} + +func (nc nodeCollector) Collect(ch chan<- prometheus.Metric) { + status := nc.Node.getStatus() + ch <- prometheus.MustNewConstMetric( + numListenersDesc, + prometheus.GaugeValue, + float64(status.NumListeners), + ) + ch <- prometheus.MustNewConstMetric( + maxListenersDesc, + prometheus.GaugeValue, + float64(status.MaxListeners), + ) + ch <- prometheus.MustNewConstMetric( + curBandwidthDesc, + prometheus.GaugeValue, + float64(status.CurBandwidth), + ) + ch <- prometheus.MustNewConstMetric( + maxBandwidthDesc, + prometheus.GaugeValue, + float64(status.MaxBandwidth), + ) + var okVal float64 + if status.IcecastOk { + okVal = 1 + } + ch <- prometheus.MustNewConstMetric( + icecastOkDesc, + prometheus.GaugeValue, + okVal, + ) +} + func init() { prometheus.MustRegister( streamSentBytes, streamRcvdBytes, streamListeners, proxyConnectErrs, + icecastUpdateFailed, + icecastIsLeader, ) } diff --git a/node/lbv2/lbv2.go b/node/lbv2/lbv2.go index 5dde87d33cee615bb30e0f3a9e1323418132dc8f..873316ce9adb0c71eb271aed0f04771e86fab77c 100644 --- a/node/lbv2/lbv2.go +++ b/node/lbv2/lbv2.go @@ -167,14 +167,25 @@ type LoadBalancer struct { disablePredictors bool } -// New returns a new LoadBalancer with no filters or policy set. -func New() *LoadBalancer { +// New returns a new LoadBalancer with no filters or policy set. The +// node name is only used to set a label on the associated Prometheus +// collector (in case multiple LoadBalancers are created within the +// same process). +func New(name string) *LoadBalancer { lb := &LoadBalancer{ predictors: make(map[int]*costPredictor), } lc := loadBalancerCollector{lb} - prometheus.MustRegister(lc) + // TODO: we're currently using (due to etcd) an older + // Prometheus client, which does not have WrapRegistererWith. 
+ // prometheus.WrapRegistererWith(prometheus.Labels{"name": + // name}, prometheus.DefaultRegisterer).MustRegister(lc) + if err := prometheus.Register(lc); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + panic(err) + } + } return lb } diff --git a/node/loadbalancing.go b/node/loadbalancing.go index f3b6d73b40d14de88f262b9df01be41563dff975..8ce5c35e0d9f3ed4f3ea4183adc67e42c3b9a40b 100644 --- a/node/loadbalancing.go +++ b/node/loadbalancing.go @@ -27,8 +27,8 @@ type loadBalancer struct { nodes []*nodeInfo } -func newLoadBalancer(ctx context.Context, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) { - lb, err := parseLoadBalancerSpec(lbSpec) +func newLoadBalancer(ctx context.Context, nodeID string, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) { + lb, err := parseLoadBalancerSpec(nodeID, lbSpec) if err != nil { return nil, err } @@ -243,8 +243,8 @@ func newIPProtocolFilter() lbv2.NodeFilter { // Some filters will always be included in the resulting LoadBalancer // and do not need to be specified explicitly (icecastActiveFilter and // ipProtocolFilter, plus an activeNodesFilter at the end). 
-func parseLoadBalancerSpec(specstr string) (*lbv2.LoadBalancer, error) { - lb := lbv2.New() +func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) { + lb := lbv2.New(nodeID) lb.AddFilter(newIcecastActiveFilter()) lb.AddFilter(newIPProtocolFilter()) diff --git a/node/node.go b/node/node.go index f45a623abe0b72fc4c5ddb2a08d9a99e81612836..7805fd52aadd67f15e25ad41cefee48c2614fc0b 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,7 @@ import ( "git.autistici.org/ale/autoradio/coordination/presence" pb "git.autistici.org/ale/autoradio/proto" "git.autistici.org/ale/autoradio/util" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3/concurrency" ) @@ -137,12 +138,24 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n }) // Create the loadBalancer that runs within the node. - n.lb, err = newLoadBalancer(ctx, n.publicPeers, n.statusMgr, lbSpec) + n.lb, err = newLoadBalancer(ctx, nodeID, n.publicPeers, n.statusMgr, lbSpec) if err != nil { cancel() return nil, err } + // Create the Prometheus nodeCollector. + nc := nodeCollector{n} + // TODO: we're currently using (due to etcd) an older + // Prometheus client, which does not have WrapRegistererWith. + // prometheus.WrapRegistererWith(prometheus.Labels{"name": + // nodeID}, prometheus.DefaultRegisterer).MustRegister(nc) + if err := prometheus.Register(nc); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + panic(err) + } + } + return n, nil } @@ -166,6 +179,14 @@ func (n *Node) updateIcecastThread() { elState.Leader.Addrs[0]) if err != nil { log.Printf("error reloading Icecast: %v", err) + icecastUpdateFailed.Set(1) + } else { + icecastUpdateFailed.Set(0) + if elState.State == election.StateLeader { + icecastIsLeader.Set(1) + } else { + icecastIsLeader.Set(0) + } } // Sleeping here prevents us from reloading Icecast too