Commit ebdd223f authored by ale's avatar ale

Log command output along with job IDs

parent 4b1c7f94
Pipeline #3496 passed with stages
in 2 minutes and 12 seconds
......@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
......@@ -23,14 +24,24 @@ type Job interface {
Wait() error
}
// Give a job its own random unique ID.
// Give a job its own random unique ID. The ID is stored in the
// Context and can be retrieved at runtime by calling the GetID()
// function.
type idJob struct {
Job
id string
}
type idJobKeyType int
var idJobKey idJobKeyType = 1
func (j *idJob) ID() string { return j.id }
func (j *idJob) RunContext(ctx context.Context) error {
return j.Job.RunContext(context.WithValue(ctx, idJobKey, j.id))
}
// WithID gives a job a random unique ID.
func WithID(j Job) Job {
return &idJob{
......@@ -39,6 +50,15 @@ func WithID(j Job) Job {
}
}
// GetID returns the job ID when called from RunContext.
func GetID(ctx context.Context) string {
id, ok := ctx.Value(idJobKey).(string)
if ok {
return id
}
return ""
}
// Adds a Cancel method to a job.
type cancelableJob struct {
Job
......@@ -268,10 +288,6 @@ type Status struct {
//
// It has no practical effect on the jobs themselves, it's just a way
// to provide the user with debugging and auditing information.
//
// Note that it must use its own random ID, and not job.ID, otherwise
// nesting WithStatus handlers on the same job will cause panics (due
// to duplicated transitions from state to state).
type StateManager struct {
mx sync.Mutex
pending map[string]*Status
......@@ -295,6 +311,10 @@ func (m *StateManager) setStatusPending(id, name string, j Job) {
m.mx.Lock()
defer m.mx.Unlock()
if _, ok := m.pending[id]; ok {
panic("duplicate job ID detected")
}
m.pending[id] = &Status{
ID: id,
Name: name,
......@@ -312,6 +332,8 @@ func (m *StateManager) setStatusRunning(id string) {
js.Status = JobStatusRunning
delete(m.pending, id)
m.running[id] = js
log.Printf("job %s (%s): starting", id, js.Name)
}
func (m *StateManager) setStatusDone(id string, err error) {
......@@ -328,6 +350,13 @@ func (m *StateManager) setStatusDone(id string, err error) {
for m.done.Len() > logsToKeep {
m.done.Remove(m.done.Back())
}
statusMsg := "ok"
if err != nil {
statusMsg = fmt.Sprintf("ERROR: %v", err)
}
elapsed := js.CompletedAt.Sub(js.StartedAt)
log.Printf("job %s (%s): completed in %s, %s", id, js.Name, elapsed, statusMsg)
}
// WithStatus tracks a job through its lifetime. The name is used for
......@@ -336,10 +365,16 @@ func (m *StateManager) WithStatus(j Job, name string) Job {
sj := &statusJob{
Job: j,
mgr: m,
id: util.RandomID(),
id: j.ID(),
}
if sj.id == "" {
panic("job with null ID")
}
m.setStatusPending(sj.id, name, sj)
return sj
// Give this job its own ID, in case WithStatus calls are
// nested.
return WithID(sj)
}
func jobStatusMapToSlice(m map[string]*Status) []Status {
......
package tabacco
import (
"bufio"
"context"
"fmt"
"io/ioutil"
"log"
"os"
......@@ -101,24 +103,58 @@ func (s *Shell) command(ctx context.Context, arg string) *exec.Cmd {
var env []string
env = append(env, s.env...)
c.Env = env
c.Stderr = os.Stderr
c.Dir = getWorkDir(ctx)
log.Printf("sh: %s", arg)
if jobID := jobs.GetID(ctx); jobID != "" {
log.Printf("job %s: sh: %s", jobID, arg)
} else {
log.Printf("sh: %s", arg)
}
return c
}
// Run a command.
// Run a command. Log its standard output and error.
func (s *Shell) Run(ctx context.Context, arg string) error {
c := s.command(ctx, arg)
c.Stdout = os.Stdout
return c.Run()
stdout, err := c.StdoutPipe()
if err != nil {
return err
}
stderr, err := c.StderrPipe()
if err != nil {
return err
}
if err := c.Start(); err != nil {
return err
}
var logPrefix string
if jobID := jobs.GetID(ctx); jobID != "" {
logPrefix = fmt.Sprintf("job %s: ", jobID)
}
go func() {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
log.Printf("%s%s", logPrefix, scanner.Text())
}
}()
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Printf("%sstderr: %s", logPrefix, scanner.Text())
}
}()
return c.Wait()
}
// Output runs a command and returns the standard output.
func (s *Shell) Output(ctx context.Context, arg string) ([]byte, error) {
return s.command(ctx, arg).Output()
c := s.command(ctx, arg)
c.Stderr = os.Stderr
return c.Output()
}
// Manage a work directory (temporary scratch with lots of space).
......@@ -146,6 +182,7 @@ func (w *workdirManager) withWorkDir(j jobs.Job) jobs.Job {
}
}
// Give a Job its own temporary directory.
type workdirJob struct {
jobs.Job
dir string
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment