Skip to content
Snippets Groups Projects
Commit 71ffebc0 authored by ale's avatar ale
Browse files

Merge branch 'concurrent-tools' into 'main'

Add concurrent processing to I/O-intensive tools

See merge request !22
parents 3c2cf1ef 524f7c5d
No related branches found
No related tags found
1 merge request!22Add concurrent processing to I/O-intensive tools
Pipeline #76935 passed
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
var ( var (
target = flag.String("to", "", "target repository name") target = flag.String("to", "", "target repository name")
move = flag.Bool("move", false, "delete original files") move = flag.Bool("move", false, "delete original files")
concurrency = flag.Int("concurrency", 10, "concurrent workers")
docstring = ` docstring = `
...@@ -137,7 +138,7 @@ func main() { ...@@ -137,7 +138,7 @@ func main() {
} }
// Move files into the repository (that aren't already there). // Move files into the repository (that aren't already there).
err = coll.Each(func(obj model.Object) error { err = collection.ConcurrentEach(coll, *concurrency, func(obj model.Object) error {
return archive(coll, fs, repo, obj) return archive(coll, fs, repo, obj)
}) })
......
...@@ -18,6 +18,7 @@ var ( ...@@ -18,6 +18,7 @@ var (
target = flag.String("to", "", "target repository name") target = flag.String("to", "", "target repository name")
width = flag.Int("width", 100, "width") width = flag.Int("width", 100, "width")
height = flag.Int("height", 100, "height") height = flag.Int("height", 100, "height")
concurrency = flag.Int("concurrency", 10, "concurrent workers")
docstring = ` docstring = `
...@@ -103,7 +104,7 @@ func main() { ...@@ -103,7 +104,7 @@ func main() {
} }
// Move files into the repository (that aren't already there). // Move files into the repository (that aren't already there).
err = coll.Each(func(obj model.Object) error { err = collection.ConcurrentEach(coll, *concurrency, func(obj model.Object) error {
return mkThumbnail(coll, fs, repo, obj) return mkThumbnail(coll, fs, repo, obj)
}) })
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
var ( var (
force = flag.Bool("force", false, "always request remote metadata") force = flag.Bool("force", false, "always request remote metadata")
concurrency = flag.Int("concurrency", 10, "concurrent workers")
enableOL = flag.Bool("enable-openlibrary", true, "enable the OpenLibrary metadata source") enableOL = flag.Bool("enable-openlibrary", true, "enable the OpenLibrary metadata source")
enableGB = flag.Bool("enable-googlebooks", true, "enable the Google Books metadata source") enableGB = flag.Bool("enable-googlebooks", true, "enable the Google Books metadata source")
enableSqlite = flag.String("enable-sqlite", "", "enable the generic SQLite source (argument is config file)") enableSqlite = flag.String("enable-sqlite", "", "enable the generic SQLite source (argument is config file)")
...@@ -128,7 +129,7 @@ func main() { ...@@ -128,7 +129,7 @@ func main() {
} }
// Refine all Book objects in the collection. // Refine all Book objects in the collection.
err = coll.Each(func(obj model.Object) error { err = collection.ConcurrentEach(coll, *concurrency, func(obj model.Object) error {
if obj.GetID().Class != model.ClassBook { if obj.GetID().Class != model.ClassBook {
return nil return nil
} }
......
package collection
import (
"io"
"sync"
"git.autistici.org/ale/ebooks/meta/model"
)
func ConcurrentEach(coll Collection, concurrency int, f func(model.Object) error) error {
ch := make(chan model.Object, concurrency)
errCh := make(chan error, 1)
defer close(errCh)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
for obj := range ch {
if err := f(obj); err != nil {
select {
case errCh <- err:
default:
}
}
}
wg.Done()
}()
}
stop := make(chan bool)
doneCh := make(chan error)
defer close(doneCh)
go func() {
err := coll.Each(func(obj model.Object) error {
select {
case <-stop:
return io.EOF
default:
}
ch <- obj
return nil
})
doneCh <- err
close(ch)
}()
var err error
select {
case err = <-doneCh:
case err = <-errCh:
}
close(stop)
wg.Wait()
return err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment