Commit 8bd8f483 authored by ale's avatar ale

Limit job execution concurrency

parent d1f5de8d
Pipeline #3546 passed with stages
in 2 minutes and 31 seconds
......@@ -212,7 +212,9 @@ type QueueSpec struct {
// NewQueueManager returns a new QueueManager with the provided
// configuration.
func NewQueueManager(spec *QueueSpec) *QueueManager {
q := make(map[string]chan struct{})
q := map[string]chan struct{}{
"default": make(chan struct{}, 2),
}
for name, n := range spec.Workers {
q[name] = make(chan struct{}, n)
}
......
......@@ -125,14 +125,16 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext,
// The actual backup operation. Assemble all the backup jobs
// for the datasets in 'dsl', and add them to an AsyncGroup.
//
// TODO: get the timeout from the SourceSpec.
// Limit concurrency in the AsyncGroup.
var backupJobs []jobs.Job
for _, ds := range dsl {
backupJobs = append(backupJobs, m.withMetadata(
h.BackupJob(rctx, backup, ds),
backup,
ds,
backupJobs = append(backupJobs, m.WithQueue(
m.withMetadata(
h.BackupJob(rctx, backup, ds),
backup,
ds,
),
"dataset",
))
}
......@@ -166,7 +168,7 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext,
false),
id,
),
"backup",
"source",
), nil
}
......@@ -209,7 +211,10 @@ func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src
for _, ds := range dsl {
restoreJobs = append(
restoreJobs,
h.RestoreJob(rctx, backup, ds, target),
m.WithQueue(
h.RestoreJob(rctx, backup, ds, target),
"dataset",
),
)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment