Commit b09f05f8 authored by ale

initial commit

// A restartable crawler that dumps everything to a WARC file.
package main
import (
"bytes"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"git.autistici.org/ale/crawl"
"github.com/PuerkitoBio/goquery"
)
var (
dbPath = flag.String("state", "crawldb", "crawl state database path")
concurrency = flag.Int("c", 10, "concurrent workers")
depth = flag.Int("depth", 10, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file")
urlcssRx = regexp.MustCompile(`background.*:.*url\(["']?([^'"\)]+)["']?\)`)
)
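// Example invocation (an illustrative sketch: the binary name "crawl" and the
// seed URL are placeholders, the flags are the ones defined above):
//
//	crawl -state=crawldb -depth=5 -output=site.warc.gz http://example.com/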
var linkMatches = []struct {
tag string
attr string
}{
{"a", "href"},
{"link", "href"},
{"img", "src"},
{"script", "src"},
}
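// extractLinks collects outbound links from HTML documents (using the
// tag/attribute pairs above) and from url() references in CSS, then enqueues
// them for crawling.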
func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
var outlinks []string
ctype := resp.Header.Get("Content-Type")
if strings.HasPrefix(ctype, "text/html") {
doc, err := goquery.NewDocumentFromResponse(resp)
if err != nil {
return err
}
for _, lm := range linkMatches {
doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
val, _ := s.Attr(lm.attr)
outlinks = append(outlinks, val)
})
}
} else if strings.HasPrefix(ctype, "text/css") {
if data, err := ioutil.ReadAll(resp.Body); err == nil {
for _, val := range urlcssRx.FindAllStringSubmatch(string(data), -1) {
outlinks = append(outlinks, val[1])
}
}
}
// Uniquify and parse outbound links.
links := make(map[string]*url.URL)
for _, val := range outlinks {
if linkurl, err := resp.Request.URL.Parse(val); err == nil {
links[linkurl.String()] = linkurl
}
}
for _, link := range links {
//log.Printf("%s -> %s", u, link.String())
c.Enqueue(link, depth+1)
}
return nil
}
type fakeCloser struct {
io.Reader
}
func (f *fakeCloser) Close() error {
return nil
}
func hdr2str(h http.Header) []byte {
var b bytes.Buffer
h.Write(&b)
return b.Bytes()
}
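// warcSaveHandler writes each fetched request/response pair to the WARC
// output and then hands the response to extractLinks so the crawl can
// continue.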
type warcSaveHandler struct {
warc *crawl.WarcWriter
warcInfoID string
}
func (h *warcSaveHandler) Handle(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
data, derr := ioutil.ReadAll(resp.Body)
if derr != nil {
return derr
}
resp.Body = &fakeCloser{bytes.NewReader(data)}
// Dump the request.
var b bytes.Buffer
resp.Request.Write(&b)
hdr := crawl.NewWarcHeader()
hdr.Set("WARC-Type", "request")
hdr.Set("WARC-Target-URI", resp.Request.URL.String())
hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
hdr.Set("Content-Length", strconv.Itoa(b.Len()))
w := h.warc.NewRecord(hdr)
w.Write(b.Bytes())
w.Close()
// Dump the response.
statusLine := fmt.Sprintf("HTTP/1.1 %s", resp.Status)
respPayload := bytes.Join([][]byte{
[]byte(statusLine), hdr2str(resp.Header), data},
[]byte{'\r', '\n'})
hdr = crawl.NewWarcHeader()
hdr.Set("WARC-Type", "response")
hdr.Set("WARC-Target-URI", resp.Request.URL.String())
hdr.Set("WARC-Warcinfo-ID", h.warcInfoID)
hdr.Set("Content-Length", strconv.Itoa(len(respPayload)))
w = h.warc.NewRecord(hdr)
w.Write(respPayload)
w.Close()
return extractLinks(c, u, depth, resp, err)
}
func NewSaveHandler(w *crawl.WarcWriter) crawl.Handler {
info := strings.Join([]string{
"Software: crawl/1.0\r\n",
"Format: WARC File Format 1.0\r\n",
"Conformsto: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf\r\n",
}, "")
hdr := crawl.NewWarcHeader()
hdr.Set("WARC-Type", "warcinfo")
hdr.Set("WARC-Warcinfo-ID", hdr.Get("WARC-Record-ID"))
hdr.Set("Content-Length", strconv.Itoa(len(info)))
hdrw := w.NewRecord(hdr)
io.WriteString(hdrw, info)
hdrw.Close()
return &warcSaveHandler{
warc: w,
warcInfoID: hdr.Get("WARC-Record-ID"),
}
}
func main() {
flag.Parse()
outf, err := os.Create(*outputFile)
if err != nil {
log.Fatal(err)
}
seeds := crawl.MustParseURLs(flag.Args())
scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ","))
w := crawl.NewWarcWriter(outf)
defer w.Close()
saver := NewSaveHandler(w)
crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(http.Get), saver)
if err != nil {
log.Fatal(err)
}
crawler.Run()
}
// A restartable crawler that extracts links from HTML pages and
// simply prints them.
//
package main
import (
"flag"
"fmt"
"log"
"net/http"
"net/url"
"strings"
"git.autistici.org/ale/crawl"
"github.com/PuerkitoBio/goquery"
)
var (
dbPath = flag.String("state", "crawldb", "crawl state database path")
concurrency = flag.Int("c", 10, "concurrent workers")
depth = flag.Int("depth", 10, "maximum link depth")
validSchemes = flag.String("schemes", "http,https", "comma-separated list of allowed protocols")
)
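// Example invocation (an illustrative sketch: the binary name "links" and the
// seed URL are placeholders):
//
//	links -state=crawldb -depth=3 http://example.com/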
var linkMatches = []struct {
tag string
attr string
}{
{"a", "href"},
{"link", "href"},
{"img", "src"},
{"script", "src"},
}
func extractLinks(c *crawl.Crawler, u string, depth int, resp *http.Response, err error) error {
if !strings.HasPrefix(resp.Header.Get("Content-Type"), "text/html") {
return nil
}
doc, err := goquery.NewDocumentFromResponse(resp)
if err != nil {
return err
}
links := make(map[string]*url.URL)
for _, lm := range linkMatches {
doc.Find(fmt.Sprintf("%s[%s]", lm.tag, lm.attr)).Each(func(i int, s *goquery.Selection) {
val, _ := s.Attr(lm.attr)
if linkurl, err := resp.Request.URL.Parse(val); err == nil {
links[linkurl.String()] = linkurl
}
})
}
for _, link := range links {
//log.Printf("%s -> %s", u, link.String())
c.Enqueue(link, depth+1)
}
return nil
}
func main() {
flag.Parse()
seeds := crawl.MustParseURLs(flag.Args())
scope := crawl.NewSeedScope(seeds, *depth, strings.Split(*validSchemes, ","))
crawler, err := crawl.NewCrawler(*dbPath, seeds, scope, crawl.FetcherFunc(http.Get), crawl.HandlerFunc(extractLinks))
if err != nil {
log.Fatal(err)
}
crawler.Run()
}
package crawl
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/PuerkitoBio/purell"
"github.com/jmhodges/levigo"
)
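// gobDB is a thin wrapper around LevelDB that stores gob-encoded values.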
type gobDB struct {
*levigo.DB
}
func newGobDB(path string) (*gobDB, error) {
opts := levigo.NewOptions()
opts.SetCreateIfMissing(true)
opts.SetCache(levigo.NewLRUCache(2 << 20))
opts.SetFilterPolicy(levigo.NewBloomFilter(10))
db, err := levigo.Open(path, opts)
if err != nil {
return nil, err
}
return &gobDB{db}, nil
}
func (db *gobDB) PutObj(wo *levigo.WriteOptions, key []byte, obj interface{}) error {
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(obj); err != nil {
return err
}
return db.Put(wo, key, b.Bytes())
}
func (db *gobDB) GetObj(ro *levigo.ReadOptions, key []byte, obj interface{}) error {
data, err := db.Get(ro, key)
if err != nil {
return err
}
if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(obj); err != nil {
return err
}
return nil
}
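// Illustrative sketch (not part of the original code): a round trip through
// PutObj and GetObj. Any gob-encodable value works; QueuePair is used here
// only as an example, and the key is a placeholder.
func gobDBRoundTripExample(db *gobDB) (QueuePair, error) {
	wo := levigo.NewWriteOptions()
	defer wo.Close()
	if err := db.PutObj(wo, []byte("example/key"), &QueuePair{URL: "http://example.com/", Depth: 1}); err != nil {
		return QueuePair{}, err
	}
	ro := levigo.NewReadOptions()
	defer ro.Close()
	var p QueuePair
	err := db.GetObj(ro, []byte("example/key"), &p)
	return p, err
}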
type URLInfo struct {
URL string
StatusCode int
CrawledAt time.Time
Error error
}
type Scope interface {
Check(*url.URL, int) bool
}
type Fetcher interface {
Fetch(string) (*http.Response, error)
}
type FetcherFunc func(string) (*http.Response, error)
func (f FetcherFunc) Fetch(u string) (*http.Response, error) {
return f(u)
}
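// Illustrative sketch (not part of the original code): FetcherFunc adapts any
// plain function, so a custom http.Client (here, one with a timeout) can be
// used instead of http.Get. The variable name is made up for this example.
var timeoutFetcher = FetcherFunc(func(u string) (*http.Response, error) {
	client := &http.Client{Timeout: 30 * time.Second}
	return client.Get(u)
})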
type Handler interface {
Handle(*Crawler, string, int, *http.Response, error) error
}
type HandlerFunc func(*Crawler, string, int, *http.Response, error) error
func (f HandlerFunc) Handle(db *Crawler, u string, depth int, resp *http.Response, err error) error {
return f(db, u, depth, resp, err)
}
// Crawler holds the crawl state: the database of URLs (either pending or
// done), the scope, the fetcher and the handler.
type Crawler struct {
db *gobDB
seeds []*url.URL
scope Scope
fetcher Fetcher
handler Handler
enqueueMx sync.Mutex
}
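// QueuePair is a queued crawl entry: the database key it is stored under, the
// URL to fetch, and its link depth.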
type QueuePair struct {
Key []byte
URL string
Depth int
}
// Update this URLInfo entry in the crawl database.
func (c *Crawler) UpdateURL(info *URLInfo) error {
wo := levigo.NewWriteOptions()
defer wo.Close()
return c.db.PutObj(wo, []byte(fmt.Sprintf("url/%s", info.URL)), info)
}
// Enqueue a (possibly new) URL for processing.
func (c *Crawler) Enqueue(u *url.URL, depth int) {
// Normalize the URL.
urlStr := purell.NormalizeURL(u, purell.FlagsSafe|purell.FlagRemoveDotSegments|purell.FlagRemoveDuplicateSlashes|purell.FlagRemoveFragment|purell.FlagRemoveDirectoryIndex|purell.FlagSortQuery)
// See if it's in scope.
if !c.scope.Check(u, depth) {
return
}
c.enqueueMx.Lock()
defer c.enqueueMx.Unlock()
// Check if we've already seen it.
var info URLInfo
ro := levigo.NewReadOptions()
defer ro.Close()
ukey := []byte(fmt.Sprintf("url/%s", urlStr))
if err := c.db.GetObj(ro, ukey, &info); err == nil {
return
}
// Create a unique key using the URL and the current timestamp.
qkey := []byte(fmt.Sprintf("queue/%d/%s", time.Now().Unix(), urlStr))
// Store the URL in the queue, and store an empty URLInfo to
// make sure that subsequent calls to Enqueue with the same
// URL will fail.
wo := levigo.NewWriteOptions()
defer wo.Close()
c.db.PutObj(wo, qkey, &QueuePair{Key: qkey, URL: urlStr, Depth: depth})
c.db.PutObj(wo, ukey, &info)
}
// process scans the queue every two seconds and sends the queued URLs on the
// returned channel, stopping once a scan finds the queue empty.
func (c *Crawler) process() <-chan QueuePair {
ch := make(chan QueuePair)
go func() {
queuePrefix := []byte("queue/")
for range time.Tick(2 * time.Second) {
n := 0
// Scan the queue using a snapshot, to ignore
// new URLs that might be added after this.
s := c.db.NewSnapshot()
ro := levigo.NewReadOptions()
ro.SetSnapshot(s)
iter := c.db.NewIterator(ro)
for iter.Seek(queuePrefix); iter.Valid() && bytes.HasPrefix(iter.Key(), queuePrefix); iter.Next() {
var p QueuePair
if err := gob.NewDecoder(bytes.NewBuffer(iter.Value())).Decode(&p); err != nil {
continue
}
ch <- p
n++
}
iter.Close()
ro.Close()
c.db.ReleaseSnapshot(s)
if n == 0 {
break
}
}
close(ch)
}()
return ch
}
// Main worker loop.
func (c *Crawler) urlHandler(queue <-chan QueuePair) {
for p := range queue {
// Fetch the URL and handle it. Make sure to Close the
// response body (even if it gets replaced in the
// Response object).
fmt.Printf("%s\n", p.URL)
httpResp, httpErr := c.fetcher.Fetch(p.URL)
var respBody io.ReadCloser
if httpErr == nil {
respBody = httpResp.Body
}
err := c.handler.Handle(c, p.URL, p.Depth, httpResp, httpErr)
if respBody != nil {
respBody.Close()
}
// Remove the URL from the queue if the handler was successful.
if err == nil {
wo := levigo.NewWriteOptions()
c.db.Delete(wo, p.Key)
wo.Close()
} else {
log.Printf("error handling %s: %v", p.URL, err)
}
}
}
type seedScope struct {
seeds []*url.URL
schemes map[string]struct{}
maxDepth int
}
func (s *seedScope) Check(u *url.URL, depth int) bool {
// Ignore non-allowed schemes.
if _, ok := s.schemes[u.Scheme]; !ok {
return false
}
// Do not crawl beyond maxDepth.
if depth > s.maxDepth {
return false
}
// Check each seed prefix.
for _, seed := range s.seeds {
if u.Host == seed.Host && strings.HasPrefix(u.Path, seed.Path) {
return true
}
}
return false
}
// NewSeedScope returns a Scope that will only allow crawling the seed
// domains, and not beyond the specified maximum link depth.
func NewSeedScope(seeds []*url.URL, maxDepth int, allowedSchemes []string) Scope {
scope := &seedScope{
seeds: seeds,
maxDepth: maxDepth,
schemes: make(map[string]struct{}),
}
for _, s := range allowedSchemes {
scope.schemes[s] = struct{}{}
}
return scope
}
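// MustParseURLs parses a list of URLs and aborts the program on the first
// error.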
func MustParseURLs(urls []string) []*url.URL {
// Parse the seed URLs.
var parsed []*url.URL
for _, s := range urls {
u, err := url.Parse(s)
if err != nil {
log.Fatalf("error parsing seed \"%s\": %v", s, err)
}
parsed = append(parsed, u)
}
return parsed
}
// NewCrawler creates a new Crawler object with the specified behavior.
func NewCrawler(path string, seeds []*url.URL, scope Scope, f Fetcher, h Handler) (*Crawler, error) {
// Open the crawl database.
db, err := newGobDB(path)
if err != nil {
return nil, err
}
c := &Crawler{
db: db,
fetcher: f,
handler: &standardPageHandler{h},
seeds: seeds,
scope: scope,
}
return c, nil
}
// Run runs the crawl; it does not return until the crawl is done.
func (c *Crawler) Run() {
// Load initial seeds into the queue.
for _, u := range c.seeds {
c.Enqueue(u, 0)
}
// Start some runners and wait until they're done.
var wg sync.WaitGroup
ch := c.process()
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
c.urlHandler(ch)
wg.Done()
}()
}
wg.Wait()
}
// Standard page handler, follows redirects and invokes a child
// handler when status == 200.
type standardPageHandler struct {
h Handler
}
func (wrap *standardPageHandler) Handle(c *Crawler, u string, depth int, resp *http.Response, err error) error {
info := &URLInfo{URL: u, CrawledAt: time.Now()}
if err == nil {
info.StatusCode = resp.StatusCode
if resp.StatusCode == 200 {
err = wrap.h.Handle(c, u, depth, resp, err)
} else if resp.StatusCode > 300 && resp.StatusCode < 400 {
location := resp.Header.Get("Location")
if location != "" {
locationUrl, err := resp.Request.URL.Parse(location)
if err != nil {
log.Printf("error parsing Location header: %v", err)
} else {
c.Enqueue(locationUrl, depth+1)
}
}
} else {
err = errors.New(resp.Status)
}
}
info.Error = err
//log.Printf("[CRAWL] %+v", info)
c.UpdateURL(info)
return nil
}
package crawl
import (
"fmt"
"io"
"time"
"compress/gzip"
"code.google.com/p/go-uuid/uuid"
)
var (
warcTimeFmt = time.RFC3339
warcVersion = "WARC/1.0"
warcContentTypes = map[string]string{
"warcinfo": "application/warc-fields",
"response": "application/http; msgtype=response",
"request": "application/http; msgtype=request",
"metadata": "application/warc-fields",
}
)
// WarcHeader is the header of a WARC record. Header field names are
// case-sensitive.
type WarcHeader map[string]string
// Set a header to the specified value. Multiple values are not
// supported.
func (h WarcHeader) Set(key, value string) {
h[key] = value
// Keep Content-Type in sync with WARC-Type.
if key == "WARC-Type" {
if ct, ok := warcContentTypes[value]; ok {
h["Content-Type"] = ct
} else {
h["Content-Type"] = "application/octet-stream"
}
}
}
// Get the value of a header. If not found, returns an empty string.
func (h WarcHeader) Get(key string) string {
return h[key]
}
// Encode the header to a Writer.
func (h WarcHeader) Encode(w io.Writer) {
fmt.Fprintf(w, "%s\r\n", warcVersion)
for hdr, value := range h {
fmt.Fprintf(w, "%s: %s\r\n", hdr, value)
}
fmt.Fprintf(w, "\r\n")
}
// NewWarcHeader returns a WarcHeader with its own unique ID and the
// current timestamp.
func NewWarcHeader() WarcHeader {
h := make(WarcHeader)
h.Set("WARC-Record-ID", fmt.Sprintf("<%s>", uuid.NewUUID().URN()))
h.Set("WARC-Date", time.Now().Format(warcTimeFmt))
h.Set("Content-Type", "application/octet-stream")
return h
}
// WarcWriter can write records to a file in WARC format.
type WarcWriter struct {
writer io.WriteCloser
}
type recordWriter struct {
io.Writer
}
func (rw *recordWriter) Close() error {
// Add the end-of-record marker.
fmt.Fprintf(rw, "\r\n\r\n")
return nil
}
// NewRecord starts a new WARC record with the provided header. The
// caller must call Close on the returned writer before creating the
// next record.
func (w *WarcWriter) NewRecord(hdr WarcHeader) io.WriteCloser {
hdr.Encode(w.writer)
return &recordWriter{w.writer}
}
// Close the WARC writer and flush all buffers.
func (w *WarcWriter) Close() error {
return w.writer.Close()
}
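// NewWarcWriter returns a WarcWriter that gzip-compresses records as it
// writes them to w.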
func NewWarcWriter(w io.WriteCloser) *WarcWriter {
return &WarcWriter{gzip.NewWriter(w)}
}
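// Illustrative sketch (not part of the original code): writing a single
// "metadata" record. The output stream is whatever the caller opened
// (NewWarcWriter adds the gzip layer itself); the payload and the function
// name are placeholders.
func writeMetadataExample(out io.WriteCloser) error {
	w := NewWarcWriter(out)
	defer w.Close()

	payload := "example-field: example value\r\n"
	hdr := NewWarcHeader()
	hdr.Set("WARC-Type", "metadata")
	hdr.Set("Content-Length", fmt.Sprintf("%d", len(payload)))

	rec := w.NewRecord(hdr)
	if _, err := io.WriteString(rec, payload); err != nil {
		return err
	}
	return rec.Close()
}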