diff --git a/jobs/job.go b/jobs/job.go index e18fe8fc0722461f8f504d375eeefaa202ea063e..646727fb10a4bcf7d4f5102b69bd12a26a185a41 100644 --- a/jobs/job.go +++ b/jobs/job.go @@ -212,7 +212,9 @@ type QueueSpec struct { // NewQueueManager returns a new QueueManager with the provided // configuration. func NewQueueManager(spec *QueueSpec) *QueueManager { - q := make(map[string]chan struct{}) + q := map[string]chan struct{}{ + "default": make(chan struct{}, 2), + } for name, n := range spec.Workers { q[name] = make(chan struct{}, n) } diff --git a/manager.go b/manager.go index 7d89dc24b38a733728d57d999e0331a1903f33de..d0db333066666a6674817cf4e9c966b27d8e1d05 100644 --- a/manager.go +++ b/manager.go @@ -125,14 +125,16 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext, // The actual backup operation. Assemble all the backup jobs // for the datasets in 'dsl', and add them to an AsyncGroup. - // - // TODO: get the timeout from the SourceSpec. + // Limit concurrency in the AsyncGroup. var backupJobs []jobs.Job for _, ds := range dsl { - backupJobs = append(backupJobs, m.withMetadata( - h.BackupJob(rctx, backup, ds), - backup, - ds, + backupJobs = append(backupJobs, m.WithQueue( + m.withMetadata( + h.BackupJob(rctx, backup, ds), + backup, + ds, + ), + "dataset", )) } @@ -166,7 +168,7 @@ func (m *tabaccoManager) makeBackupJob(ctx context.Context, rctx RuntimeContext, false), id, ), - "backup", + "source", ), nil } @@ -209,7 +211,10 @@ func (m *tabaccoManager) makeRestoreJob(rctx RuntimeContext, backup *Backup, src for _, ds := range dsl { restoreJobs = append( restoreJobs, - h.RestoreJob(rctx, backup, ds, target), + m.WithQueue( + h.RestoreJob(rctx, backup, ds, target), + "dataset", + ), ) }