Commit 6bcbc560 authored by ale's avatar ale

Limit concurrency on all jobs

parent 8bd8f483
Pipeline #3554 passed with stages
in 2 minutes and 15 seconds
......@@ -201,51 +201,42 @@ func (j *exclusiveJob) RunContext(ctx context.Context) error {
// parallel tasks. By default (or for unknown queues) there is no
// limit.
type QueueManager struct {
queues map[string]chan struct{}
sem chan struct{}
}
// QueueSpec describes the configuration of named queues.
type QueueSpec struct {
Workers map[string]int `yaml:"workers"`
Concurrency int `yaml:"concurrency"`
}
// NewQueueManager returns a new QueueManager with the provided
// configuration.
func NewQueueManager(spec *QueueSpec) *QueueManager {
q := map[string]chan struct{}{
"default": make(chan struct{}, 2),
n := spec.Concurrency
if n < 1 {
n = 1
}
for name, n := range spec.Workers {
q[name] = make(chan struct{}, n)
}
return &QueueManager{queues: q}
return &QueueManager{sem: make(chan struct{}, n)}
}
// WithQueue wraps a job with a concurrency limit controller.
func (m *QueueManager) WithQueue(j Job, queue string) Job {
ch, ok := m.queues[queue]
if !ok {
ch, ok = m.queues["default"]
if !ok {
return j
}
}
return &queuedJob{Job: j, ch: ch}
func (m *QueueManager) WithQueue(j Job) Job {
return &queuedJob{Job: j, sem: m.sem}
}
type queuedJob struct {
Job
ch chan struct{}
sem chan struct{}
}
func (j *queuedJob) RunContext(ctx context.Context) error {
select {
case j.ch <- struct{}{}:
case j.sem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
err := j.Job.RunContext(ctx)
<-j.ch
<-j.sem
return err
}
......
......@@ -134,7 +134,6 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext,
backup,
ds,
),
"dataset",
))
}
......@@ -155,20 +154,17 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext,
//
// Oh, and here is where we add per-dataset instrumentation.
id := fmt.Sprintf("backup-source-%s", src.Name)
return m.WithQueue(
m.WithStatus(
m.WithExclusiveLock(
m.withWorkDir(
withInstrumentation(
backupJob,
src.Name,
),
return m.WithStatus(
m.WithExclusiveLock(
m.withWorkDir(
withInstrumentation(
backupJob,
src.Name,
),
id,
false),
),
id,
),
"source",
false),
id,
), nil
}
......@@ -213,7 +209,6 @@ func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src
restoreJobs,
m.WithQueue(
h.RestoreJob(rctx, backup, ds, target),
"dataset",
),
)
}
......@@ -233,15 +228,12 @@ func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src
// datasource can run at any given time. Finally, the job runs
// in the 'restore' queue for concurrency limiting.
id := fmt.Sprintf("restore-source-%s", src.Name)
return m.WithQueue(
m.WithStatus(
m.WithExclusiveLock(
restoreJob,
id,
false),
return m.WithStatus(
m.WithExclusiveLock(
restoreJob,
id,
),
"restore",
false),
id,
), nil
}
......
......@@ -203,7 +203,7 @@ func TestManager_Backup(t *testing.T) {
},
}
queueSpec := &jobs.QueueSpec{
Workers: map[string]int{"backup": 2},
Concurrency: 2,
}
// Run the backup.
......
......@@ -68,7 +68,7 @@ func runResticTest(t *testing.T, tmpdir string, source *SourceSpec, restorePatte
},
}
queueSpec := &jobs.QueueSpec{
Workers: map[string]int{"backup": 2, "restore": 1},
Concurrency: 2,
}
sourceSpecs := []*SourceSpec{source}
......
---
queue_config:
workers:
backup: 2
concurrency: 2
repository:
name: main
type: restic
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment