Commit 64eb5fb2 authored by ale

Refactor Handlers in terms of a Publisher interface

Introduce an interface to decouple the Enqueue functionality from the
Crawler implementation.
parent cce28f44
Pipeline #2088 passed with stage
in 22 seconds
...@@ -82,7 +82,7 @@ func (f *excludesFileFlag) Set(s string) error { ...@@ -82,7 +82,7 @@ func (f *excludesFileFlag) Set(s string) error {
return nil return nil
} }
func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error { func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
links, err := analysis.GetLinks(resp) links, err := analysis.GetLinks(resp)
if err != nil { if err != nil {
// This is not a fatal error, just a bad web page. // This is not a fatal error, just a bad web page.
...@@ -90,7 +90,7 @@ func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ ...@@ -90,7 +90,7 @@ func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _
} }
for _, link := range links { for _, link := range links {
if err := c.Enqueue(link, depth+1); err != nil { if err := p.Enqueue(link, depth+1); err != nil {
return err return err
} }
} }
...@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error { ...@@ -127,7 +127,7 @@ func (h *warcSaveHandler) writeWARCRecord(typ, uri string, data []byte) error {
return w.Close() return w.Close()
} }
func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error { func (h *warcSaveHandler) Handle(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
// Read the response body (so we can save it to the WARC // Read the response body (so we can save it to the WARC
// output) and replace it with a buffer. // output) and replace it with a buffer.
data, derr := ioutil.ReadAll(resp.Body) data, derr := ioutil.ReadAll(resp.Body)
...@@ -157,7 +157,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht ...@@ -157,7 +157,7 @@ func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *ht
h.numWritten++ h.numWritten++
return extractLinks(c, u, depth, resp, nil) return extractLinks(p, u, depth, resp, nil)
} }
func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) { func newWarcSaveHandler(w *warc.Writer) (crawl.Handler, error) {
......
...@@ -20,7 +20,7 @@ var ( ...@@ -20,7 +20,7 @@ var (
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols") validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
) )
func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ error) error { func extractLinks(p crawl.Publisher, u string, depth int, resp *http.Response, _ error) error {
links, err := analysis.GetLinks(resp) links, err := analysis.GetLinks(resp)
if err != nil { if err != nil {
// Not a fatal error, just a bad web page. // Not a fatal error, just a bad web page.
...@@ -28,7 +28,7 @@ func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _ ...@@ -28,7 +28,7 @@ func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, _
} }
for _, link := range links { for _, link := range links {
if err := c.Enqueue(link, depth+1); err != nil { if err := p.Enqueue(link, depth+1); err != nil {
return err return err
} }
} }
......
...@@ -112,21 +112,27 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) { ...@@ -112,21 +112,27 @@ func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
// unless the handler returns the special error ErrRetryRequest.
type Handler interface { type Handler interface {
// Handle the response from a URL. // Handle the response from a URL.
Handle(*Crawler, string, int, *http.Response, error) error Handle(Publisher, string, int, *http.Response, error) error
} }
// HandlerFunc wraps a function into the Handler interface. // HandlerFunc wraps a function into the Handler interface.
type HandlerFunc func(*Crawler, string, int, *http.Response, error) error type HandlerFunc func(Publisher, string, int, *http.Response, error) error
// Handle the response from a URL. // Handle the response from a URL.
func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error { func (f HandlerFunc) Handle(p Publisher, u string, depth int, resp *http.Response, err error) error {
return f(db, u, depth, resp, err) return f(p, u, depth, resp, err)
} }
// ErrRetryRequest is returned by a Handler when the request should be // ErrRetryRequest is returned by a Handler when the request should be
// retried after some time. // retried after some time.
var ErrRetryRequest = errors.New("retry_request") var ErrRetryRequest = errors.New("retry_request")
// Publisher is an interface to something with an Enqueue() method to
// add new potential URLs to crawl.
type Publisher interface {
Enqueue(Outlink, int) error
}
// The Crawler object contains the crawler state. // The Crawler object contains the crawler state.
type Crawler struct { type Crawler struct {
db *gobDB db *gobDB
...@@ -341,8 +347,8 @@ func (c *Crawler) Close() { ...@@ -341,8 +347,8 @@ func (c *Crawler) Close() {
// and adds them to the queue for crawling. It will call the wrapped
// handler on all requests regardless.
func FollowRedirects(wrap Handler) Handler { func FollowRedirects(wrap Handler) Handler {
return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
if herr := wrap.Handle(c, u, depth, resp, err); herr != nil { if herr := wrap.Handle(p, u, depth, resp, err); herr != nil {
return herr return herr
} }
...@@ -356,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler { ...@@ -356,7 +362,7 @@ func FollowRedirects(wrap Handler) Handler {
if uerr != nil { if uerr != nil {
log.Printf("error parsing Location header: %v", uerr) log.Printf("error parsing Location header: %v", uerr)
} else { } else {
return c.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1) return p.Enqueue(Outlink{URL: locationURL, Tag: TagPrimary}, depth+1)
} }
} }
return nil return nil
...@@ -367,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler { ...@@ -367,14 +373,14 @@ func FollowRedirects(wrap Handler) Handler {
// "successful" HTTP status code (anything < 400). When using this // "successful" HTTP status code (anything < 400). When using this
// wrapper, subsequent Handle calls will always have err set to nil. // wrapper, subsequent Handle calls will always have err set to nil.
func FilterErrors(wrap Handler) Handler { func FilterErrors(wrap Handler) Handler {
return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
if err != nil { if err != nil {
return nil return nil
} }
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
return nil return nil
} }
return wrap.Handle(c, u, depth, resp, nil) return wrap.Handle(p, u, depth, resp, nil)
}) })
} }
...@@ -382,11 +388,11 @@ func FilterErrors(wrap Handler) Handler { ...@@ -382,11 +388,11 @@ func FilterErrors(wrap Handler) Handler {
// temporary errors (all transport-level errors are considered // temporary errors (all transport-level errors are considered
// temporary, as well as any HTTP status code >= 500). // temporary, as well as any HTTP status code >= 500).
func HandleRetries(wrap Handler) Handler { func HandleRetries(wrap Handler) Handler {
return HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error { return HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 { if err != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
return ErrRetryRequest return ErrRetryRequest
} }
return wrap.Handle(c, u, depth, resp, nil) return wrap.Handle(p, u, depth, resp, nil)
}) })
} }
......
...@@ -33,11 +33,11 @@ func TestCrawler(t *testing.T) { ...@@ -33,11 +33,11 @@ func TestCrawler(t *testing.T) {
) )
var crawledPages int var crawledPages int
h := HandlerFunc(func(c *Crawler, u string, depth int, resp *http.Response, err error) error { h := HandlerFunc(func(p Publisher, u string, depth int, resp *http.Response, err error) error {
crawledPages++ crawledPages++
next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages) next := fmt.Sprintf(srv.URL+"/page/%d", crawledPages)
log.Printf("%s -> %s", u, next) log.Printf("%s -> %s", u, next)
c.Enqueue(Outlink{ p.Enqueue(Outlink{
URL: mustParseURL(next), URL: mustParseURL(next),
Tag: TagPrimary, Tag: TagPrimary,
}, depth+1) }, depth+1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment