Skip to content
Snippets Groups Projects
Commit d4c561c2 authored by ale's avatar ale
Browse files

move the WARC code into its own package

Now generates well-formed, indexable WARC files.
parent b09f05f8
Branches
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"strings" "strings"
"git.autistici.org/ale/crawl" "git.autistici.org/ale/crawl"
"git.autistici.org/ale/crawl/warc"
"github.com/PuerkitoBio/goquery" "github.com/PuerkitoBio/goquery"
) )
...@@ -94,7 +95,7 @@ func hdr2str(h http.Header) []byte { ...@@ -94,7 +95,7 @@ func hdr2str(h http.Header) []byte {
} }
type warcSaveHandler struct { type warcSaveHandler struct {
warc *crawl.WarcWriter warc *warc.Writer
warcInfoID string warcInfoID string
} }
...@@ -108,7 +109,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht ...@@ -108,7 +109,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht
// Dump the request. // Dump the request.
var b bytes.Buffer var b bytes.Buffer
resp.Request.Write(&b) resp.Request.Write(&b)
hdr := crawl.NewWarcHeader() hdr := warc.NewHeader()
hdr.Set("WARC-Type", "request") hdr.Set("WARC-Type", "request")
hdr.Set("WARC-Target-URI", resp.Request.URL.String()) hdr.Set("WARC-Target-URI", resp.Request.URL.String())
hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
...@@ -122,7 +123,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht ...@@ -122,7 +123,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht
respPayload := bytes.Join([][]byte{ respPayload := bytes.Join([][]byte{
[]byte(statusLine), hdr2str(resp.Header), data}, []byte(statusLine), hdr2str(resp.Header), data},
[]byte{'\r', '\n'}) []byte{'\r', '\n'})
hdr = crawl.NewWarcHeader() hdr = warc.NewHeader()
hdr.Set("WARC-Type", "response") hdr.Set("WARC-Type", "response")
hdr.Set("WARC-Target-URI", resp.Request.URL.String()) hdr.Set("WARC-Target-URI", resp.Request.URL.String())
hdr.Set("WARC-Warcinfo-ID", h.warcInfoID) hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
...@@ -134,14 +135,14 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht ...@@ -134,14 +135,14 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht
return extractLinks(c, u, depth, resp, err) return extractLinks(c, u, depth, resp, err)
} }
func NewSaveHandler(w *crawl.WarcWriter) crawl.Handler { func NewSaveHandler(w *warc.Writer) crawl.Handler {
info := strings.Join([]string{ info := strings.Join([]string{
"Software: crawl/1.0\r\n", "Software: crawl/1.0\r\n",
"Format: WARC File Format 1.0\r\n", "Format: WARC File Format 1.0\r\n",
"Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n", "Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n",
}, "") }, "")
hdr := crawl.NewWarcHeader() hdr := warc.NewHeader()
hdr.Set("WARC-Type", "warcinfo") hdr.Set("WARC-Type", "warcinfo")
hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID")) hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID"))
hdr.Set("Content-Length", strconv.Itoa(len(info))) hdr.Set("Content-Length", strconv.Itoa(len(info)))
...@@ -165,7 +166,7 @@ func main() { ...@@ -165,7 +166,7 @@ func main() {
seeds := crawl.MustParseURLs(flag.Args()) seeds := crawl.MustParseURLs(flag.Args())
scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ",")) scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ","))
w := crawl.NewWarcWriter(outf) w := warc.NewWriter(outf)
defer w.Close() defer w.Close()
saver := NewSaveHandler(w) saver := NewSaveHandler(w)
...@@ -174,5 +175,5 @@ func main() { ...@@ -174,5 +175,5 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
crawler.Run() crawler.Run(*concurrency)
} }
...@@ -71,5 +71,5 @@ func main() { ...@@ -71,5 +71,5 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
crawler.Run() crawler.Run(*concurrency)
} }
...@@ -272,8 +272,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler ...@@ -272,8 +272,9 @@ func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler
return c, nil return c, nil
} }
// Run the crawl, does not exit until it is done. // Run the crawl with the specified number of workers. This function
func (c *Crawler) Run() { // does not exit until all work is done (no URLs left in the queue).
func (c *Crawler) Run(concurrency int) {
// Load initial seeds into the queue. // Load initial seeds into the queue.
for _, u := range c.seeds { for _, u := range c.seeds {
c.Enqueue(u, 0) c.Enqueue(u, 0)
...@@ -282,7 +283,7 @@ func (c *Crawler) Run() { ...@@ -282,7 +283,7 @@ func (c *Crawler) Run() {
// Start some runners and wait until they're done. // Start some runners and wait until they're done.
var wg sync.WaitGroup var wg sync.WaitGroup
ch := c.process() ch := c.process()
for i := 0; i < 3; i++ { for i := 0; i < concurrency; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
c.urlHandler(ch) c.urlHandler(ch)
......
package crawl // Package to write WARC files.
package warc
import ( import (
"fmt" "fmt"
...@@ -21,12 +23,12 @@ var ( ...@@ -21,12 +23,12 @@ var (
} }
) )
// A Warc header. Header field names are case-sensitive. // A WARC header. Header field names are case-sensitive.
type WarcHeader map[string]string type Header map[string]string
// Set a header to the specified value. Multiple values are not // Set a header to the specified value. Multiple values are not
// supported. // supported.
func (h WarcHeader) Set(key, value string) { func (h Header) Set(key, value string) {
h[key] = value h[key] = value
// Keep Content-Type in sync with WARC-Type. // Keep Content-Type in sync with WARC-Type.
...@@ -40,12 +42,12 @@ func (h WarcHeader) Set(key, value string) { ...@@ -40,12 +42,12 @@ func (h WarcHeader) Set(key, value string) {
} }
// Get the value of a header. If not found, returns an empty string. // Get the value of a header. If not found, returns an empty string.
func (h WarcHeader) Get(key string) string { func (h Header) Get(key string) string {
return h[key] return h[key]
} }
// Encode the header to a Writer. // Encode the header to a Writer.
func (h WarcHeader) Encode(w io.Writer) { func (h Header) Encode(w io.Writer) {
fmt.Fprintf(w, "%s\r\n", warcVersion) fmt.Fprintf(w, "%s\r\n", warcVersion)
for hdr, value := range h { for hdr, value := range h {
fmt.Fprintf(w, "%s: %s\r\n", hdr, value) fmt.Fprintf(w, "%s: %s\r\n", hdr, value)
...@@ -53,44 +55,68 @@ func (h WarcHeader) Encode(w io.Writer) { ...@@ -53,44 +55,68 @@ func (h WarcHeader) Encode(w io.Writer) {
fmt.Fprintf(w, "\r\n") fmt.Fprintf(w, "\r\n")
} }
// NewWarcHeader returns a WarcHeader with its own unique ID and the // NewHeader returns a Header with its own unique ID and the
// current timestamp. // current timestamp.
func NewWarcHeader() WarcHeader { func NewHeader() Header {
h := make(WarcHeader) h := make(Header)
h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN())) h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN()))
h.Set("WARC-Date", time.Now().Format(warcTimeFmt)) h.Set("WARC-Date", time.Now().Format(warcTimeFmt))
h.Set("Content-Type", "application/octet-stream") h.Set("Content-Type", "application/octet-stream")
return h return h
} }
// WarcWriter can write records to a file in WARC format. // Writer can write records to a file in WARC format. It is safe
type WarcWriter struct { // for concurrent access, since writes are serialized internally.
writer io.WriteCloser type Writer struct {
writer io.WriteCloser
gzwriter *gzip.Writer
lockCh chan bool
} }
type recordWriter struct { type recordWriter struct {
io.Writer io.Writer
lockCh chan bool
} }
func (rw *recordWriter) Close() error { func (rw *recordWriter) Close() error {
// Add the end-of-record marker. // Add the end-of-record marker.
fmt.Fprintf(rw, "\r\n\r\n") fmt.Fprintf(rw, "\r\n\r\n")
<-rw.lockCh
return nil return nil
} }
// NewRecord starts a new WARC record with the provided header. The // NewRecord starts a new WARC record with the provided header. The
// caller must call Close on the returned writer before creating the // caller must call Close on the returned writer before creating the
// next record. // next record. Note that this function may block until that condition
func (w *WarcWriter) NewRecord(hdr WarcHeader) io.WriteCloser { // is satisfied.
hdr.Encode(w.writer) func (w *Writer) NewRecord(hdr Header) io.WriteCloser {
return &recordWriter{w.writer} w.lockCh <- true
if w.gzwriter != nil {
w.gzwriter.Close()
}
w.gzwriter, _ = gzip.NewWriterLevel(w.writer, gzip.BestCompression)
w.gzwriter.Header.Name = hdr.Get("WARC-Record-ID")
hdr.Encode(w.gzwriter)
return &recordWriter{Writer: w.gzwriter, lockCh: w.lockCh}
} }
// Close the WARC writer and flush all buffers. // Close the WARC writer and flush all buffers. This will also call
func (w *WarcWriter) Close() error { // Close on the wrapped io.WriteCloser object.
func (w *Writer) Close() error {
if err := w.gzwriter.Close(); err != nil {
return err
}
return w.writer.Close() return w.writer.Close()
} }
func NewWarcWriter(w io.WriteCloser) *WarcWriter { // NewWriter initializes a new Writer and returns it.
return &WarcWriter{gzip.NewWriter(w)} func NewWriter(w io.WriteCloser) *Writer {
return &Writer{
writer: w,
// Buffering is important here since we're using this
// channel as a semaphore.
lockCh: make(chan bool, 1),
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment