Commit cb0f5979 authored by ale

Merge branch 'v2' into 'master'

Support cgroups v2

See merge request !3
parents 248b02da 8a36fb12
Pipeline #20615 passed with stages in 43 seconds
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
)
var (
cgroupsUpdateInterval = flag.Duration("cgroups-update-interval", 60*time.Second, "update interval")
cgroupsFilterRx = regexpFlag{Regexp: regexp.MustCompile("^system.slice/")}
cgroupsRootPath string
)
func init() {
flag.Var(&cgroupsFilterRx, "cgroups-filter", "filter for cgroup paths")
}
type regexpFlag struct {
*regexp.Regexp
}
func (f *regexpFlag) Set(value string) error {
rx, err := regexp.Compile(value)
if err != nil {
return err
}
f.Regexp = rx
return nil
}
func (f *regexpFlag) String() string {
if f.Regexp == nil {
return "<nil>"
}
return fmt.Sprintf("\"%s\"", f.Regexp.String())
}
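// Hypothetical usage example (the binary name is an assumption, not
// taken from this repository): the default filter only matches cgroups
// under system.slice/, and it can be replaced on the command line with
// any regexp accepted by Go's regexp package, e.g.
//
//	cgroups-exporter -cgroups-filter '^(system|user)\.slice/'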
func findCGroups() ([]string, error) {
var paths []string
err := filepath.Walk(cgroupsRootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || !info.IsDir() || !strings.HasSuffix(path, ".service") {
return nil
}
// Do not track systemd internal services.
if strings.HasPrefix(info.Name(), "systemd-") {
return nil
}
relPath := path[len(cgroupsRootPath)+1:]
if !cgroupsFilterRx.MatchString(relPath) {
return nil
}
paths = append(paths, relPath)
return nil
})
debug("found %d cgroups below %s", len(paths), cgroupsRootPath)
return paths, err
}
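// Illustrative output (example values, not from a real host): with the
// default filter, findCGroups returns paths relative to
// cgroupsRootPath such as "system.slice/cron.service" or
// "system.slice/ssh.service".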
var (
// The lock is mostly there to ensure memory visibility across
// goroutines: we only ever replace the slice header, never modify
// the backing array.
cgroupsMx sync.Mutex
curCGroups []string
)
func getCGroups() []string {
cgroupsMx.Lock()
defer cgroupsMx.Unlock()
return curCGroups
}
func updateCGroupsList(ctx context.Context) {
updateFn := func() {
cgroups, err := findCGroups()
if err != nil {
log.Printf("error scanning /sys/fs/cgroup: %v", err)
return
}
cgroupsMx.Lock()
curCGroups = cgroups
cgroupsMx.Unlock()
}
updateFn()
go func() {
ticker := time.NewTicker(*cgroupsUpdateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
updateFn()
}
}
}()
}
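// Sketch of the expected wiring (an assumption, since main() is not
// part of this file): call updateCGroupsList once at startup, then
// read the periodically refreshed snapshot through getCGroups:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	updateCGroupsList(ctx)
//	for _, cg := range getCGroups() {
//		log.Printf("tracking cgroup %s", cg)
//	}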
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var cpuV1Desc = prometheus.NewDesc(
"cgroup_cpu_usage",
"Cgroup CPU usage.",
[]string{"mode", "slice", "service"},
nil,
)
type cpuParser struct{}
func (p *cpuParser) describe(ch chan<- *prometheus.Desc) {
ch <- cpuV1Desc
}
func (p *cpuParser) parse(path, slice, unit string, ch chan<- prometheus.Metric) {
usage, err := parseMapFile(cgroupV1StatPath(path, "cpu,cpuacct", "cpuacct.stat"))
if err != nil {
debug("error parsing cpuacct.stat: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["user"])/userHZ,
"user", slice, unit,
)
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["system"])/userHZ,
"system", slice, unit,
)
}
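// For reference, cpuacct.stat under cgroup v1 holds one "name value"
// pair per line, with values in USER_HZ ticks, for example:
//
//	user 7494
//	system 1204
//
// parseMapFile turns this into {"user": 7494, "system": 1204}, and
// dividing by userHZ converts ticks to seconds of CPU time.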
package main
import (
"path/filepath"
"github.com/prometheus/client_golang/prometheus"
)
var (
usecs float64 = 1000000
cpuV2PressureStalledDesc = prometheus.NewDesc(
"cgroup_cpu_pressure_stalled_seconds_total",
"PSI stalled CPU seconds.",
[]string{"slice", "service"},
nil,
)
cpuV2PressureWaitingDesc = prometheus.NewDesc(
"cgroup_cpu_pressure_waiting_seconds_total",
"PSI waiting CPU seconds.",
[]string{"slice", "service"},
nil,
)
)
type cpuV2Parser struct{}
func (p *cpuV2Parser) describe(ch chan<- *prometheus.Desc) {
ch <- cpuV1Desc
ch <- cpuV2PressureStalledDesc
ch <- cpuV2PressureWaitingDesc
}
func (p *cpuV2Parser) parse(path, slice, unit string, ch chan<- prometheus.Metric) {
usage, err := parseMapFile(filepath.Join(cgroupsRootPath, path, "cpu.stat"))
if err != nil {
debug("error parsing cpu.stat: %v", err)
return
}
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["user_usec"])/usecs,
"user", slice, unit,
)
ch <- prometheus.MustNewConstMetric(
cpuV1Desc,
prometheus.GaugeValue,
float64(usage["system_usec"])/usecs,
"system", slice, unit,
)
waiting, stalled, err := parsePressureFile(filepath.Join(cgroupsRootPath, path, "cpu.pressure"))
if err == nil {
ch <- prometheus.MustNewConstMetric(
cpuV2PressureWaitingDesc,
prometheus.CounterValue,
float64(waiting),
slice, unit,
)
ch <- prometheus.MustNewConstMetric(
cpuV2PressureStalledDesc,
prometheus.CounterValue,
float64(stalled),
slice, unit,
)
}
}
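// For reference, cpu.stat under cgroup v2 reports microsecond
// counters, for example:
//
//	usage_usec 103562
//	user_usec 67342
//	system_usec 36220
//
// cpu.pressure contains PSI lines such as
//
//	some avg10=0.00 avg60=0.00 avg300=0.00 total=12345
//	full avg10=0.00 avg60=0.00 avg300=0.00 total=678
//
// (the "full" line only appears on newer kernels). parsePressureFile,
// defined elsewhere in this package, is assumed to return the "some"
// and "full" totals as the waiting and stalled values respectively.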
package main
import (
"bufio"
"bytes"
"context"
"flag"
"io"
@@ -11,43 +9,26 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tklauser/go-sysconf"
)
var (
addr = flag.String("addr", ":3909", "address to listen on")
updateInterval = flag.Duration("interval", 10*time.Second, "update interval")
doDebug = flag.Bool("debug", false, "log debug messages")
userHZ float64
cgroupsRootPath string
)
func setCgroupsRootPath() {
// Handle the drop of the 'systemd' hierarchy with Debian Bullseye.
cgroupsRootPath = "/sys/fs/cgroup/systemd"
if _, err := os.Stat(cgroupsRootPath); os.IsNotExist(err) {
cgroupsRootPath = "/sys/fs/cgroup"
}
}
func init() {
userHZ = 100
if clktck, err := sysconf.Sysconf(sysconf.SC_CLK_TCK); err == nil {
userHZ = float64(clktck)
}
setCgroupsRootPath()
}
const (
cgroupsV1Root = "/sys/fs/cgroup/systemd"
cgroupsV2Root = "/sys/fs/cgroup"
)
// hasCGroupsV2 reports whether the host runs a unified (v2) cgroup
// hierarchy. On hybrid systems the v2 tree is mounted at
// /sys/fs/cgroup/unified next to the v1 controllers; when that path
// is absent we assume a pure v2 setup rooted at /sys/fs/cgroup.
func hasCGroupsV2() bool {
_, err := os.Stat("/sys/fs/cgroup/unified")
return os.IsNotExist(err)
}
func debug(s string, args ...interface{}) {
@@ -56,346 +37,68 @@ func debug(s string, args ...interface{}) {
}
}
func splitServiceName(path string) (string, string) {
slice, name := filepath.Split(path)
slice = strings.Trim(slice, "/")
return slice, name
}
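// Example: splitServiceName("system.slice/cron.service") returns
// ("system.slice", "cron.service").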
func parseMapFile(path string) (map[string]int64, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
result := make(map[string]int64)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
parts := bytes.Split(line, []byte(" "))
if len(parts) != 2 {
continue
}
value, err := strconv.ParseInt(string(parts[1]), 10, 64)
if err != nil {
continue
}
result[string(parts[0])] = value
}
return result, scanner.Err()
}
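// Example: given a file containing
//
//	user 7494
//	system 1204
//
// parseMapFile returns map[string]int64{"user": 7494, "system": 1204}.
// Lines that do not have exactly two fields, or whose second field is
// not an integer, are silently skipped.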
func parseBlkioMapFile(path string) (map[string]int64, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
// Aggregate counts by operation type (sum by device).
result := make(map[string]int64)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
parts := bytes.Split(line, []byte(" "))
if len(parts) != 3 {
continue
}
value, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
continue
}
result[string(parts[1])] += value
}
return result, scanner.Err()
}
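// Example: blkio files use "major:minor Operation value" lines, e.g.
//
//	8:0 Read 1432
//	8:0 Write 6771
//	8:16 Read 220
//
// parseBlkioMapFile sums values per operation across devices, so the
// input above yields {"Read": 1652, "Write": 6771}.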
// func parseSingleValueFile(path string) (int64, error) {
// data, err := ioutil.ReadFile(path)
// if err != nil {
// return 0, err
// }
// return strconv.ParseInt(string(data), 10, 64)
// }
func cgroupStatPath(cgroupPath, collector, path string) string {
return filepath.Join("/sys/fs/cgroup", collector, cgroupPath, path)
}
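// Example: cgroupStatPath("system.slice/cron.service", "memory",
// "memory.stat") returns
// "/sys/fs/cgroup/memory/system.slice/cron.service/memory.stat".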
type cpuParser struct {
desc *prometheus.Desc
}
func newCPUParser() *cpuParser {
return &cpuParser{
desc: prometheus.NewDesc(
"cgroup_cpu_usage",
"Cgroup CPU usage.",
[]string{"mode", "slice", "service"},
nil,
),
}
}
func (p *cpuParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.desc
}
func (p *cpuParser) parse(path string) ([]prometheus.Metric, error) {
usage, err := parseMapFile(cgroupStatPath(path, "cpu,cpuacct", "cpuacct.stat"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
return []prometheus.Metric{
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(usage["user"])/userHZ,
"user", slice, name,
),
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(usage["system"])/userHZ,
"system", slice, name,
),
}, nil
}
type memoryParser struct {
desc *prometheus.Desc
}
func newMemoryParser() *memoryParser {
return &memoryParser{
desc: prometheus.NewDesc(
"cgroup_memory_usage",
"Cgroup memory usage (RSS, in bytes).",
[]string{"slice", "service"},
nil,
),
}
}
func (p *memoryParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.desc
}
func (p *memoryParser) parse(path string) ([]prometheus.Metric, error) {
mstat, err := parseMapFile(cgroupStatPath(path, "memory", "memory.stat"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
return []prometheus.Metric{
prometheus.MustNewConstMetric(
p.desc,
prometheus.GaugeValue,
float64(mstat["total_rss"]),
slice, name,
),
}, nil
}
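// For reference, memory.stat under cgroup v1 contains per-cgroup and
// hierarchical ("total_" prefixed) counters in bytes, for example:
//
//	rss 1048576
//	cache 4194304
//	total_rss 2097152
//
// The exporter reports total_rss, i.e. RSS including descendant
// cgroups.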
type blkioParser struct {
bytesDesc, latencyDesc *prometheus.Desc
}
func newBlkioParser() *blkioParser {
return &blkioParser{
bytesDesc: prometheus.NewDesc(
"cgroup_blkio_bytes",
"Bytes read/written by blkio.",
[]string{"mode", "slice", "service"},
nil,
),
latencyDesc: prometheus.NewDesc(
"cgroup_blkio_latency_ns",
"Average blkio operation latency (in nanoseconds).",
[]string{"mode", "slice", "service"},
nil,
),
}
}
func (p *blkioParser) describe(ch chan<- *prometheus.Desc) {
ch <- p.bytesDesc
ch <- p.latencyDesc
}
func (p *blkioParser) parse(path string) ([]prometheus.Metric, error) {
ops, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_serviced"))
if err != nil {
return nil, err
}
times, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_service_time"))
if err != nil {
return nil, err
}
totBytes, err := parseBlkioMapFile(cgroupStatPath(path, "blkio", "blkio.io_service_bytes"))
if err != nil {
return nil, err
}
slice, name := splitServiceName(path)
m := []prometheus.Metric{
prometheus.MustNewConstMetric(
p.bytesDesc,
prometheus.CounterValue,
float64(totBytes["Write"]),
"write", slice, name,
),
prometheus.MustNewConstMetric(
p.bytesDesc,
prometheus.CounterValue,
float64(totBytes["Read"]),
"read", slice, name,
),
}
// This is unfortunately an average.
if ops["Write"] > 0 {
m = append(m, prometheus.MustNewConstMetric(
p.latencyDesc,
prometheus.GaugeValue,
float64(times["Write"])/float64(ops["Write"]),
"write", slice, name,
))
}
if ops["Read"] > 0 {
m = append(m, prometheus.MustNewConstMetric(
p.latencyDesc,
prometheus.GaugeValue,
float64(times["Read"])/float64(ops["Read"]),
"read", slice, name,
))
}
return m, nil
}
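// Worked example for the latency average: if blkio.io_serviced reports
// 10 Write operations and blkio.io_service_time reports 5000000 ns
// spent on writes, the exported write latency gauge is
// 5000000 / 10 = 500000 ns per operation. The figures are illustrative
// only.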
type subsystem interface {
parse(string) ([]prometheus.Metric, error)
describe(chan<- *prometheus.Desc)
}
var subsystems = []subsystem{
newCPUParser(),
newMemoryParser(),
newBlkioParser(),
}
func walkCGroups() ([]prometheus.Metric, error) {
var metrics []prometheus.Metric
err := filepath.Walk(cgroupsRootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || !info.IsDir() || !strings.HasSuffix(path, ".service") {
return nil
}
// Do not track systemd internal services.
if strings.HasPrefix(info.Name(), "systemd-") {
return nil
}
m, err := walkCGroup(path[len(cgroupsRootPath)+1:])
if err != nil {
return nil
}
metrics = append(metrics, m...)
return nil
})
return metrics, err
}
func walkCGroup(path string) ([]prometheus.Metric, error) {
debug("found service %s", path)
var metrics []prometheus.Metric
for _, s := range subsystems {
m, err := s.parse(path)
if err != nil {
debug("service %s, subsystem %v: error: %v", path, s, err)
continue
}
metrics = append(metrics, m...)
}
return metrics, nil
}
// Parsers now write metrics directly to the collector's channel.
type subsystem interface {
describe(chan<- *prometheus.Desc)
parse(string, string, string, chan<- prometheus.Metric)
}
// Keep a pre-rendered snapshot of all the metrics, so that scraping
// and updates can be independent of each other (but still serve a
// coherent view of all the metrics).
type collector struct {
mx sync.Mutex
metrics []prometheus.Metric
readyCh chan bool
subsystems []subsystem
}
func newCollector(subsystems []subsystem) *collector {
return &collector{
readyCh: make(chan bool),
subsystems: subsystems,
}
}
func (c *collector) WaitReady() {
<-c.readyCh
}
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
for _, s := range c.subsystems {
s.describe(ch)
}
}
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.mx.Lock()
count := 0
for _, m := range c.metrics {
ch <- m
count++
}
debug("collected %d metrics", count)
c.mx.Unlock()
}
func (c *collector) update(metrics []prometheus.Metric) {
c.mx.Lock()
c.metrics = metrics
c.mx.Unlock()
}
func (c *collector) loop(ctx context.Context) {
if m, err := walkCGroups(); err == nil {
c.update(m)
}
close(c.readyCh)
ticker := time.NewTicker(*updateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if m, err := walkCGroups(); err == nil {
c.update(m)
}
}
}
}
// Collect renders metrics on the fly from the current cgroup list,
// one parser at a time.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
count := 0
for _, cgroup := range getCGroups() {
slice, unit := splitServiceName(cgroup)
for _, s := range c.subsystems {
s.parse(cgroup, slice, unit, ch)
}
count++
}
debug("collected metrics from %d cgroups", count)
}
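// Sketch of how the pieces might fit together in main() (an
// assumption; the actual main function is outside this excerpt): pick
// the parser set based on the cgroup hierarchy, register the
// collector, and serve the metrics endpoint.
//
//	subsystems := []subsystem{&cpuParser{}}
//	if hasCGroupsV2() {
//		subsystems = []subsystem{&cpuV2Parser{}}
//	}
//	c := newCollector(subsystems)
//	prometheus.MustRegister(c)
//	http.Handle("/metrics", promhttp.Handler())
//	log.Fatal(http.ListenAndServe(*addr, nil))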