Skip to content
Snippets Groups Projects
Commit 533f4725 authored by ale
Browse files

Propagate the link tag through redirects

In order to do this we have to plumb it through the queue and the
Handler interface, but it should allow fetches of the resources
associated with a page via the IncludeRelatedScope even if it's behind
a redirect.
parent fec78595
No related branches found
No related tags found
No related merge requests found
...@@ -33,7 +33,7 @@ var ( ...@@ -33,7 +33,7 @@ var (
concurrency = flag.Int("c", 10, "concurrent workers") concurrency = flag.Int("c", 10, "concurrent workers")
depth = flag.Int("depth", 100, "maximum link depth") depth = flag.Int("depth", 100, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope") excludeRelated = flag.Bool("exclude-related", false, "do not include related resources (css, images, etc) if their URL is not in scope")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)") outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns") warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
cpuprofile = flag.String("cpuprofile", "", "create cpu profile") cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
...@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { ...@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
return w.Close() return w.Close()
} }
func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error { func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, tag, depth int, resp *http.Response, _ error) error {
// Read the response body (so we can save it to the WARC // Read the response body (so we can save it to the WARC
// output) and replace it with a buffer. // output) and replace it with a buffer.
data, derr := ioutil.ReadAll(resp.Body) data, derr := ioutil.ReadAll(resp.Body)
......
...@@ -112,15 +112,15 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { ...@@ -112,15 +112,15 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
// unless the handler returns the special error ErrRetryRequest. // unless the handler returns the special error ErrRetryRequest.
type Handler interface { type Handler interface {
// Handle the response from a URL. // Handle the response from a URL.
Handle(Publisher, string, int, *http.Response, error) error Handle(Publisher, string, int, int, *http.Response, error) error
} }
// HandlerFunc wraps a function into the Handler interface. // HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(Publisher, string, int, *http.Response, error) error type HandlerFunc func(Publisher, string, int, int, *http.Response, error) error
// Handle the response from a URL. // Handle the response from a URL.
func (f HandlerFunc) Handle(p Publisher, u string, depth int, resp *http.Response, err error) error { func (f HandlerFunc) Handle(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
return f(p, u, depth, resp, err) return f(p, u, tag, depth, resp, err)
} }
// ErrRetryRequest is returned by a Handler when the request should be // ErrRetryRequest is returned by a Handler when the request should be
...@@ -197,7 +197,7 @@ func (c *Crawler) Enqueue(link Outlink, depth int) error { ...@@ -197,7 +197,7 @@ func (c *Crawler) Enqueue(link Outlink, depth int) error {
// sure that subsequent calls to Enqueue with the same URL // sure that subsequent calls to Enqueue with the same URL
// will fail. // will fail.
wb := new(leveldb.Batch) wb := new(leveldb.Batch)
if err := c.queue.Add(wb, link.URL.String(), depth, time.Now()); err != nil { if err := c.queue.Add(wb, link.URL.String(), link.Tag, depth, time.Now()); err != nil {
return err return err
} }
c.setSeen(wb, link.URL) c.setSeen(wb, link.URL)
...@@ -250,7 +250,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) { ...@@ -250,7 +250,7 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
// Invoke the handler (even if the fetcher errored // Invoke the handler (even if the fetcher errored
// out). Errors in handling requests are fatal, crawl // out). Errors in handling requests are fatal, crawl
// will be aborted. // will be aborted.
err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr) err := c.handler.Handle(c, p.URL, p.Tag, p.Depth, httpResp, httpErr)
if httpErr == nil { if httpErr == nil {
respBody.Close() // nolint respBody.Close() // nolint
} }
...@@ -347,8 +347,8 @@ func (c *Crawler) Close() { ...@@ -347,8 +347,8 @@ func (c *Crawler) Close() {
// and adds them to the queue for crawling. It will call the wrapped // and adds them to the queue for crawling. It will call the wrapped
// handler on all requests regardless. // handler on all requests regardless.
func FollowRedirects(wrap Handler) Handler { func FollowRedirects(wrap Handler) Handler {
return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
if herr := wrap.Handle(p, u, depth, resp, err); herr != nil { if herr := wrap.Handle(p, u, tag, depth, resp, err); herr != nil {
return herr return herr
} }
...@@ -362,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler { ...@@ -362,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler {
if uerr != nil { if uerr != nil {
log.Printf("error parsing Location header: %v", uerr) log.Printf("error parsing Location header: %v", uerr)
} else { } else {
return p.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1) return p.Enqueue(Outlink{URL: locationURL, Tag: tag}, depth+1)
} }
} }
return nil return nil
...@@ -373,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler { ...@@ -373,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler {
// "successful" HTTP status code (anything < 400). When using this // "successful" HTTP status code (anything < 400). When using this
// wrapper, subsequent Handle calls will always have err set to nil. // wrapper, subsequent Handle calls will always have err set to nil.
func FilterErrors(wrap Handler) Handler { func FilterErrors(wrap Handler) Handler {
return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
if err != nil { if err != nil {
return nil return nil
} }
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
return nil return nil
} }
return wrap.Handle(p, u, depth, resp, nil) return wrap.Handle(p, u, tag, depth, resp, nil)
}) })
} }
...@@ -388,11 +388,11 @@ func FilterErrors(wrap Handler) Handler { ...@@ -388,11 +388,11 @@ func FilterErrors(wrap Handler) Handler {
// temporary errors (all transport-level errors are considered // temporary errors (all transport-level errors are considered
// temporary, as well as any HTTP status code >= 500). // temporary, as well as any HTTP status code >= 500).
func HandleRetries(wrap Handler) Handler { func HandleRetries(wrap Handler) Handler {
return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
return ErrRetryRequest return ErrRetryRequest
} }
return wrap.Handle(p, u, depth, resp, nil) return wrap.Handle(p, u, tag, depth, resp, nil)
}) })
} }
......
...@@ -33,7 +33,7 @@ func TestCrawler(t *testing.T) { ...@@ -33,7 +33,7 @@ func TestCrawler(t *testing.T) {
) )
var crawledPages int var crawledPages int
h := HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error { h := HandlerFunc(func(p Publisher, u string, tag, depth int, resp *http.Response, err error) error {
crawledPages++ crawledPages++
next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages) next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages)
log.Printf("%s -> %s", u, next) log.Printf("%s -> %s", u, next)
......
...@@ -28,6 +28,7 @@ type queuePair struct { ...@@ -28,6 +28,7 @@ type queuePair struct {
URL string URL string
Depth int Depth int
Tag int
} }
// Scan the pending queue and send items on 'ch'. Returns an error // Scan the pending queue and send items on 'ch'. Returns an error
...@@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error { ...@@ -58,10 +59,10 @@ func (q *queue) Scan(ch chan<- queuePair) error {
} }
// Add an item to the pending work queue. // Add an item to the pending work queue.
func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error { func (q *queue) Add(wb *leveldb.Batch, urlStr string, tag, depth int, when time.Time) error {
t := uint64(when.UnixNano()) t := uint64(when.UnixNano())
qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep) qkey := bytes.Join([][]byte{queuePrefix, encodeUint64(t), encodeUint64(uint64(rand.Int63()))}, queueKeySep)
return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Depth: depth}) return q.db.PutObjBatch(wb, qkey, &queuePair{URL: urlStr, Tag: tag, Depth: depth})
} }
func (q *queue) acquire(qp queuePair) error { func (q *queue) acquire(qp queuePair) error {
...@@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) { ...@@ -87,7 +88,7 @@ func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
// Retry processing this item at a later time. // Retry processing this item at a later time.
func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error { func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
wb.Delete(activeQueueKey(qp.key)) wb.Delete(activeQueueKey(qp.key))
if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil { if err := q.Add(wb, qp.URL, qp.Tag, qp.Depth, time.Now().Add(delay)); err != nil {
return err return err
} }
atomic.AddInt32(&q.numActive, -1) atomic.AddInt32(&q.numActive, -1)
......
Loading...
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment