Commit 8b0ad4eb authored by ale

Support cgroups v2

The code has been refactored with a slightly simpler architecture,
where the metrics collection frequency is now controlled directly
by Prometheus scrapes. Only the list of cgroups is updated on a
slower interval (as it requires a filesystem scan of /sys/fs/cgroup).

New code has been added to support the cgroups v2 filesystem hierarchy.
parent 248b02da
Pipeline #20594 passed with stages in 51 seconds
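In outline, the new design is: Collect() reads every stat file fresh at scrape time, and only the cgroup list is cached. A condensed sketch of the pattern (names match the full code below):

// Condensed sketch of the scrape-driven design; see the full code below.
type collectorSketch struct{ subsystems []subsystem }

func (c *collectorSketch) Collect(ch chan<- prometheus.Metric) {
	// Runs on every Prometheus scrape; no metric snapshot is cached.
	for _, cgroup := range getCGroups() { // list refreshed by a slow background loop
		slice, unit := splitServiceName(cgroup)
		for _, s := range c.subsystems {
			s.parse(cgroup, slice, unit, ch)
		}
	}
}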
package main
import (
"context"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
var cgroupsRootPath string
func findCGroups() ([]string, error) {
var paths []string
err := filepath.Walk(cgroupsRootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || !info.IsDir() || !strings.HasSuffix(path, ".service") {
return nil
}
// Do not track systemd internal services.
if strings.HasPrefix(info.Name(), "systemd-") {
return nil
}
relPath := path[len(cgroupsRootPath)+1:]
paths = append(paths, relPath)
return nil
})
debug("found %d units below %s", len(paths), cgroupsRootPath)
return paths, err
}
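On a typical systemd host the walk returns unit paths relative to the cgroup root; the values below are illustrative and host-dependent:

// Example return value of findCGroups() (illustrative):
//
//   system.slice/cron.service
//   system.slice/nginx.service
//   user.slice/user-1000.slice/user@1000.service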
var (
	// The lock mostly ensures memory visibility across goroutines:
	// we only ever replace the slice wholesale, never mutate the
	// backing array.
cgroupsMx sync.Mutex
curCGroups []string
)
func getCGroups() []string {
cgroupsMx.Lock()
defer cgroupsMx.Unlock()
return curCGroups
}
func updateCGroupsList(ctx context.Context) {
updateFn := func() {
cgroups, err := findCGroups()
if err != nil {
log.Printf("error scanning /sys/fs/cgroup: %v", err)
return
}
cgroupsMx.Lock()
curCGroups = cgroups
cgroupsMx.Unlock()
}
updateFn()
go func() {
ticker := time.NewTicker(*updateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
updateFn()
}
}
}()
}
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var cpuV1Desc = prometheus.NewDesc(
"cgroup_cpu_usage",
"Cgroup CPU usage.",
[]string{"mode", "slice", "service"},
nil,
)
type cpuParser struct{}
func (p *cpuParser) describe(ch chan<- *prometheus.Desc) {
ch <- cpuV1Desc
}
func (p *cpuParser) parse(path, slice, unit string, ch chan<- prometheus.Metric) {
usage, err := parseMapFile(cgroupV1StatPath(path, "cpu,cpuacct", "cpuacct.stat"))
if err != nil {
debug("error parsing cpuacct.stat: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["user"])/userHZ,
"user", slice, unit,
)
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["system"])/userHZ,
"system", slice, unit,
)
}
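The cgroupV1StatPath helper used above is not part of this excerpt. Judging from the cgroupStatPath helper it replaces (further down in this diff), a plausible sketch (assumed, not verbatim; needs path/filepath imported):

// Plausible sketch of cgroupV1StatPath (assumed): in the v1 hierarchy
// each controller has its own mount under /sys/fs/cgroup.
func cgroupV1StatPath(cgroupPath, controller, statFile string) string {
	return filepath.Join("/sys/fs/cgroup", controller, cgroupPath, statFile)
}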
package main
import (
"path/filepath"
"github.com/prometheus/client_golang/prometheus"
)
// Microseconds per second: cgroup v2 reports CPU times in usec.
var usecs float64 = 1000000

// cpuV2Parser exports the same cgroup_cpu_usage metric as the v1
// parser, reading cpu.stat from the unified hierarchy instead.
type cpuV2Parser struct{}

func (p *cpuV2Parser) describe(ch chan<- *prometheus.Desc) {
	ch <- cpuV1Desc
}
func (p *cpuV2Parser) parse(path, slice, unit string, ch chan<- prometheus.Metric) {
usage, err := parseMapFile(filepath.Join(cgroupsRootPath, path, "cpu.stat"))
if err != nil {
debug("error parsing cpu.stat: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["user_usec"])/usecs,
"user", slice, unit,
)
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["system_usec"])/usecs,
"system", slice, unit,
)
}
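For reference, a v2 cpu.stat file is already in the flat "key value" format that parseMapFile expects (values illustrative):

// Illustrative cpu.stat contents on a cgroups v2 host:
//
//   usage_usec 8421630
//   user_usec 5120490
//   system_usec 3301140

The memoryV2Parser and blkioV2Parser referenced in main() are also not part of this excerpt. A hypothetical sketch of the memory one, assuming it mirrors cpuV2Parser and exports the v2 "anon" counter from memory.stat (the closest analogue of v1 total_rss):

// Hypothetical sketch only: the real memoryV2Parser is not shown in
// this excerpt and may differ.
var memoryV2Desc = prometheus.NewDesc(
	"cgroup_memory_usage",
	"Cgroup memory usage (RSS, in bytes).",
	[]string{"slice", "service"},
	nil,
)

type memoryV2Parser struct{}

func (p *memoryV2Parser) describe(ch chan<- *prometheus.Desc) {
	ch <- memoryV2Desc
}

func (p *memoryV2Parser) parse(path, slice, unit string, ch chan<- prometheus.Metric) {
	mstat, err := parseMapFile(filepath.Join(cgroupsRootPath, path, "memory.stat"))
	if err != nil {
		debug("error parsing memory.stat: %v", err)
		return
	}
	ch <- prometheus.MustNewConstMetric(
		memoryV2Desc,
		prometheus.GaugeValue,
		float64(mstat["anon"]), // assumption: "anon" stands in for v1 total_rss
		slice, unit,
	)
}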
package main
import (
"bufio"
"bytes"
"context"
"flag"
"io"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tklauser/go-sysconf"
)
var (
addr = flag.String("addr", ":3909", "address to listen on")
updateInterval = flag.Duration("interval", 10*time.Second, "update interval")
doDebug = flag.Bool("debug", false, "log debug messages")
userHZ float64
cgroupsRootPath string
)
func setCgroupsRootPath() {
// Handle the drop of the 'systemd' hierarchy with Debian Bullseye.
cgroupsRootPath = "/sys/fs/cgroup/systemd"
if _, err := os.Stat(cgroupsRootPath); os.IsNotExist(err) {
cgroupsRootPath = "/sys/fs/cgroup"
}
}
func init() {
	userHZ = 100
	if clktck, err := sysconf.Sysconf(sysconf.SC_CLK_TCK); err == nil {
		userHZ = float64(clktck)
	}
}

const (
	cgroupsV1Root = "/sys/fs/cgroup/systemd"
	cgroupsV2Root = "/sys/fs/cgroup"
)

// On hybrid systems the v2 hierarchy is mounted at
// /sys/fs/cgroup/unified next to the v1 controllers; if that mount is
// absent, assume a unified, v2-only hierarchy.
func hasCGroupsV2() bool {
	_, err := os.Stat("/sys/fs/cgroup/unified")
	return os.IsNotExist(err)
}
func debug(s string, args ...interface{}) {
	if *doDebug {
		log.Printf(s, args...)
	}
}
func splitServiceName(path string) (string, string) {
slice, name := filepath.Split(path)
slice = strings.Trim(slice, "/")
return slice, name
}
func parseMapFile(path string) (map[string]int64, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
result := make(map[string]int64)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
parts := bytes.Split(line, []byte(" "))
if len(parts) != 2 {
continue
}
value, err := strconv.ParseInt(string(parts[1]), 10, 64)
if err != nil {
continue
}
result[string(parts[0])] = value
}
return result, scanner.Err()
}
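parseMapFile handles the flat "key value" files used by both hierarchies; for example, a v1 cpuacct.stat (values in USER_HZ ticks, illustrative):

// Illustrative cpuacct.stat contents (cgroups v1):
//
//   user 10430
//   system 5225
//
// parseMapFile returns map[string]int64{"user": 10430, "system": 5225}.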
func parseBlkioMapFile(path string) (map[string]int64, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
// Aggregate counts by operation type (sum by device).
result := make(map[string]int64)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
parts := bytes.Split(line, []byte(" "))
if len(parts) != 3 {
continue
}
value, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
continue
}
result[string(parts[1])] += value
}
return result, scanner.Err()
}
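For context, blkio accounting files have lines of the form "MAJ:MIN op count", which is why the parser keys on the middle field and sums across devices (values illustrative, Sync/Async lines omitted):

// Illustrative blkio.io_serviced contents:
//
//   8:0 Read 1163
//   8:0 Write 5678
//   8:16 Read 42
//   Total 6883      <- only two fields, skipped by the len(parts) check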
// func parseSingleValueFile(path string) (int64, error) {
// data, err := ioutil.ReadFile(path)
// if err != nil {
// return 0, err
// }
// return strconv.ParseInt(string(data), 10, 64)
// }
func cgroupStatPath(cgroupPath, collector, path string) string {
return filepath.Join("/sys/fs/cgroup", collector, cgroupPath, path)
}
type cpuParser struct {
desc *prometheus.Desc
}
func newCPUParser() *cpuParser {
return &cpuParser{
desc: prometheus.NewDesc(
"cgroup_cpu_usage",
"Cgroup CPU usage.",
[]string{"mode", "slice", "service"},
nil,
),
}
}
func (p *cpuParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.desc
}
func (p *cpuParser) parse(path string) ([]prometheus.Metric, error) {
usage, err := parseMapFile(cgroupStatPath(path, "cpu,cpuacct", "cpuacct.stat"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
return []prometheus.Metric{
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(usage["user"])/userHZ,
"user", slice, name,
),
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(usage["system"])/userHZ,
"system", slice, name,
),
}, nil
}
type memoryParser struct {
desc *prometheus.Desc
}
func newMemoryParser() *memoryParser {
return &memoryParser{
desc: prometheus.NewDesc(
"cgroup_memory_usage",
"Cgroup memory usage (RSS, in bytes).",
[]string{"slice", "service"},
nil,
),
}
}
func (p *memoryParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.desc
}
func (p *memoryParser) parse(path string) ([]prometheus.Metric, error) {
mstat, err := parseMapFile(cgroupStatPath(path, "memory", "memory.stat"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
return []prometheus.Metric{
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(mstat["total_rss"]),
slice, name,
),
}, nil
}
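For reference, the relevant slice of a v1 memory.stat file (values in bytes, illustrative); total_rss aggregates the whole sub-hierarchy and is the value exported above:

// Illustrative memory.stat (v1) excerpt:
//
//   cache 4096000
//   rss 10240000
//   total_cache 8192000
//   total_rss 12288000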
type blkioParser struct {
bytesDesc, latencyDesc *prometheus.Desc
}
func newBlkioParser() *blkioParser {
return &blkioParser{
bytesDesc: prometheus.NewDesc(
"cgroup_blkio_bytes",
"Bytes read/written by blkio.",
[]string{"mode", "slice", "service"},
nil,
),
latencyDesc: prometheus.NewDesc(
"cgroup_blkio_latency_ns",
"Average blkio operation latency (in nanoseconds).",
[]string{"mode", "slice", "service"},
nil,
),
}
}
func (p *blkioParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.bytesDesc
ch <- p.latencyDesc
}
func (p *blkioParser) parse(path string) ([]prometheus.Metric, error) {
ops, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_serviced"))
if err != nil {
return nil, err
}
times, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_service_time"))
if err != nil {
return nil, err
}
totBytes, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_service_bytes"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
m := []prometheus.Metric{
prometheus.MustNewConstMetric(
p.bytesDesc,
prometheus.CounterValue,
float64(totBytes["Write"]),
"write", slice, name,
),
prometheus.MustNewConstMetric(
p.bytesDesc,
prometheus.CounterValue,
float64(totBytes["Read"]),
"read", slice, name,
),
}
	// blkio only exposes cumulative service time and operation
	// counts, so the best we can report is an average latency.
if ops["Write"] > 0 {
m = append(m, prometheus.MustNewConstMetric(
p.latencyDesc,
prometheus.GaugeValue,
float64(times["Write"])/float64(ops["Write"]),
"write", slice, name,
))
}
if ops["Read"] > 0 {
m = append(m, prometheus.MustNewConstMetric(
p.latencyDesc,
prometheus.GaugeValue,
float64(times["Read"])/float64(ops["Read"]),
"read", slice, name,
))
}
return m, nil
}
type subsystem interface {
parse(string) ([]prometheus.Metric, error)
describe(chan<- *prometheus.Desc)
}
var subsystems = []subsystem{
newCPUParser(),
newMemoryParser(),
newBlkioParser(),
}
func walkCGroups() ([]prometheus.Metric, error) {
var metrics []prometheus.Metric
err := filepath.Walk(cgroupsRootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || !info.IsDir() || !strings.HasSuffix(path, ".service") {
return nil
}
// Do not track systemd internal services.
if strings.HasPrefix(info.Name(), "systemd-") {
return nil
}
m, err := walkCGroup(path[len(cgroupsRootPath)+1:])
if err != nil {
return nil
}
metrics = append(metrics, m...)
return nil
})
return metrics, err
}
func walkCGroup(path string) ([]prometheus.Metric, error) {
debug("found service %s", path)
var metrics []prometheus.Metric
for _, s := range subsystems {
m, err := s.parse(path)
if err != nil {
debug("service %s, subsystem %v: error: %v", path, s, err)
continue
}
metrics = append(metrics, m...)
}
	return metrics, nil
}

// The new subsystem interface: parsers now emit metrics directly into
// the collection channel, labeled with the (path, slice, unit) triple.
type subsystem interface {
	describe(chan<- *prometheus.Desc)
	parse(string, string, string, chan<- prometheus.Metric)
}
// Keep a pre-rendered snapshot of all the metrics, so that scraping
// and updates can be independent of each other (but still serve a
// coherent view of all the metrics).
type collector struct {
mx sync.Mutex
metrics []prometheus.Metric
readyCh chan bool
subsystems []subsystem
}
func newCollector(subsystems []subsystem) *collector {
return &collector{
readyCh: make(chan bool),
subsystems: subsystems,
}
}
func (c *collector) WaitReady() {
<-c.readyCh
}
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
	for _, s := range c.subsystems {
s.describe(ch)
}
}
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.mx.Lock()
count := 0
for _, m := range c.metrics {
ch <- m
count++
}
debug("collected %d metrics", count)
c.mx.Unlock()
}
func (c *collector) update(metrics []prometheus.Metric) {
c.mx.Lock()
c.metrics = metrics
c.mx.Unlock()
}
func (c *collector) loop(ctx context.Context) {
	if m, err := walkCGroups(); err == nil {
		c.update(m)
	}
	close(c.readyCh)
	ticker := time.NewTicker(*updateInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if m, err := walkCGroups(); err == nil {
				c.update(m)
			}
		}
	}
}

// The new Collect reads everything on the spot, at scrape time; the
// pre-rendered snapshot (and the update loop above) is gone.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
	count := 0
	for _, cgroup := range getCGroups() {
		slice, unit := splitServiceName(cgroup)
		for _, s := range c.subsystems {
			s.parse(cgroup, slice, unit, ch)
		}
		count++
	}
	debug("collected metrics from %d cgroups", count)
}
func main() {
log.SetFlags(0)
flag.Parse()
	// Pick the right set of parsers, and the root path for the
	// cgroup scan, depending on whether we detect cgroups v1 or v2.
var subsystems []subsystem
if hasCGroupsV2() {
log.Printf("cgroups v2 detected")
subsystems = []subsystem{
&memoryV2Parser{},
&cpuV2Parser{},
&blkioV2Parser{},
}
cgroupsRootPath = cgroupsV2Root
} else {
log.Printf("cgroups v1 detected")
subsystems = []subsystem{
&memoryParser{},
&cpuParser{},
&blkioParser{},
}
cgroupsRootPath = cgroupsV1Root
}
c := newCollector(subsystems)
reg := prometheus.NewRegistry()
reg.MustRegister(c)
	// Create a context that gets canceled when we receive a
	// termination signal. This will stop the background updater.
ctx, cancel := context.WithCancel(context.Background())
	// Run a goroutine that periodically updates the list of
	// cgroups, independently of the Prometheus scrape frequency.
updateCGroupsList(ctx)
// Create a very simple HTTP server that only exposes the
// Prometheus metrics handler.
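The signal handling and server setup are elided from this excerpt; a plausible sketch, assuming the stock promhttp handler, the addr flag defined above, and the cancel func from the context creation:

	// Plausible sketch of the elided wiring (assumed, not verbatim from
	// the commit): cancel the context on SIGINT/SIGTERM, and serve only
	// the Prometheus handler for our private registry.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		cancel()
	}()
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	server := &http.Server{Addr: *addr}
	go func() {
		<-ctx.Done()
		server.Shutdown(context.Background())
	}()
	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Fatal(err)
	}
}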
module git.autistici.org/ai3/tools/cgroups-exporter
go 1.15
require (
github.com/golang/protobuf v1.3.2-0.20190409050943-e91709a02e0e // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/moby/sys/mountinfo v0.4.1
github.com/prometheus/client_golang v0.9.3-0.20190412003733-5a3ec6a883d3
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect
github.com/prometheus/common v0.3.0 // indirect
github.com/prometheus/procfs v0.0.0-20190403104016-ea9eea638872 // indirect
github.com/tklauser/go-sysconf v0.0.0-20190604080227-d90dfe8b2f4b
github.com/tklauser/numcpus v0.0.0-20190604080617-6b5085e2650f // indirect
)