diff --git a/go.mod b/go.mod index b9e42e0014e2f82e8f42c24a8f1dd7096831cfd8..a45d1b855fb50dec40370c8082aeabe263c6f282 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/syndtr/goleveldb v1.0.0 golang.org/x/net v0.0.0-20210420072503-d25e30425868 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 google.golang.org/genproto v0.0.0-20210416161957-9910b6c460de // indirect google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 diff --git a/go.sum b/go.sum index 07067a72acb4b327e6a4d94cb81b4c1b6e1e6965..07214cb4b55afac54e684a779afd35ce2f749718 100644 --- a/go.sum +++ b/go.sum @@ -622,6 +622,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go index 9857fe53d3c91c527ee9b363df2c6d3ed8c52576..1eab2fdf9336403c4cefd414d4ab0427e4ac1d5f 100644 --- a/vendor/golang.org/x/sync/errgroup/errgroup.go +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -8,9 +8,12 @@ package errgroup import ( "context" + "fmt" "sync" ) +type token struct{} + // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // @@ -20,10 +23,19 @@ type Group struct { wg sync.WaitGroup + sem chan token + errOnce sync.Once err error } +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go @@ -45,14 +57,48 @@ func (g *Group) Wait() error { } // Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group; its error will be // returned by Wait. func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + g.wg.Add(1) go func() { - defer g.wg.Done() + defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { @@ -63,4 +109,23 @@ func (g *Group) Go(f func() error) { }) } }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) } diff --git a/vendor/modules.txt b/vendor/modules.txt index d27039d7f77a5b02dee3149f3b6296eb4bf15b5d..292ec71835e09cb0b13f1a45f7a9402a9c1602fe 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -95,7 +95,7 @@ golang.org/x/net/http2/hpack golang.org/x/net/idna golang.org/x/net/internal/timeseries golang.org/x/net/trace -# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c +# golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 ## explicit golang.org/x/sync/errgroup # golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40