Commit e6f5784c authored by ale's avatar ale

Add a HTTP server for prometheus metrics

Also add instrumentation for the load balancer module.
parent 76b2cd49
Pipeline #2705 failed with stages
in 2 minutes and 41 seconds
......@@ -29,6 +29,7 @@ var (
httpPort = flag.Int("http-port", 80, "HTTP port")
dnsPort = flag.Int("dns-port", 53, "DNS port")
gossipPort = flag.Int("gossip-port", 2323, "Gossip GRPC port")
metricsPort = flag.Int("monitoring-port", 2424, "HTTP port for monitoring")
bwlimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps), for load-balancing")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients, for load-balancing")
etcdEndpoints = flag.String("etcd", "http://localhost:2379", "Etcd endpoints (comma-separated list of URLs)")
......@@ -136,7 +137,7 @@ func main() {
}
// Start all the network services.
srv := node.NewServer(n, *domain, strings.Split(*nameservers, ","), *publicIPs, *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort)
srv := node.NewServer(n, *domain, strings.Split(*nameservers, ","), *publicIPs, *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort, *metricsPort)
// Wait until the Node and the Server terminate. A failure in
// either the network services or the Node itself should cause
......
package node
import "github.com/prometheus/client_golang/prometheus"
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
streamSentBytes = prometheus.NewCounterVec(
......@@ -24,6 +29,13 @@ var (
},
[]string{"stream"},
)
proxyConnectErrs = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "proxy_connect_errors",
Help: "Proxy connection errors (client-side).",
},
[]string{"stream", "upstream"},
)
)
func init() {
......@@ -31,5 +43,12 @@ func init() {
streamSentBytes,
streamRcvdBytes,
streamListeners,
proxyConnectErrs,
)
}
func newMetricsHandler() http.Handler {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
return mux
}
......@@ -47,7 +47,7 @@ type costAndUtil struct {
func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Build a view of the cost/utilization data.
l.lock.Lock()
l.lock.RLock()
var nodes []nodeDebugData
for i := 0; i < l.nodes.Len(); i++ {
n := l.nodes.Get(i)
......@@ -67,7 +67,7 @@ func (l *LoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for dim := range l.predictors {
dimensions = append(dimensions, dim)
}
l.lock.Unlock()
l.lock.RUnlock()
ctx := struct {
Nodes []nodeDebugData
......
package lbv2
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
)
var (
requestsDesc = prometheus.NewDesc(
"lbv2_requests",
"Number of requests sent to each upstream.",
[]string{"upstream", "dimension"}, nil,
)
reportedUtilDesc = prometheus.NewDesc(
"lbv2_reported_utilization",
"Utilization (reported).",
[]string{"upstream", "dimension"}, nil,
)
predictedUtilDesc = prometheus.NewDesc(
"lbv2_predicted_utilization",
"Utilization (predicted).",
[]string{"upstream", "dimension"}, nil,
)
costDesc = prometheus.NewDesc(
"lbv2_cost",
"Estimated query cost.",
[]string{"upstream", "dimension"}, nil,
)
)
// Generating lb metrics is a relatively expensive operation, so we
// only do it at scraping time by implementing our own custom
// prometheus.Collector.
type loadBalancerCollector struct {
*LoadBalancer
}
func (lc loadBalancerCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- requestsDesc
ch <- reportedUtilDesc
ch <- predictedUtilDesc
ch <- costDesc
}
type nodeMetrics struct {
name string
dimension int
requests int
util float64
pred float64
cost float64
}
func (lc loadBalancerCollector) Collect(ch chan<- prometheus.Metric) {
lc.LoadBalancer.lock.RLock()
data := make([]nodeMetrics, 0, lc.LoadBalancer.nodes.Len())
for i := 0; i < lc.LoadBalancer.nodes.Len(); i++ {
n := lc.LoadBalancer.nodes.Get(i)
for dim, pred := range lc.LoadBalancer.predictors {
util := n.Utilization(dim)
data = append(data, nodeMetrics{
name: n.Name(),
dimension: dim,
requests: util.Requests,
util: util.Utilization,
pred: pred.Utilization(n),
cost: float64(pred.cost[n.Name()]),
})
}
}
lc.LoadBalancer.lock.RUnlock()
for _, d := range data {
dimLabel := strconv.Itoa(d.dimension)
ch <- prometheus.MustNewConstMetric(
requestsDesc,
prometheus.CounterValue,
float64(d.requests),
d.name,
dimLabel,
)
ch <- prometheus.MustNewConstMetric(
reportedUtilDesc,
prometheus.GaugeValue,
d.util,
d.name,
dimLabel,
)
ch <- prometheus.MustNewConstMetric(
predictedUtilDesc,
prometheus.GaugeValue,
d.pred,
d.name,
dimLabel,
)
ch <- prometheus.MustNewConstMetric(
costDesc,
prometheus.GaugeValue,
d.cost,
d.name,
dimLabel,
)
}
}
......@@ -9,6 +9,7 @@ import (
"time"
"github.com/jmcvetta/randutil"
"github.com/prometheus/client_golang/prometheus"
)
// Node utilization along a specific dimension. Utilization is treated
......@@ -156,7 +157,7 @@ type NodeList interface {
// receives an equal share of incoming traffic.
//
type LoadBalancer struct {
lock sync.Mutex
lock sync.RWMutex
nodes NodeList
predictors map[int]*costPredictor
filters []NodeFilter
......@@ -168,9 +169,14 @@ type LoadBalancer struct {
// New returns a new LoadBalancer with no filters or policy set.
func New() *LoadBalancer {
return &LoadBalancer{
lb := &LoadBalancer{
predictors: make(map[int]*costPredictor),
}
lc := loadBalancerCollector{lb}
prometheus.MustRegister(lc)
return lb
}
// AddFilters appends a filter to the filter list.
......@@ -212,8 +218,8 @@ func (l *LoadBalancer) Update(nodes NodeList) {
// Choose a node according to the specified policies.
func (l *LoadBalancer) Choose(ctx RequestContext) Node {
l.lock.Lock()
defer l.lock.Unlock()
l.lock.RLock()
defer l.lock.RUnlock()
if l.nodes == nil || l.nodes.Len() == 0 {
return nil
}
......
......@@ -100,12 +100,14 @@ func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL,
if err != nil {
log.Printf("http: proxy dial error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
proxyConnectErrs.WithLabelValues(streamName, target.Host).Inc()
return
}
defer upstream.Close()
if err := outreq.Write(upstream); err != nil {
log.Printf("http: proxy request write error: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
proxyConnectErrs.WithLabelValues(streamName, target.Host).Inc()
return
}
......
package node
import (
"fmt"
"net"
"strconv"
"sync"
......@@ -71,12 +72,13 @@ func (s *Server) Wait() error {
// NewServer creates a new Server. Will use publicAddrs / peerAddr to
// build all the necessary addr/port combinations.
func NewServer(n *Node, domain string, nameservers []string, publicAddrs []net.IP, peerAddr net.IP, httpPort, dnsPort, gossipPort, icecastPort int) *Server {
func NewServer(n *Node, domain string, nameservers []string, publicAddrs []net.IP, peerAddr net.IP, httpPort, dnsPort, gossipPort, icecastPort, metricsPort int) *Server {
httpHandler := newHTTPHandler(n, icecastPort, domain)
dnsHandler := newDNSHandler(n, domain, nameservers)
servers := []genericServer{
newStatusServer(mkaddr(peerAddr, gossipPort), n.statusMgr),
newHTTPServer(fmt.Sprintf(":%d", metricsPort), newMetricsHandler()),
}
for _, ip := range publicAddrs {
servers = append(servers,
......
......@@ -82,7 +82,7 @@ func Duration(p *durpb.Duration) (time.Duration, error) {
return 0, fmt.Errorf("duration: %v is out of range for time.Duration", p)
}
if p.Nanos != 0 {
d += time.Duration(p.Nanos)
d += time.Duration(p.Nanos) * time.Nanosecond
if (d < 0) != (p.Nanos < 0) {
return 0, fmt.Errorf("duration: %v is out of range for time.Duration", p)
}
......
......@@ -111,11 +111,9 @@ func TimestampNow() *tspb.Timestamp {
// TimestampProto converts the time.Time to a google.protobuf.Timestamp proto.
// It returns an error if the resulting Timestamp is invalid.
func TimestampProto(t time.Time) (*tspb.Timestamp, error) {
seconds := t.Unix()
nanos := int32(t.Sub(time.Unix(seconds, 0)))
ts := &tspb.Timestamp{
Seconds: seconds,
Nanos: nanos,
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
if err := validateTimestamp(ts); err != nil {
return nil, err
......
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build ignore
// mkasm_darwin.go generates assembly trampolines to call libSystem routines from Go.
//This program must be run after mksyscall.go.
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
)
func main() {
in1, err := ioutil.ReadFile("syscall_darwin.go")
if err != nil {
log.Fatalf("can't open syscall_darwin.go: %s", err)
}
arch := os.Args[1]
in2, err := ioutil.ReadFile(fmt.Sprintf("syscall_darwin_%s.go", arch))
if err != nil {
log.Fatalf("can't open syscall_darwin_%s.go: %s", arch, err)
}
in3, err := ioutil.ReadFile(fmt.Sprintf("zsyscall_darwin_%s.go", arch))
if err != nil {
log.Fatalf("can't open zsyscall_darwin_%s.go: %s", arch, err)
}
in := string(in1) + string(in2) + string(in3)
trampolines := map[string]bool{}
var out bytes.Buffer
fmt.Fprintf(&out, "// go run mkasm_darwin.go %s\n", strings.Join(os.Args[1:], " "))
fmt.Fprintf(&out, "// Code generated by the command above; DO NOT EDIT.\n")
fmt.Fprintf(&out, "\n")
fmt.Fprintf(&out, "// +build go1.12\n")
fmt.Fprintf(&out, "\n")
fmt.Fprintf(&out, "#include \"textflag.h\"\n")
for _, line := range strings.Split(in, "\n") {
if !strings.HasPrefix(line, "func ") || !strings.HasSuffix(line, "_trampoline()") {
continue
}
fn := line[5 : len(line)-13]
if !trampolines[fn] {
trampolines[fn] = true
fmt.Fprintf(&out, "TEXT ·%s_trampoline(SB),NOSPLIT,$0-0\n", fn)
fmt.Fprintf(&out, "\tJMP\t%s(SB)\n", fn)
}
}
err = ioutil.WriteFile(fmt.Sprintf("zsyscall_darwin_%s.s", arch), out.Bytes(), 0644)
if err != nil {
log.Fatalf("can't write zsyscall_darwin_%s.s: %s", arch, err)
}
}
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build ignore
// mkpost processes the output of cgo -godefs to
// modify the generated types. It is used to clean up
// the sys API in an architecture specific manner.
//
// mkpost is run after cgo -godefs; see README.md.
package main
import (
"bytes"
"fmt"
"go/format"
"io/ioutil"
"log"
"os"
"regexp"
)
func main() {
// Get the OS and architecture (using GOARCH_TARGET if it exists)
goos := os.Getenv("GOOS")
goarch := os.Getenv("GOARCH_TARGET")
if goarch == "" {
goarch = os.Getenv("GOARCH")
}
// Check that we are using the Docker-based build system if we should be.
if goos == "linux" {
if os.Getenv("GOLANG_SYS_BUILD") != "docker" {
os.Stderr.WriteString("In the Docker-based build system, mkpost should not be called directly.\n")
os.Stderr.WriteString("See README.md\n")
os.Exit(1)
}
}
b, err := ioutil.ReadAll(os.Stdin)
if err != nil {
log.Fatal(err)
}
// Intentionally export __val fields in Fsid and Sigset_t
valRegex := regexp.MustCompile(`type (Fsid|Sigset_t) struct {(\s+)X__val(\s+\S+\s+)}`)
b = valRegex.ReplaceAll(b, []byte("type $1 struct {${2}Val$3}"))
// Intentionally export __fds_bits field in FdSet
fdSetRegex := regexp.MustCompile(`type (FdSet) struct {(\s+)X__fds_bits(\s+\S+\s+)}`)
b = fdSetRegex.ReplaceAll(b, []byte("type $1 struct {${2}Bits$3}"))
// If we have empty Ptrace structs, we should delete them. Only s390x emits
// nonempty Ptrace structs.
ptraceRexexp := regexp.MustCompile(`type Ptrace((Psw|Fpregs|Per) struct {\s*})`)
b = ptraceRexexp.ReplaceAll(b, nil)
// Replace the control_regs union with a blank identifier for now.
controlRegsRegex := regexp.MustCompile(`(Control_regs)\s+\[0\]uint64`)
b = controlRegsRegex.ReplaceAll(b, []byte("_ [0]uint64"))
// Remove fields that are added by glibc
// Note that this is unstable as the identifers are private.
removeFieldsRegex := regexp.MustCompile(`X__glibc\S*`)
b = removeFieldsRegex.ReplaceAll(b, []byte("_"))
// Convert [65]int8 to [65]byte in Utsname members to simplify
// conversion to string; see golang.org/issue/20753
convertUtsnameRegex := regexp.MustCompile(`((Sys|Node|Domain)name|Release|Version|Machine)(\s+)\[(\d+)\]u?int8`)
b = convertUtsnameRegex.ReplaceAll(b, []byte("$1$3[$4]byte"))
// Convert [1024]int8 to [1024]byte in Ptmget members
convertPtmget := regexp.MustCompile(`([SC]n)(\s+)\[(\d+)\]u?int8`)
b = convertPtmget.ReplaceAll(b, []byte("$1[$3]byte"))
// Remove spare fields (e.g. in Statx_t)
spareFieldsRegex := regexp.MustCompile(`X__spare\S*`)
b = spareFieldsRegex.ReplaceAll(b, []byte("_"))
// Remove cgo padding fields
removePaddingFieldsRegex := regexp.MustCompile(`Pad_cgo_\d+`)
b = removePaddingFieldsRegex.ReplaceAll(b, []byte("_"))
// Remove padding, hidden, or unused fields
removeFieldsRegex = regexp.MustCompile(`\b(X_\S+|Padding)`)
b = removeFieldsRegex.ReplaceAll(b, []byte("_"))
// Remove the first line of warning from cgo
b = b[bytes.IndexByte(b, '\n')+1:]
// Modify the command in the header to include:
// mkpost, our own warning, and a build tag.
replacement := fmt.Sprintf(`$1 | go run mkpost.go
// Code generated by the command above; see README.md. DO NOT EDIT.
// +build %s,%s`, goarch, goos)
cgoCommandRegex := regexp.MustCompile(`(cgo -godefs .*)`)
b = cgoCommandRegex.ReplaceAll(b, []byte(replacement))
// gofmt
b, err = format.Source(b)
if err != nil {
log.Fatal(err)
}
os.Stdout.Write(b)
}
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build ignore
/*
This program reads a file containing function prototypes
(like syscall_darwin.go) and generates system call bodies.
The prototypes are marked by lines beginning with "//sys"
and read like func declarations if //sys is replaced by func, but:
* The parameter lists must give a name for each argument.
This includes return parameters.
* The parameter lists must give a type for each argument:
the (x, y, z int) shorthand is not allowed.
* If the return parameter is an error number, it must be named errno.
A line beginning with //sysnb is like //sys, except that the
goroutine will not be suspended during the execution of the system
call. This must only be used for system calls which can never
block, as otherwise the system call could cause all goroutines to
hang.
*/
package main
import (
"bufio"
"flag"
"fmt"
"os"
"regexp"
"strings"
)
var (
b32 = flag.Bool("b32", false, "32bit big-endian")
l32 = flag.Bool("l32", false, "32bit little-endian")
plan9 = flag.Bool("plan9", false, "plan9")
openbsd = flag.Bool("openbsd", false, "openbsd")
netbsd = flag.Bool("netbsd", false, "netbsd")
dragonfly = flag.Bool("dragonfly", false, "dragonfly")
arm = flag.Bool("arm", false, "arm") // 64-bit value should use (even, odd)-pair
tags = flag.String("tags", "", "build tags")
filename = flag.String("output", "", "output file name (standard output if omitted)")
)
// cmdLine returns this programs's commandline arguments
func cmdLine() string {
return "go run mksyscall.go " + strings.Join(os.Args[1:], " ")
}
// buildTags returns build tags
func buildTags() string {
return *tags
}
// Param is function parameter
type Param struct {
Name string
Type string
}
// usage prints the program usage
func usage() {
fmt.Fprintf(os.Stderr, "usage: go run mksyscall.go [-b32 | -l32] [-tags x,y] [file ...]\n")
os.Exit(1)
}
// parseParamList parses parameter list and returns a slice of parameters
func parseParamList(list string) []string {
list = strings.TrimSpace(list)
if list == "" {
return []string{}
}
return regexp.MustCompile(`\s*,\s*`).Split(list, -1)
}
// parseParam splits a parameter into name and type
func parseParam(p string) Param {
ps := regexp.MustCompile(`^(\S*) (\S*)$`).FindStringSubmatch(p)
if ps == nil {
fmt.Fprintf(os.Stderr, "malformed parameter: %s\n", p)
os.Exit(1)
}
return Param{ps[1], ps[2]}
}
func main() {
// Get the OS and architecture (using GOARCH_TARGET if it exists)
goos := os.Getenv("GOOS")
if goos == "" {
fmt.Fprintln(os.Stderr, "GOOS not defined in environment")
os.Exit(1)
}
goarch := os.Getenv("GOARCH_TARGET")
if goarch == "" {
goarch = os.Getenv("GOARCH")
}
// Check that we are using the Docker-based build system if we should
if goos == "linux" {
if os.Getenv("GOLANG_SYS_BUILD") != "docker" {
fmt.Fprintf(os.Stderr, "In the Docker-based build system, mksyscall should not be called directly.\n")
fmt.Fprintf(os.Stderr, "See README.md\n")
os.Exit(1)
}
}
flag.Usage = usage
flag.Parse()
if len(flag.Args()) <= 0 {
fmt.Fprintf(os.Stderr, "no files to parse provided\n")
usage()
}
endianness := ""
if *b32 {
endianness = "big-endian"
} else if *l32 {
endianness = "little-endian"
}
libc := false
if goos == "darwin" && strings.Contains(buildTags(), ",go1.12") {
libc = true
}
trampolines := map[string]bool{}
text := ""
for _, path := range flag.Args() {
file, err := os.Open(path)
if err != nil {
fmt.Fprintf(os.Stderr, err.Error())
os.Exit(1)
}
s := bufio.NewScanner(file)
for s.Scan() {
t := s.Text()
t = strings.TrimSpace(t)
t = regexp.MustCompile(`\s+`).ReplaceAllString(t, ` `)
nonblock := regexp.MustCompile(`^\/\/sysnb `).FindStringSubmatch(t)
if regexp.MustCompile(`^\/\/sys `).FindStringSubmatch(t) == nil && nonblock == nil {
continue
}
// Line must be of the form
// func Open(path string, mode int, perm int) (fd int, errno error)
// Split into name, in params, out params.
f := regexp.MustCompile(`^\/\/sys(nb)? (\w+)\(([^()]*)\)\s*(?:\(([^()]+)\))?\s*(?:=\s*((?i)SYS_[A-Z0-9_]+))?$`).FindStringSubmatch(t)
if f == nil {
fmt.Fprintf(os.Stderr, "%s:%s\nmalformed //sys declaration\n", path, t)
os.Exit(1)
}
funct, inps, outps, sysname := f[2], f[3], f[4], f[5]
// Split argument lists on comma.
in := parseParamList(inps)
out := parseParamList(outps)
// Try in vain to keep people from editing this file.
// The theory is that they jump into the middle of the file
// without reading the header.
text += "// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT\n\n"
// Go function header.
outDecl := ""