From c5ec7eb826bfd08aa6e8dd880efa15930f78ba19 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Wed, 2 Jan 2019 09:53:42 +0000
Subject: [PATCH] Add multi-file output

The output stage can now write to size-limited, rotating WARC files
using a user-specified pattern, so that output files are always
unique.
---
 README.md          |   9 +++
 cmd/crawl/crawl.go |  21 +++++--
 warc/multi.go      | 121 +++++++++++++++++++++++++++++++++++++++
 warc/warc.go       |  71 ++++++++++++++---------
 warc/warc_test.go  | 138 +++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 330 insertions(+), 30 deletions(-)
 create mode 100644 warc/multi.go
 create mode 100644 warc/warc_test.go

diff --git a/README.md b/README.md
index 38f7bc3..b4d28e5 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,15 @@ avoid calendars, admin panels of common CMS applications, and other
 well-known pitfalls. This list is sourced from the
 [ArchiveBot](https://github.com/ArchiveTeam/ArchiveBot) project.
 
+If you're running a larger crawl, the tool can be told to rotate the
+output WARC files when they reach a certain size (100MB by default,
+controlled by the *--output-max-size* flag. To do so, make sure the
+*--output* option contains somewhere the literal token `%s`, which
+will be replaced by a unique identifier every time a new file is
+created, e.g.:
+
+    $ crawl --output=out-%s.warc.gz http://example.com/
+
 ## Limitations
 
 Like most crawlers, this one has a number of limitations:
diff --git a/cmd/crawl/crawl.go b/cmd/crawl/crawl.go
index d5e012a..2ebba98 100644
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -34,7 +34,8 @@ var (
 	depth          = flag.Int("depth", 100, "maximum link depth")
 	validSchemes   = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
 	excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope")
-	outputFile     = flag.String("output", "crawl.warc.gz", "output WARC file")
+	outputFile     = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
+	warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
 	cpuprofile     = flag.String("cpuprofile", "", "create cpu profile")
 
 	excludes []*regexp.Regexp
@@ -63,7 +64,7 @@ type excludesFileFlag struct{}
 func (f *excludesFileFlag) String() string { return "" }
 
 func (f *excludesFileFlag) Set(s string) error {
-	ff, err := os.Open(s)
+	ff, err := os.Open(s) // #nosec
 	if err != nil {
 		return err
 	}
@@ -246,6 +247,19 @@ func (b *byteCounter) Read(buf []byte) (int, error) {
 	return n, err
 }
 
+func warcWriterFromFlags() (w *warc.Writer, err error) {
+	if strings.Contains(*outputFile, "%s") {
+		w, err = warc.NewMultiWriter(*outputFile, uint64(*warcFileSizeMB)*1024*1024)
+	} else {
+		var f *os.File
+		f, err = os.Create(*outputFile)
+		if err == nil {
+			w = warc.NewWriter(f)
+		}
+	}
+	return
+}
+
 func main() {
 	flag.Parse()
 
@@ -271,11 +285,10 @@ func main() {
 		scope = crawl.AND(crawl.OR(scope, crawl.NewIncludeRelatedScope()), crawl.NewRegexpIgnoreScope(excludes))
 	}
 
-	outf, err := os.Create(*outputFile)
+	w, err := warcWriterFromFlags()
 	if err != nil {
 		log.Fatal(err)
 	}
-	w := warc.NewWriter(outf)
 	defer w.Close() // nolint
 
 	saver, err := newWarcSaveHandler(w)
diff --git a/warc/multi.go b/warc/multi.go
new file mode 100644
index 0000000..a18ceb8
--- /dev/null
+++ b/warc/multi.go
@@ -0,0 +1,121 @@
+package warc
+
+import (
+	"bufio"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"os"
+	"sync/atomic"
+	"time"
+)
+
+func newID() string {
+	var b [8]byte
+	binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano()))
+	return hex.EncodeToString(b[:])
+}
+
+type meteredWriter struct {
+	io.WriteCloser
+	bytes uint64
+}
+
+func (m *meteredWriter) Write(b []byte) (int, error) {
+	n, err := m.WriteCloser.Write(b)
+	if n > 0 {
+		atomic.AddUint64(&m.bytes, uint64(n))
+	}
+	return n, err
+}
+
+func (m *meteredWriter) Bytes() uint64 {
+	return atomic.LoadUint64(&m.bytes)
+}
+
+type bufferedWriter struct {
+	*bufio.Writer
+	io.Closer
+}
+
+func newBufferedWriter(w io.WriteCloser) *bufferedWriter {
+	return &bufferedWriter{
+		Writer: bufio.NewWriter(w),
+		Closer: w,
+	}
+}
+
+func (w *bufferedWriter) Close() error {
+	if err := w.Writer.Flush(); err != nil {
+		return err
+	}
+	return w.Closer.Close()
+}
+
+func openFile(path string) (*meteredWriter, error) {
+	f, err := os.Create(path)
+	if err != nil {
+		return nil, err
+	}
+	return &meteredWriter{WriteCloser: newBufferedWriter(f)}, nil
+}
+
+// Unsafe for concurrent access.
+type multiWriter struct {
+	pattern string
+	maxSize uint64
+
+	cur *meteredWriter
+}
+
+func newMultiWriter(pattern string, maxSize uint64) rawWriter {
+	if maxSize == 0 {
+		maxSize = 100 * 1024 * 1024
+	}
+	return &multiWriter{
+		pattern: pattern,
+		maxSize: maxSize,
+	}
+}
+
+func (w *multiWriter) newFilename() string {
+	return fmt.Sprintf(w.pattern, newID())
+}
+
+func (w *multiWriter) NewRecord() (err error) {
+	if w.cur == nil || w.cur.Bytes() > w.maxSize {
+		if w.cur != nil {
+			if err = w.cur.Close(); err != nil {
+				return
+			}
+		}
+		w.cur, err = openFile(w.newFilename())
+	}
+	return
+}
+
+func (w *multiWriter) Write(b []byte) (int, error) {
+	return w.cur.Write(b)
+}
+
+func (w *multiWriter) Close() error {
+	return w.cur.Close()
+}
+
+type simpleWriter struct {
+	*bufferedWriter
+}
+
+func newSimpleWriter(w io.WriteCloser) rawWriter {
+	return &simpleWriter{newBufferedWriter(w)}
+}
+
+func (w *simpleWriter) NewRecord() error {
+	return nil
+}
+
+type rawWriter interface {
+	io.WriteCloser
+	NewRecord() error
+}
diff --git a/warc/warc.go b/warc/warc.go
index 1184d9c..a552085 100644
--- a/warc/warc.go
+++ b/warc/warc.go
@@ -3,9 +3,10 @@
 package warc
 
 import (
-	"bufio"
+	"errors"
 	"fmt"
 	"io"
+	"strings"
 	"time"
 
 	"compress/gzip"
@@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error {
 			return err
 		}
 	}
-	_, err := fmt.Fprintf(w, "\r\n")
+	_, err := io.WriteString(w, "\r\n")
 	return err
 }
 
@@ -74,20 +75,22 @@ func NewHeader() Header {
 // Writer can write records to a file in WARC format. It is safe
 // for concurrent access, since writes are serialized internally.
 type Writer struct {
-	writer    io.WriteCloser
-	bufwriter *bufio.Writer
-	gzwriter  *gzip.Writer
-	lockCh    chan bool
+	writer rawWriter
+	lockCh chan struct{}
 }
 
 type recordWriter struct {
-	io.Writer
-	lockCh chan bool
+	io.WriteCloser
+	lockCh chan struct{}
 }
 
 func (rw *recordWriter) Close() error {
 	// Add the end-of-record marker.
-	_, err := fmt.Fprintf(rw, "\r\n\r\n")
+	_, err := io.WriteString(rw, "\r\n\r\n")
+	if err != nil {
+		return err
+	}
+	err = rw.WriteCloser.Close()
 	<-rw.lockCh
 	return err
 }
@@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error {
 // is satisfied. If this function returns an error, the state of the
 // Writer is invalid and it should no longer be used.
 func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) {
-	w.lockCh <- true
-	if w.gzwriter != nil {
-		w.gzwriter.Close() // nolint
+	w.lockCh <- struct{}{}
+
+	if err := w.writer.NewRecord(); err != nil {
+		return nil, err
 	}
-	var err error
-	w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression)
+
+	gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression)
 	if err != nil {
 		return nil, err
 	}
-	w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
-	if err = hdr.Encode(w.gzwriter); err != nil {
+	gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
+	if err = hdr.Encode(gzwriter); err != nil {
 		return nil, err
 	}
-	return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil
+	return &recordWriter{
+		WriteCloser: gzwriter,
+		lockCh:      w.lockCh,
+	}, nil
 }
 
 // Close the WARC writer and flush all buffers. This will also call
 // Close on the wrapped io.WriteCloser object.
 func (w *Writer) Close() error {
-	if err := w.gzwriter.Close(); err != nil {
-		return err
-	}
-	if err := w.bufwriter.Flush(); err != nil {
-		return err
-	}
+	w.lockCh <- struct{}{} // do not release
+	defer close(w.lockCh)  // pending NewRecord calls will panic?
 	return w.writer.Close()
 }
 
 // NewWriter initializes a new Writer and returns it.
 func NewWriter(w io.WriteCloser) *Writer {
 	return &Writer{
-		writer:    w,
-		bufwriter: bufio.NewWriter(w),
+		writer: newSimpleWriter(w),
 		// Buffering is important here since we're using this
 		// channel as a semaphore.
-		lockCh: make(chan bool, 1),
+		lockCh: make(chan struct{}, 1),
+	}
+}
+
+// NewMultiWriter initializes a new Writer that writes its output to
+// multiple files of limited size approximately equal to maxSize,
+// rotating them when necessary. The input path should contain a
+// literal '%s' token, which will be replaced with a (lexically
+// sortable) unique token.
+func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) {
+	if !strings.Contains(pattern, "%s") {
+		return nil, errors.New("input path is not a pattern")
 	}
+	return &Writer{
+		writer: newMultiWriter(pattern, maxSize),
+		// Buffering is important here since we're using this
+		// channel as a semaphore.
+		lockCh: make(chan struct{}, 1),
+	}, nil
 }
diff --git a/warc/warc_test.go b/warc/warc_test.go
new file mode 100644
index 0000000..5e2de2f
--- /dev/null
+++ b/warc/warc_test.go
@@ -0,0 +1,138 @@
+package warc
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+)
+
+var testData = []byte("this is some very interesting test data of non-zero size")
+
+func writeRecords(w *Writer, n int) error {
+	for i := 0; i < n; i++ {
+		hdr := NewHeader()
+		rec, err := w.NewRecord(hdr)
+		if err != nil {
+			return fmt.Errorf("NewRecord: %v", err)
+		}
+		_, err = rec.Write(testData)
+		if err != nil {
+			return fmt.Errorf("record Write: %v", err)
+		}
+		if err := rec.Close(); err != nil {
+			return fmt.Errorf("record Close: %v", err)
+		}
+	}
+	return nil
+}
+
+func writeManyRecords(t testing.TB, w *Writer, n int) {
+	if err := writeRecords(w, n); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func writeManyRecordsConcurrently(t testing.TB, w *Writer, n, nproc int) {
+	startCh := make(chan struct{})
+	errCh := make(chan error, nproc+1)
+	var wg sync.WaitGroup
+
+	for i := 0; i < nproc; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			<-startCh
+			if err := writeRecords(w, n); err != nil {
+				errCh <- err
+			}
+		}()
+	}
+	go func() {
+		wg.Wait()
+		errCh <- nil
+	}()
+	close(startCh)
+	if err := <-errCh; err != nil {
+		t.Fatalf("a worker got an error: %v", err)
+	}
+}
+
+func TestWARC_WriteSingleFile(t *testing.T) {
+	dir, err := ioutil.TempDir("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	f, err := os.Create(filepath.Join(dir, "out.warc.gz"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	w := NewWriter(f)
+
+	writeManyRecords(t, w, 1000)
+	if err := w.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+}
+
+func TestWARC_WriteMulti(t *testing.T) {
+	dir, err := ioutil.TempDir("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	var targetSize int64 = 10240
+	w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	writeManyRecords(t, w, 1000)
+	if err := w.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+
+	files, _ := ioutil.ReadDir(dir)
+	if len(files) < 2 {
+		t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
+	}
+	for _, f := range files[:len(files)-1] {
+		if f.Size() < targetSize {
+			t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
+		}
+	}
+}
+
+func TestWARC_WriteMulti_Concurrent(t *testing.T) {
+	dir, err := ioutil.TempDir("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	var targetSize int64 = 100000
+	w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	writeManyRecordsConcurrently(t, w, 1000, 10)
+	if err := w.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+
+	files, _ := ioutil.ReadDir(dir)
+	if len(files) < 2 {
+		t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
+	}
+	for _, f := range files[:len(files)-1] {
+		if f.Size() < targetSize {
+			t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
+		}
+	}
+}
-- 
GitLab