Commit 98253349 authored by ale

Explicitly delegate retry logic to handlers

Makes it possible to retry requests for temporary HTTP errors (429,
500, etc.).
parent 70c12b7a

Pipeline #1179 passed in 15 seconds
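To make the change concrete before the diff itself, here is a minimal sketch of how a caller wires up the new handler chain. The chain expression mirrors the diff below; the seed/scope plumbing, the example URL, and the Run call are illustrative, modeled on how the cmd/ binaries use the package.

package main

import (
    "log"
    "net/http"

    "git.autistici.org/ale/crawl"
)

// logHandler is a hypothetical user handler. Behind FilterErrors it
// only ever sees a nil error and a response with status < 400.
func logHandler(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error {
    log.Printf("fetched %s (%s)", u, resp.Status)
    return nil
}

func main() {
    seeds := crawl.MustParseURLs([]string{"https://example.com/"})
    scope := crawl.NewSeedScope(seeds)

    // The decorator chain introduced by this commit: HandleRetries
    // turns transport errors, 429 and 5xx into ErrRetryRequest,
    // FollowRedirects enqueues 3xx Location targets, and FilterErrors
    // shields the innermost handler from error responses.
    handler := crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(crawl.HandlerFunc(logHandler))))

    crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), handler)
    if err != nil {
        log.Fatal(err)
    }
    crawler.Run(1) // assumed signature: Run(concurrency int)
    crawler.Close()
}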
--- a/cmd/crawl/crawl.go
+++ b/cmd/crawl/crawl.go
@@ -81,11 +81,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
 	return w.Close()
 }
 
-func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
-	if err != nil {
-		return nil
-	}
-
+func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error {
 	// Read the response body (so we can save it to the WARC
 	// output) and replace it with a buffer.
 	data, derr := ioutil.ReadAll(resp.Body)
@@ -113,7 +109,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
 		return werr
 	}
 
-	return extractLinks(c, u, depth, resp, err)
+	return extractLinks(c, u, depth, resp, nil)
 }
 
 func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) {
@@ -240,7 +236,13 @@ func main() {
 		log.Fatal(err)
 	}
 
-	crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(fetch), crawl.NewRedirectHandler(saver))
+	crawler, err := crawl.NewCrawler(
+		*dbPath,
+		seeds,
+		scope,
+		crawl.FetcherFunc(fetch),
+		crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(saver))),
+	)
 	if err != nil {
 		log.Fatal(err)
 	}
--- a/cmd/links/links.go
+++ b/cmd/links/links.go
@@ -20,11 +20,7 @@ var (
 	validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
 )
 
-func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
-	if err != nil {
-		return nil
-	}
-
+func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error {
 	links, err := analysis.GetLinks(resp)
 	if err != nil {
 		// Not a fatal error, just a bad web page.
@@ -50,7 +46,13 @@ func main() {
 		crawl.NewSeedScope(seeds),
 	)
 
-	crawler, err := crawl.NewCrawler("crawldb", seeds, scope, crawl.FetcherFunc(http.Get), crawl.NewRedirectHandler(crawl.HandlerFunc(extractLinks)))
+	crawler, err := crawl.NewCrawler(
+		"crawldb",
+		seeds,
+		scope,
+		crawl.FetcherFunc(http.Get),
+		crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(crawl.HandlerFunc(extractLinks)))),
+	)
 	if err != nil {
 		log.Fatal(err)
 	}
--- a/crawler.go
+++ b/crawler.go
@@ -115,8 +115,9 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
 }
 
 // A Handler processes crawled contents. Any errors returned by public
-// implementations of this interface are considered permanent and will
-// not cause the URL to be fetched again.
+// implementations of this interface are considered fatal and will
+// cause the crawl to abort. The URL will be removed from the queue
+// unless the handler returns the special error ErrRetryRequest.
 type Handler interface {
 	// Handle the response from a URL.
 	Handle(*Crawler, string, int, *http.Response, error) error
@@ -130,6 +131,10 @@ func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
 	return f(db, u, depth, resp, err)
 }
 
+// ErrRetryRequest is returned by a Handler when the request should be
+// retried after some time.
+var ErrRetryRequest = errors.New("retry_request")
+
 // The Crawler object contains the crawler state.
 type Crawler struct {
 	db *gobDB
@@ -234,21 +239,22 @@ func (c *Crawler) urlHandler(queue <-chan queuePair) {
 		// Invoke the handler (even if the fetcher errored
 		// out). Errors in handling requests are fatal, crawl
 		// will be aborted.
-		Must(c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr))
-
-		// Write the result in our database.
-		wb := new(leveldb.Batch)
+		err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
 		if httpErr == nil {
 			respBody.Close() // nolint
+		}
 
-			// Remove the URL from the queue if the fetcher was successful.
+		wb := new(leveldb.Batch)
+		switch err {
+		case nil:
 			c.queue.Release(wb, p)
-		} else {
-			info.Error = httpErr.Error()
-			log.Printf("error retrieving %s: %v", p.URL, httpErr)
+		case ErrRetryRequest:
 			Must(c.queue.Retry(wb, p, errorRetryDelay))
+		default:
+			log.Fatalf("fatal error in handling %s: %v", p.URL, err)
 		}
 
+		// Write the result in our database.
 		Must(c.db.PutObjBatch(wb, urlkey, &info))
 		Must(c.db.Write(wb, nil))
 	}
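Note that the switch above dispatches on error identity: a handler that wants a fetch re-queued must return the ErrRetryRequest sentinel itself, unwrapped. A minimal sketch of a handler that does this directly, without the HandleRetries wrapper (package and variable names are hypothetical):

package example

import (
    "net/http"

    "git.autistici.org/ale/crawl"
)

// retryLater re-queues throttled requests on its own. The sentinel
// must be returned as-is; wrapping it in another error would fall
// through to the fatal default case of the switch.
var retryLater = crawl.HandlerFunc(func(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
    if err == nil && resp.StatusCode == http.StatusTooManyRequests {
        return crawl.ErrRetryRequest // retried after errorRetryDelay
    }
    return nil
})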
@@ -327,37 +333,57 @@ func (c *Crawler) Close() {
 	c.db.Close() // nolint
 }
 
-type redirectHandler struct {
-	h Handler
-}
-
-func (wrap *redirectHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
-	if err != nil {
-		return err
-	}
-
-	if resp.StatusCode == 200 {
-		err = wrap.h.Handle(c, u, depth, resp, err)
-	} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
-		location := resp.Header.Get("Location")
-		if location != "" {
-			locationURL, uerr := resp.Request.URL.Parse(location)
-			if uerr != nil {
-				log.Printf("error parsing Location header: %v", uerr)
-			} else {
-				Must(c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1))
-			}
-		}
-	} else {
-		err = errors.New(resp.Status)
-	}
-	return err
-}
-
-// NewRedirectHandler returns a Handler that follows HTTP redirects,
-// and will call the wrapped handler on every request with HTTP status 200.
-func NewRedirectHandler(wrap Handler) Handler {
-	return &redirectHandler{wrap}
+// FollowRedirects returns a Handler that follows HTTP redirects
+// and adds them to the queue for crawling. It will call the wrapped
+// handler on all requests regardless.
+func FollowRedirects(wrap Handler) Handler {
+	return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+		if herr := wrap.Handle(c, u, depth, resp, err); herr != nil {
+			return herr
+		}
+
+		if err != nil {
+			return nil
+		}
+
+		location := resp.Header.Get("Location")
+		if resp.StatusCode >= 300 && resp.StatusCode < 400 && location != "" {
+			locationURL, uerr := resp.Request.URL.Parse(location)
+			if uerr != nil {
+				log.Printf("error parsing Location header: %v", uerr)
+			} else {
+				return c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
+			}
+		}
+		return nil
+	})
+}
+
+// FilterErrors returns a Handler that forwards only requests with a
+// "successful" HTTP status code (anything < 400). When using this
+// wrapper, subsequent Handle calls will always have err set to nil.
+func FilterErrors(wrap Handler) Handler {
+	return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+		if err != nil {
+			return nil
+		}
+		if resp.StatusCode >= 400 {
+			return nil
+		}
+		return wrap.Handle(c, u, depth, resp, nil)
+	})
+}
+
+// HandleRetries returns a Handler that will retry requests on
+// temporary errors (all transport-level errors are considered
+// temporary, as well as any HTTP status code >= 500).
+func HandleRetries(wrap Handler) Handler {
+	return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error {
+		if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
+			return ErrRetryRequest
+		}
+		return wrap.Handle(c, u, depth, resp, nil)
+	})
 }
 
 // Must will abort the program with a message when we encounter an
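One subtlety the new constructors introduce: the wrappers do not commute. HandleRetries has to sit outside FilterErrors, because FilterErrors swallows exactly the conditions (transport errors, status >= 400) that HandleRetries is meant to turn into retries. A sketch contrasting the two orders (function and parameter names are hypothetical):

package example

import "git.autistici.org/ale/crawl"

func buildChains(inner crawl.Handler) (effective, broken crawl.Handler) {
    // The order used by this commit in cmd/crawl and cmd/links:
    // HandleRetries sees transport errors, 429 and 5xx first, and
    // turns them into ErrRetryRequest before FilterErrors can drop them.
    effective = crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(inner)))

    // Reversed order: FilterErrors returns nil for every error and any
    // status >= 400, so the retry wrapper inside it never fires.
    broken = crawl.FilterErrors(crawl.HandleRetries(inner))

    return effective, broken
}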
--- a/crawler_test.go
+++ b/crawler_test.go
@@ -44,7 +44,7 @@ func TestCrawler(t *testing.T) {
 		return nil
 	})
 
-	crawler, err := NewCrawler(dir+"/crawl.db", seeds, scope, FetcherFunc(http.Get), NewRedirectHandler(h))
+	crawler, err := NewCrawler(dir+"/crawl.db", seeds, scope, FetcherFunc(http.Get), FollowRedirects(h))
 	if err != nil {
 		t.Fatal("NewCrawler", err)
 	}