Commit c5ec7eb8 authored by ale

Add multi-file output

The output stage can now write to size-limited, rotating WARC files
using a user-specified pattern, so that output files are always
unique.
parent 3518feaf
......@@ -56,6 +56,15 @@ avoid calendars, admin panels of common CMS applications, and other
well-known pitfalls. This list is sourced from the
[ArchiveBot](https://github.com/ArchiveTeam/ArchiveBot) project.
If you're running a larger crawl, the tool can be told to rotate the
output WARC files when they reach a certain size (100MB by default,
controlled by the *--output-max-size* flag). To do so, make sure the
*--output* option contains the literal token `%s` somewhere, which
will be replaced by a unique identifier every time a new file is
created, e.g.:
$ crawl --output=out-%s.warc.gz http://example.com/
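For example, to cap each output file at roughly 500MB instead of the
default:
$ crawl --output=out-%s.warc.gz --output-max-size=500 http://example.com/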
## Limitations
Like most crawlers, this one has a number of limitations:
......
......@@ -34,7 +34,8 @@ var (
depth = flag.Int("depth", 100, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
excludes []*regexp.Regexp
......@@ -63,7 +64,7 @@ type excludesFileFlag struct{}
func (f *excludesFileFlag) String() string { return "" }
func (f *excludesFileFlag) Set(s string) error {
ff, err := os.Open(s)
ff, err := os.Open(s) // #nosec
if err != nil {
return err
}
......@@ -246,6 +247,19 @@ func (b *byteCounter) Read(buf []byte) (int, error) {
return n, err
}
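// warcWriterFromFlags builds the output writer from the command-line
// flags: a multi-file writer if --output contains the "%s" pattern
// token, a plain single-file writer otherwise.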
func warcWriterFromFlags() (w *warc.Writer, err error) {
if strings.Contains(*outputFile, "%s") {
w, err = warc.NewMultiWriter(*outputFile, uint64(*warcFileSizeMB)*1024*1024)
} else {
var f *os.File
f, err = os.Create(*outputFile)
if err == nil {
w = warc.NewWriter(f)
}
}
return
}
func main() {
flag.Parse()
......@@ -271,11 +285,10 @@ func main() {
scope = crawl.AND(crawl.OR(scope, crawl.NewIncludeRelatedScope()), crawl.NewRegexpIgnoreScope(excludes))
}
outf, err := os.Create(*outputFile)
w, err := warcWriterFromFlags()
if err != nil {
log.Fatal(err)
}
w := warc.NewWriter(outf)
defer w.Close() // nolint
saver, err := newWarcSaveHandler(w)
......
package warc
import (
"bufio"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"os"
"sync/atomic"
"time"
)
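// newID returns a unique, lexically sortable identifier based on the
// current time in nanoseconds.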
func newID() string {
var b [8]byte
binary.BigEndian.PutUint64(b[:], uint64(time.Now().UnixNano()))
return hex.EncodeToString(b[:])
}
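// meteredWriter wraps an io.WriteCloser and atomically counts the
// bytes written through it.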
type meteredWriter struct {
io.WriteCloser
bytes uint64
}
func (m *meteredWriter) Write(b []byte) (int, error) {
n, err := m.WriteCloser.Write(b)
if n > 0 {
atomic.AddUint64(&m.bytes, uint64(n))
}
return n, err
}
func (m *meteredWriter) Bytes() uint64 {
return atomic.LoadUint64(&m.bytes)
}
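// bufferedWriter adds buffering to an io.WriteCloser, flushing the
// buffer before closing the underlying writer.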
type bufferedWriter struct {
*bufio.Writer
io.Closer
}
func newBufferedWriter(w io.WriteCloser) *bufferedWriter {
return &bufferedWriter{
Writer: bufio.NewWriter(w),
Closer: w,
}
}
func (w *bufferedWriter) Close() error {
if err := w.Writer.Flush(); err != nil {
return err
}
return w.Closer.Close()
}
func openFile(path string) (*meteredWriter, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
return &meteredWriter{WriteCloser: newBufferedWriter(f)}, nil
}
// multiWriter writes records to a series of size-limited output
// files, rotating them at record boundaries. Unsafe for concurrent
// access.
type multiWriter struct {
pattern string
maxSize uint64
cur *meteredWriter
}
func newMultiWriter(pattern string, maxSize uint64) rawWriter {
if maxSize == 0 {
maxSize = 100 * 1024 * 1024
}
return &multiWriter{
pattern: pattern,
maxSize: maxSize,
}
}
func (w *multiWriter) newFilename() string {
return fmt.Sprintf(w.pattern, newID())
}
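// NewRecord opens a new output file if none is open yet, or if the
// current one has grown beyond maxSize.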
func (w *multiWriter) NewRecord() (err error) {
if w.cur == nil || w.cur.Bytes() > w.maxSize {
if w.cur != nil {
if err = w.cur.Close(); err != nil {
return
}
}
w.cur, err = openFile(w.newFilename())
}
return
}
func (w *multiWriter) Write(b []byte) (int, error) {
return w.cur.Write(b)
}
func (w *multiWriter) Close() error {
return w.cur.Close()
}
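// simpleWriter writes all records to a single buffered output file.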
type simpleWriter struct {
*bufferedWriter
}
func newSimpleWriter(w io.WriteCloser) rawWriter {
return &simpleWriter{newBufferedWriter(w)}
}
func (w *simpleWriter) NewRecord() error {
return nil
}
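// rawWriter is the byte-level output of a Writer. NewRecord is
// invoked before every record, giving implementations a safe point to
// rotate output files.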
type rawWriter interface {
io.WriteCloser
NewRecord() error
}
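Because rotation only happens inside NewRecord, a record is never
split across two files, and a file can overshoot maxSize by at most
one record. A minimal package-internal sketch of that contract
(exampleRotation is a hypothetical helper, not part of this commit;
the 1KB cap is for illustration only):
func exampleRotation() error {
	w := newMultiWriter("out-%s.warc.gz", 1024)
	payload := make([]byte, 400)
	for i := 0; i < 10; i++ {
		// Rotation can only happen here, between records, so every
		// file contains whole records.
		if err := w.NewRecord(); err != nil {
			return err
		}
		if _, err := w.Write(payload); err != nil {
			return err
		}
	}
	return w.Close()
}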
......@@ -3,9 +3,10 @@
package warc
import (
"bufio"
"errors"
"fmt"
"io"
"strings"
"time"
"compress/gzip"
......@@ -57,7 +58,7 @@ func (h Header) Encode(w io.Writer) error {
return err
}
}
_, err := fmt.Fprintf(w, "\r\n")
_, err := io.WriteString(w, "\r\n")
return err
}
......@@ -74,20 +75,22 @@ func NewHeader() Header {
// Writer can write records to a file in WARC format. It is safe
// for concurrent access, since writes are serialized internally.
type Writer struct {
writer io.WriteCloser
bufwriter *bufio.Writer
gzwriter *gzip.Writer
lockCh chan bool
writer rawWriter
lockCh chan struct{}
}
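// recordWriter wraps the gzip stream of a single record. Closing it
// writes the end-of-record marker, closes the stream, and releases
// the Writer lock.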
type recordWriter struct {
io.Writer
lockCh chan bool
io.WriteCloser
lockCh chan struct{}
}
func (rw *recordWriter) Close() error {
// Add the end-of-record marker.
_, err := fmt.Fprintf(rw, "\r\n\r\n")
_, err := io.WriteString(rw, "\r\n\r\n")
if err != nil {
return err
}
err = rw.WriteCloser.Close()
<-rw.lockCh
return err
}
......@@ -98,41 +101,57 @@ func (rw *recordWriter) Close() error {
// is satisfied. If this function returns an error, the state of the
// Writer is invalid and it should no longer be used.
func (w *Writer) NewRecord(hdr Header) (io.WriteCloser, error) {
w.lockCh <- true
if w.gzwriter != nil {
w.gzwriter.Close() // nolint
w.lockCh <- struct{}{}
if err := w.writer.NewRecord(); err != nil {
return nil, err
}
var err error
w.gzwriter, err = gzip.NewWriterLevel(w.bufwriter, gzip.BestCompression)
gzwriter, err := gzip.NewWriterLevel(w.writer, gzip.BestCompression)
if err != nil {
return nil, err
}
w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
if err = hdr.Encode(w.gzwriter); err != nil {
gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
if err = hdr.Encode(gzwriter); err != nil {
return nil, err
}
return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}, nil
return &recordWriter{
WriteCloser: gzwriter,
lockCh: w.lockCh,
}, nil
}
// Close the WARC writer and flush all buffers. This will also call
// Close on the wrapped io.WriteCloser object.
func (w *Writer) Close() error {
if err := w.gzwriter.Close(); err != nil {
return err
}
if err := w.bufwriter.Flush(); err != nil {
return err
}
w.lockCh <- struct{}{} // acquire the lock; intentionally never released
defer close(w.lockCh) // pending NewRecord calls will panic (send on closed channel)
return w.writer.Close()
}
// NewWriter initializes a new Writer and returns it.
func NewWriter(w io.WriteCloser) *Writer {
return &Writer{
writer: w,
bufwriter: bufio.NewWriter(w),
writer: newSimpleWriter(w),
// Buffering is important here since we're using this
// channel as a semaphore.
lockCh: make(chan bool, 1),
lockCh: make(chan struct{}, 1),
}
}
// NewMultiWriter initializes a new Writer that writes its output to
// multiple files of limited size approximately equal to maxSize,
// rotating them when necessary. The input path should contain a
// literal '%s' token, which will be replaced with a (lexically
// sortable) unique token.
func NewMultiWriter(pattern string, maxSize uint64) (*Writer, error) {
if !strings.Contains(pattern, "%s") {
return nil, errors.New("input path is not a pattern")
}
return &Writer{
writer: newMultiWriter(pattern, maxSize),
// Buffering is important here since we're using this
// channel as a semaphore.
lockCh: make(chan struct{}, 1),
}, nil
}
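As a usage illustration (not part of this commit), a client of the
multi-file API might look like the sketch below; the import path is
assumed from this repository's layout, and the 10MB cap and payload
are placeholders:
package main

import (
	"log"

	"git.autistici.org/ale/crawl/warc"
)

func main() {
	// Rotate output files at roughly 10MB.
	w, err := warc.NewMultiWriter("out-%s.warc.gz", 10*1024*1024)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close() // nolint

	for i := 0; i < 3; i++ {
		rec, err := w.NewRecord(warc.NewHeader())
		if err != nil {
			log.Fatal(err)
		}
		if _, err := rec.Write([]byte("example record payload")); err != nil {
			log.Fatal(err)
		}
		if err := rec.Close(); err != nil {
			log.Fatal(err)
		}
	}
}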
package warc
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
)
var testData = []byte("this is some very interesting test data of non-zero size")
func writeRecords(w *Writer, n int) error {
for i := 0; i < n; i++ {
hdr := NewHeader()
rec, err := w.NewRecord(hdr)
if err != nil {
return fmt.Errorf("NewRecord: %v", err)
}
_, err = rec.Write(testData)
if err != nil {
return fmt.Errorf("record Write: %v", err)
}
if err := rec.Close(); err != nil {
return fmt.Errorf("record Close: %v", err)
}
}
return nil
}
func writeManyRecords(t testing.TB, w *Writer, n int) {
if err := writeRecords(w, n); err != nil {
t.Fatal(err)
}
}
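// writeManyRecordsConcurrently exercises the Writer from nproc
// goroutines at once, each writing n records.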
func writeManyRecordsConcurrently(t testing.TB, w *Writer, n, nproc int) {
startCh := make(chan struct{})
errCh := make(chan error, nproc+1)
var wg sync.WaitGroup
for i := 0; i < nproc; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-startCh
if err := writeRecords(w, n); err != nil {
errCh <- err
}
}()
}
go func() {
wg.Wait()
errCh <- nil
}()
close(startCh)
if err := <-errCh; err != nil {
t.Fatalf("a worker got an error: %v", err)
}
}
func TestWARC_WriteSingleFile(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
f, err := os.Create(filepath.Join(dir, "out.warc.gz"))
if err != nil {
t.Fatal(err)
}
w := NewWriter(f)
writeManyRecords(t, w, 1000)
if err := w.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
}
func TestWARC_WriteMulti(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
var targetSize int64 = 10240
w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
if err != nil {
t.Fatal(err)
}
writeManyRecords(t, w, 1000)
if err := w.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
files, _ := ioutil.ReadDir(dir)
if len(files) < 2 {
t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
}
for _, f := range files[:len(files)-1] {
if f.Size() < targetSize {
t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
}
}
}
func TestWARC_WriteMulti_Concurrent(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
var targetSize int64 = 100000
w, err := NewMultiWriter(filepath.Join(dir, "out.%s.warc.gz"), uint64(targetSize))
if err != nil {
t.Fatal(err)
}
writeManyRecordsConcurrently(t, w, 1000, 10)
if err := w.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
files, _ := ioutil.ReadDir(dir)
if len(files) < 2 {
t.Fatalf("MultiWriter didn't create enough files (%d)", len(files))
}
for _, f := range files[:len(files)-1] {
if f.Size() < targetSize {
t.Errorf("output file %s is too small (%d bytes)", f.Name(), f.Size())
}
}
}