Commit e01222e1 authored by ale's avatar ale

Add more instrumentation to the Node

Export the node Status as metrics.
parent e6f5784c
Pipeline #2706 passed with stages
in 4 minutes and 27 seconds
......@@ -8,6 +8,7 @@ import (
)
var (
// Proxy metrics.
streamSentBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "bytes_sent",
......@@ -36,14 +37,103 @@ var (
},
[]string{"stream", "upstream"},
)
// Node metrics.
icecastUpdateFailed = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "icecast_update_failed",
Help: "Status of the last attempt to reload Icecast.",
},
)
icecastIsLeader = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "icecast_is_leader",
Help: "Icecast leader state.",
},
)
// Descriptors for the nodeCollector below.
numListenersDesc = prometheus.NewDesc(
"status_num_listeners",
"Number of total listeners.",
nil, nil,
)
maxListenersDesc = prometheus.NewDesc(
"status_max_listeners",
"Maximum number of total listeners (for utilization).",
nil, nil,
)
curBandwidthDesc = prometheus.NewDesc(
"status_cur_bandwidth",
"Current bandwidth usage (for utilization).",
nil, nil,
)
maxBandwidthDesc = prometheus.NewDesc(
"status_max_bandwidth",
"Maximum bandwidth usage (for utilization).",
nil, nil,
)
icecastOkDesc = prometheus.NewDesc(
"status_icecast_ok",
"Status of the node / Icecast connection.",
nil, nil,
)
)
// Prometheus Collector that exports the node's Status protobuf as metrics.
type nodeCollector struct {
*Node
}
func (nc nodeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- numListenersDesc
ch <- maxListenersDesc
ch <- curBandwidthDesc
ch <- maxBandwidthDesc
ch <- icecastOkDesc
}
func (nc nodeCollector) Collect(ch chan<- prometheus.Metric) {
status := nc.Node.getStatus()
ch <- prometheus.MustNewConstMetric(
numListenersDesc,
prometheus.GaugeValue,
float64(status.NumListeners),
)
ch <- prometheus.MustNewConstMetric(
maxListenersDesc,
prometheus.GaugeValue,
float64(status.MaxListeners),
)
ch <- prometheus.MustNewConstMetric(
curBandwidthDesc,
prometheus.GaugeValue,
float64(status.CurBandwidth),
)
ch <- prometheus.MustNewConstMetric(
maxBandwidthDesc,
prometheus.GaugeValue,
float64(status.MaxBandwidth),
)
var okVal float64
if status.IcecastOk {
okVal = 1
}
ch <- prometheus.MustNewConstMetric(
icecastOkDesc,
prometheus.GaugeValue,
okVal,
)
}
func init() {
prometheus.MustRegister(
streamSentBytes,
streamRcvdBytes,
streamListeners,
proxyConnectErrs,
icecastUpdateFailed,
icecastIsLeader,
)
}
......
......@@ -167,14 +167,25 @@ type LoadBalancer struct {
disablePredictors bool
}
// New returns a new LoadBalancer with no filters or policy set.
func New() *LoadBalancer {
// New returns a new LoadBalancer with no filters or policy set. The
// node name is only used to set a label on the associated Prometheus
// collector (in case multiple LoadBalancers are created within the
// same process).
func New(name string) *LoadBalancer {
lb := &LoadBalancer{
predictors: make(map[int]*costPredictor),
}
lc := loadBalancerCollector{lb}
prometheus.MustRegister(lc)
// TODO: we're currently using (due to etcd) an older
// Prometheus client, which does not have WrapRegistererWith.
// prometheus.WrapRegistererWith(prometheus.Labels{"name":
// name}, prometheus.DefaultRegisterer).MustRegister(lc)
if err := prometheus.Register(lc); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
panic(err)
}
}
return lb
}
......
......@@ -27,8 +27,8 @@ type loadBalancer struct {
nodes []*nodeInfo
}
func newLoadBalancer(ctx context.Context, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
lb, err := parseLoadBalancerSpec(lbSpec)
func newLoadBalancer(ctx context.Context, nodeID string, publicPeers *presence.EndpointSet, statusMgr *statusManager, lbSpec string) (*loadBalancer, error) {
lb, err := parseLoadBalancerSpec(nodeID, lbSpec)
if err != nil {
return nil, err
}
......@@ -243,8 +243,8 @@ func newIPProtocolFilter() lbv2.NodeFilter {
// Some filters will always be included in the resulting LoadBalancer
// and do not need to be specified explicitly (icecastActiveFilter and
// ipProtocolFilter, plus an activeNodesFilter at the end).
func parseLoadBalancerSpec(specstr string) (*lbv2.LoadBalancer, error) {
lb := lbv2.New()
func parseLoadBalancerSpec(nodeID, specstr string) (*lbv2.LoadBalancer, error) {
lb := lbv2.New(nodeID)
lb.AddFilter(newIcecastActiveFilter())
lb.AddFilter(newIPProtocolFilter())
......
......@@ -12,6 +12,7 @@ import (
"git.autistici.org/ale/autoradio/coordination/presence"
pb "git.autistici.org/ale/autoradio/proto"
"git.autistici.org/ale/autoradio/util"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3/concurrency"
)
......@@ -137,12 +138,24 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
})
// Create the loadBalancer that runs within the node.
n.lb, err = newLoadBalancer(ctx, n.publicPeers, n.statusMgr, lbSpec)
n.lb, err = newLoadBalancer(ctx, nodeID, n.publicPeers, n.statusMgr, lbSpec)
if err != nil {
cancel()
return nil, err
}
// Create the Prometheus nodeCollector.
nc := nodeCollector{n}
// TODO: we're currently using (due to etcd) an older
// Prometheus client, which does not have WrapRegistererWith.
// prometheus.WrapRegistererWith(prometheus.Labels{"name":
// name}, prometheus.DefaultRegisterer).MustRegister(nc)
if err := prometheus.Register(nc); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
panic(err)
}
}
return n, nil
}
......@@ -166,6 +179,14 @@ func (n *Node) updateIcecastThread() {
elState.Leader.Addrs[0])
if err != nil {
log.Printf("error reloading Icecast: %v", err)
icecastUpdateFailed.Set(1)
} else {
icecastUpdateFailed.Set(0)
if elState.State == election.StateLeader {
icecastIsLeader.Set(1)
} else {
icecastIsLeader.Set(0)
}
}
// Sleeping here prevents us from reloading Icecast too
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment