Commit 14cf3f68 authored by ale's avatar ale

Add instrumentation (via Prometheus) of backup jobs

parent d779743e
......@@ -5,9 +5,11 @@ import (
"fmt"
"log"
"net/http"
"net/http/pprof"
"git.autistici.org/ale/tabacco/jobs"
"git.autistici.org/ale/tabacco/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Agent holds a Manager and a Scheduler together, and runs periodic
......@@ -49,9 +51,14 @@ func (a *Agent) Close() {
a.sched.Stop()
}
func (a *Agent) startHTTPServer(addr string) error {
// StartHTTPServer starts a HTTP server that exports Prometheus
// metrics and debug information.
func (a *Agent) StartHTTPServer(addr string) error {
http.HandleFunc("/debug/jobs", a.handleStateManagerDebug)
http.HandleFunc("/debug/sched", a.handleSchedulerDebug)
http.Handle("/debug/pprof/", pprof.Handler(""))
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/", a.handleDebugPage)
go http.ListenAndServe(addr, nil)
return nil
}
......
......@@ -21,6 +21,16 @@ body { background: white; font-family: "Helvetica", sans-serif; }
<body>
`
indexDebugTpl = `{{template "header"}}
<h1>tabacco</h1>
<p><a href="/debug/jobs">Job status</a></p>
<p><a href="/debug/sched">Schedule</a></p>
{{template "footer"}}
`
stateManagerDebugTpl = `{{template "header"}}
{{define "job_status"}}
<table class="table">
......@@ -106,12 +116,12 @@ func init() {
debugTpl = template.New("")
template.Must(debugTpl.New("header").Parse(headerTpl))
template.Must(debugTpl.New("footer").Parse(footerTpl))
template.Must(debugTpl.New("index").Parse(indexDebugTpl))
template.Must(debugTpl.New("state_manager_debug_page").Parse(stateManagerDebugTpl))
template.Must(debugTpl.New("scheduler_debug_page").Parse(schedulerDebugTpl))
}
// ServeHTTP implements the job status debug handler, by making the
// stateManager object match the http.Handler interface.
// Job status debug handler.
func (a *Agent) handleStateManagerDebug(w http.ResponseWriter, r *http.Request) {
pending, running, done := a.mgr.GetStatus()
......@@ -126,11 +136,21 @@ func (a *Agent) handleStateManagerDebug(w http.ResponseWriter, r *http.Request)
})
}
// ServeHTTP implements the job status debug handler, by making the
// stateManager object match the http.Handler interface.
// Scheduler debug handler.
func (a *Agent) handleSchedulerDebug(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
_ = debugTpl.Lookup("scheduler_debug_page").Execute(w, map[string]interface{}{
"Schedule": a.sched.GetStatus(),
})
}
// Agent debug handler page, with links to the other two.
func (a *Agent) handleDebugPage(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/html")
_ = debugTpl.Lookup("index").Execute(w, nil) // nolint
}
package tabacco
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"git.autistici.org/ale/tabacco/jobs"
)
var (
backupSuccesses = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "backup_ok",
Help: "Did the backup succeed.",
},
[]string{"dataset"},
)
backupElapsed = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "backup_elapsed",
Help: "Time taken by the last backup.",
},
[]string{"dataset"},
)
backupLastRun = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "backup_last_run_timestamp",
Help: "Timestamp of the last backup (succeeded or not).",
},
[]string{"dataset"},
)
backupLastOk = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "backup_last_ok_timestamp",
Help: "Timestamp of the last successful backup.",
},
[]string{"dataset"},
)
)
func init() {
prometheus.MustRegister(backupSuccesses, backupElapsed, backupLastRun, backupLastOk)
}
// Wrap a job with prometheus success/lastRun instrumentation based on
// the exit status of the job.
func withInstrumentation(j jobs.Job, dataset string) jobs.Job {
return jobs.JobFunc(func(ctx context.Context) error {
start := time.Now()
err := j.RunContext(ctx)
l := prometheus.Labels{"dataset": dataset}
elapsed := time.Since(start)
backupElapsed.With(l).Set(elapsed.Seconds())
backupLastRun.With(l).Set(float64(start.Unix()))
var statusValue float64
if err == nil {
backupLastOk.With(l).Set(float64(start.Unix()))
statusValue = 1
}
backupSuccesses.With(l).Set(statusValue)
return err
})
}
......@@ -103,10 +103,18 @@ func (m *tabaccoManager) backupDatasetJob(h Handler, backup Backup, ds Dataset)
// leave-running policy, so no more than one backup per
// datasource can run at any given time. Finally, the job runs
// in the 'backup' queue for concurrency limiting.
//
// Oh, and here is where we add per-dataset instrumentation.
id := fmt.Sprintf("backup-dataset-%s", ds.Name)
return m.WithQueue(
m.WithStatus(
m.WithExclusiveLock(jobs.SyncGroup(out), id, false),
m.WithExclusiveLock(
withInstrumentation(
jobs.SyncGroup(out),
ds.Name,
),
id,
false),
id,
),
"backup",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment