...
 
Commits (5)
......@@ -65,6 +65,13 @@ created, e.g.:
$ crawl --output=out-%s.warc.gz http://example.com/
The crawler will rate-limit its requests to avoid overloading the
target servers. You can select the desired rate of requests per second
with the *--qps* option. It is a floating point number, so you can use
values < 1 to space requests further apart than one second. Note that
rate limiting is currently applied separately for each hostname,
*before* DNS resolution.
## Limitations
Like most crawlers, this one has a number of limitations:
......
......@@ -22,6 +22,7 @@ import (
"syscall"
"time"
"git.autistici.org/ai3/jobqueue/queue"
"git.autistici.org/ale/crawl"
"git.autistici.org/ale/crawl/analysis"
"git.autistici.org/ale/crawl/warc"
......@@ -36,6 +37,7 @@ var (
excludeRelated = flag.Bool("exclude-related", false, "include related resources (css, images, etc) only if their URL is in scope")
outputFile = flag.String("output", "crawl.warc.gz", "output WARC file or pattern (patterns must include a \"%s\" literal token)")
warcFileSizeMB = flag.Int("output-max-size", 100, "maximum output WARC file size (in MB) when using patterns")
hostQPS = flag.Float64("qps", 3, "per-hostname qps limit")
cpuprofile = flag.String("cpuprofile", "", "create cpu profile")
excludes []*regexp.Regexp
......@@ -296,12 +298,20 @@ func main() {
log.Fatal(err)
}
var rl queue.RatelimiterFunc
if *hostQPS > 0 {
rl = func(_ []byte) queue.Ratelimiter {
return queue.NewSimpleRatelimiter(*hostQPS)
}
}
crawler, err := crawl.NewCrawler(
*dbPath,
seeds,
scope,
crawl.FetcherFunc(fetch),
crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(saver))),
rl,
)
if err != nil {
log.Fatal(err)
......
......@@ -63,6 +63,7 @@ func TestCrawl(t *testing.T) {
scope,
crawl.FetcherFunc(fetch),
crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(saver))),
nil,
)
if err != nil {
t.Fatal(err)
......
......@@ -52,6 +52,7 @@ func main() {
scope,
crawl.FetcherFunc(http.Get),
crawl.HandleRetries(crawl.FollowRedirects(crawl.FilterErrors(crawl.HandlerFunc(extractLinks)))),
nil,
)
if err != nil {
log.Fatal(err)
......
This diff is collapsed.
......@@ -44,7 +44,7 @@ func TestCrawler(t *testing.T) {
return nil
})
crawler, err := NewCrawler(dir+"/crawl.db", seeds, scope, FetcherFunc(http.Get), HandleRetries(FilterErrors(FollowRedirects(h))))
crawler, err := NewCrawler(dir+"/crawl.db", seeds, scope, FetcherFunc(http.Get), HandleRetries(FilterErrors(FollowRedirects(h))), nil)
if err != nil {
t.Fatal("NewCrawler", err)
}
......
package crawl
import (
"bytes"
"encoding/binary"
"errors"
"math/rand"
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb"
)
// queue is the persistent crawl frontier, stored in the gobDB
// (LevelDB-backed) instance shared with the rest of the crawler state.
type queue struct {
	db *gobDB
	// numActive counts items handed out to workers but not yet
	// released; it is updated via sync/atomic (see acquire/Release).
	numActive int32
}
// Key prefixes partitioning the database namespace.
var (
	queuePrefix  = []byte("queue")        // pending items
	activePrefix = []byte("queue_active") // items checked out by workers
	queueKeySep  = []byte{'/'}            // separator between key components
)
// queuePair is a single unit of crawl work: a URL and the depth at
// which it was discovered. Only the exported fields are serialized;
// key holds the database key of the entry.
type queuePair struct {
	key []byte

	URL   string
	Depth int
}
// Scan the pending queue and send items on 'ch'. Returns an error
// when the queue is empty (work is done).
func (q *queue) Scan(ch chan<- queuePair) error {
	n := 0
	startKey, endKey := queueScanRange()
	iter := q.db.NewRangeIterator(startKey, endKey)
	defer iter.Release()
	for iter.Next() {
		var p queuePair
		if err := iter.Value(&p); err != nil {
			// Skip undecodable entries rather than aborting the scan.
			continue
		}
		// Copy the key: LevelDB iterators reuse the key buffer on the
		// next call to Next(), while p (and p.key) outlives this
		// iteration via the channel and later Release/Retry calls.
		p.key = append([]byte(nil), iter.Key()...)
		if err := q.acquire(p); err != nil {
			return err
		}
		ch <- p
		n++
	}
	// numActive is modified with sync/atomic by acquire/Release/Retry,
	// so use an atomic load here to avoid a data race.
	if n == 0 && atomic.LoadInt32(&q.numActive) == 0 {
		return errors.New("EOF")
	}
	return nil
}
// Add an item to the pending work queue. The key is ordered by the
// scheduling timestamp, with a random suffix to avoid collisions.
func (q *queue) Add(wb *leveldb.Batch, urlStr string, depth int, when time.Time) error {
	ts := uint64(when.UnixNano())
	nonce := uint64(rand.Int63())
	key := bytes.Join([][]byte{queuePrefix, encodeUint64(ts), encodeUint64(nonce)}, queueKeySep)
	return q.db.PutObjBatch(wb, key, &queuePair{URL: urlStr, Depth: depth})
}
// acquire atomically moves an item from the pending keyspace to the
// active ("checked out") keyspace and bumps the active counter.
// Called by Scan for every item handed to a worker.
func (q *queue) acquire(qp queuePair) error {
	wb := new(leveldb.Batch)
	// Write the item under the active key...
	if err := q.db.PutObjBatch(wb, activeQueueKey(qp.key), qp); err != nil {
		return err
	}
	// ...and delete the pending entry in the same batch.
	wb.Delete(qp.key)
	if err := q.db.Write(wb, nil); err != nil {
		return err
	}
	atomic.AddInt32(&q.numActive, 1)
	return nil
}
// Release an item from the queue. Processing for this item is done.
// The deletion is only staged on wb; the caller is responsible for
// committing the batch.
func (q *queue) Release(wb *leveldb.Batch, qp queuePair) {
	wb.Delete(activeQueueKey(qp.key))
	atomic.AddInt32(&q.numActive, -1)
}
// Retry processing this item at a later time: the active entry is
// dropped and the item is re-added to the pending queue with a
// timestamp 'delay' in the future. Staged on wb; caller commits.
func (q *queue) Retry(wb *leveldb.Batch, qp queuePair, delay time.Duration) error {
	wb.Delete(activeQueueKey(qp.key))
	if err := q.Add(wb, qp.URL, qp.Depth, time.Now().Add(delay)); err != nil {
		return err
	}
	atomic.AddInt32(&q.numActive, -1)
	return nil
}
// Recover moves all active tasks to the pending queue. To be
// called at startup to recover tasks that were active when the
// previous run terminated.
func (q *queue) Recover() error {
	wb := new(leveldb.Batch)
	prefix := bytes.Join([][]byte{activePrefix, []byte{}}, queueKeySep)
	iter := q.db.NewPrefixIterator(prefix)
	defer iter.Release()
	for iter.Next() {
		var p queuePair
		if err := iter.Value(&p); err != nil {
			// Skip undecodable entries.
			continue
		}
		// Strip the active prefix and separator to recover the
		// original pending-queue key.
		p.key = iter.Key()[len(activePrefix)+1:]
		if err := q.db.PutObjBatch(wb, p.key, &p); err != nil {
			return err
		}
		wb.Delete(iter.Key())
	}
	return q.db.Write(wb, nil)
}
// encodeUint64 renders n as its 8-byte big-endian representation, so
// that lexicographic key ordering matches numeric ordering.
func encodeUint64(n uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, n)
	return buf
}
// activeQueueKey maps a pending-queue key to its corresponding entry
// in the active ("checked out") keyspace.
func activeQueueKey(key []byte) []byte {
	parts := [][]byte{activePrefix, key}
	return bytes.Join(parts, queueKeySep)
}
// queueScanRange returns the key range covering every pending item
// whose scheduled timestamp is not in the future.
func queueScanRange() ([]byte, []byte) {
	deadline := uint64(time.Now().UnixNano() + 1)
	start := bytes.Join([][]byte{queuePrefix, []byte{}}, queueKeySep)
	end := bytes.Join([][]byte{queuePrefix, encodeUint64(deadline)}, queueKeySep)
	return start, end
}
MIT License
Copyright (c) 2018 <ale@incal.net>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Job queue
===
An efficient, relatively fast, and robust queue with the following
characteristics:
* persistent, must survive restarts and recover;
* keyed by time, elements added to the queue can specify a minimum
timestamp for scheduling - this allows us to retry jobs on error
after a delay;
* rate-limiting by domain, every element can have an associated
rate-limiting domain, and the queue will pop elements according to
the desired rate limit. The list of possible domains is not known in
advance;
* detect dead jobs using *leases* with keepalives, which works both
for short-running and long-running jobs without making assumptions
on their expected duration.
This package offers a Go API to embed queues inside your application.
## Usage
The API attempts to *scale transparently from in-process queues to
remote, distributed ones*. By this we mean using interfaces to
decouple the environment setup from the application logic.
For instance, this would be a typical "worker" flow:
```go
func worker(ctx context.Context, queue jobqueue.Queue) error {
job, err := queue.Next(ctx)
if err != nil {
return err
}
// Do work with 'job'...
return job.Done(ctx, nil)
}
```
The setup is then handled when creating the *Queue* client object, for
which we have various alternatives.
### In-process queue
An in-process queue has minimal overhead: every client interacts
directly with the database, the queues are implemented with Go
channels.
Clearly in this case you will also need to create the actual queue
implementation *queue.Queue* object within the application.
Example setup:
```go
// Create the in-process Queue implementation.
q, _ := queue.NewQueue(db)
// Use its Client() method to get a client that we can pass to worker().
client := q.Client()
```
### Remote queue
For distributed setups (meaning: distributed workers, but still a
centralized queue scheduler), we're using GRPC for inter-process
communication.
Example setup:
```go
// This returns a Queue client object that we can pass to worker().
client, err := net.NewClient("myserver:1234")
```
While on the server:
```go
q, _ := queue.NewLeaseQueue()
grpcServer := grpc.NewServer()
pb.RegisterQueueServiceServer(grpcServer, net.NewServer(q))
grpcServer.Serve(listener)
```
Note that the server must use the *queue.LeaseQueue* implementation in
order to properly support dead worker detection etc.
## Design
The queue is built on top of a key-value store with fast range
iteration (LevelDB is a reasonable starting point).
Domain ratelimiting can be implemented with separate per-domain
queues, and discovering new domains at element insertion time. Then
we can maintain a domain map at runtime (conveniently overlaps with
the rate-limiter buckets storage):
> *domain* -> { *ratelimiter*, *queue\_head* }
> *ratelimiter* -> { *next\_available\_timestamp* }
The domain map can then be kept ordered by *next_available_timestamp*,
so that the queue *next()* method can quickly find a non-ratelimited,
non-empty queue. In pseudo-code:
```
for domain in ordered_domains:
if domain.next_available_timestamp() < now() and not domain.empty():
domain.ratelimit_incr()
return domain.next_nonblock()
```
The queue key structure must then be the following:
> *domain* / *timestamp* / *unique\_id*
package queue
import (
"context"
"io"
"log"
"time"
jq "git.autistici.org/ai3/jobqueue"
)
// Local queue client.
//
// localClient adapts a *Queue for in-process use, satisfying the
// jq.Queue interface without serialization or networking.
type localClient struct {
	q *Queue
}

// Add enqueues data on queue qname with a zero scheduling time
// (presumably meaning "as soon as possible" — see Queue.Add).
func (c *localClient) Add(_ context.Context, qname, data []byte) error {
	_, err := c.q.Add(qname, data, time.Time{})
	return err
}
// Next blocks until an item is available on the queue (or ctx is
// canceled) and wraps it in a Job.
func (c *localClient) Next(ctx context.Context) (jq.Job, error) {
	select {
	case item := <-c.q.Subscribe():
		if item == nil {
			// A nil item signals queue shutdown.
			return nil, io.EOF
		}
		return &localClientJob{Item: item, q: c.q}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
func (c *localClient) Close() {}
// localClientJob wraps an Item with ack/nack semantics for the
// in-process client.
type localClientJob struct {
	Item
	q *Queue
}

// Done acknowledges the job on success, or returns it to the queue
// (nack) when the caller reports an error.
func (j *localClientJob) Done(_ context.Context, err error) error {
	if err == nil {
		return j.q.Ack(j.Item)
	}
	return j.q.Nack(j.Item)
}
// Client returns a local, in-process client for this queue. Multiple
// clients may be created; they all share the same underlying Queue.
func (q *Queue) Client() jq.Queue {
	return &localClient{q: q}
}
// Client for local LeaseQueue.
type localLeaseClient struct {
	q *LeaseQueue
	// keepaliveInterval controls how often lease keepalives are sent
	// while a job is being processed.
	keepaliveInterval time.Duration
}

// Add enqueues data on queue qname with a zero scheduling time
// (presumably meaning "as soon as possible" — see Queue.Add).
func (c *localLeaseClient) Add(_ context.Context, qname, data []byte) error {
	_, err := c.q.Add(qname, data, time.Time{})
	return err
}
// Next blocks until an item is available (or ctx is canceled),
// immediately acknowledges it to obtain a lease, and starts a
// background keepalive goroutine that extends the lease until the
// job is completed via Done.
func (c *localLeaseClient) Next(ctx context.Context) (jq.Job, error) {
	var item Item
	select {
	case item = <-c.q.Subscribe():
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	// A nil item signals queue shutdown.
	if item == nil {
		return nil, io.EOF
	}
	// Ack creates the lease; the lease ID is what keepalive and Done
	// operate on.
	leaseID, err := c.q.Ack(item)
	if err != nil {
		return nil, err
	}
	j := &localLeaseClientJob{
		Item:          item,
		leaseID:       leaseID,
		q:             c.q,
		stopCh:        make(chan struct{}),
		keepaliveDone: make(chan struct{}),
	}
	go j.keepalive(c.keepaliveInterval)
	return j, nil
}
func (c *localLeaseClient) Close() {}
// localLeaseClientJob is a leased job handed out by localLeaseClient.
// A background goroutine (keepalive) refreshes the lease until Done
// is called.
type localLeaseClientJob struct {
	Item
	leaseID uint64
	q       *LeaseQueue

	stopCh        chan struct{} // closed by Done to stop the keepalive loop
	keepaliveDone chan struct{} // closed when the keepalive goroutine exits
}
// keepalive periodically extends the job's lease until stopCh is
// closed. Errors are logged but not fatal: a missed keepalive only
// means the lease may expire and the job be retried.
func (j *localLeaseClientJob) keepalive(keepaliveInterval time.Duration) {
	defer close(j.keepaliveDone)
	ticker := time.NewTicker(keepaliveInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := j.q.Keepalive(j.leaseID); err != nil {
				log.Printf("lease %016x: keepalive error: %v", j.leaseID, err)
			}
		case <-j.stopCh:
			return
		}
	}
}
// Done stops the keepalive goroutine and, on success, completes the
// lease. On error it does nothing further: the lease is simply left
// to expire, which causes the queue to retry the job later.
func (j *localLeaseClientJob) Done(_ context.Context, err error) error {
	close(j.stopCh)
	// Wait for the keepalive goroutine to exit before touching the lease.
	<-j.keepaliveDone
	if err == nil {
		return j.q.Done(j.leaseID)
	}
	return nil
}
// Client returns a local, in-process client for this queue. The
// keepalive interval is a third of the retry interval, so a live
// worker refreshes its lease well before it can expire.
func (q *LeaseQueue) Client() jq.Queue {
	return &localLeaseClient{
		q:                 q,
		keepaliveInterval: q.Queue.retryInterval / 3,
	}
}
package queue
import (
"encoding/binary"
"fmt"
"math/rand"
"time"
"github.com/syndtr/goleveldb/leveldb"
)
var keyPrefixLeases = []byte("l")
// Associate a lease with the key of a future event that will cause
// its expiration. The lease mechanism is very simple and works like
// this:
//
// - when returning an entry from the queue, we create a new lease
// ID and return it to the caller along with the entry
// - we write the same entry back in the queue with a timestamp
// in the future
// - we also write a row in the db with the lease ID
// - the caller can call ping(leaseID), in which case we
// remove the previous entry from the queue, and we add it
// back with a timestamp a bit more in the future
// - if we don't hear back, the entry will be automatically
// reassigned
// - periodically, we must get rid of old leaseIDs...
// newLeaseID generates a random lease identifier.
func newLeaseID() uint64 {
	return rand.Uint64()
}

// makeLeaseKey builds the database key under which a lease ID is stored.
func makeLeaseKey(id uint64) []byte {
	return appendBytes(keyPrefixLeases, keySeparator, encodeUint64(id))
}
// encodeUint64 returns the 8-byte big-endian encoding of i, so that
// lexicographic key order matches numeric order.
func encodeUint64(i uint64) []byte {
	out := make([]byte, 8)
	binary.BigEndian.PutUint64(out, i)
	return out
}
// LeaseQueue is a Queue extended with the concept of 'leases', meant
// to support remote (or otherwise unreliable) subscribers, and to
// provide retry semantics that work reliably in face of varying task
// complexities.
//
// Whenever a subscriber acknowledges receipt of an item by calling
// Ack(), a new 'retry' item is added to the queue scheduled at some
// point in the future. If the subscriber fails to call Done() within
// that time, the task is automatically retried. The lease duration
// can be periodically extended by calling Keepalive().
//
// The same mechanism used to handle dead subscribers can be used to
// deal with temporary failures: just abort the task without calling
// Done() and it will be retried in the future.
//
// The lease expiration time is set globally for the entire LeaseQueue
// (default 5 minutes).
type LeaseQueue struct {
*Queue
}
// NewLeaseQueue creates a new LeaseQueue.
func NewLeaseQueue(db *leveldb.DB, opts ...Option) (*LeaseQueue, error) {
	base, err := NewQueue(db, opts...)
	if err != nil {
		return nil, err
	}
	return &LeaseQueue{Queue: base}, nil
}
// Ack is called when the consumer has received an item, creating a
// new lease for it. A 'retry' copy of the item is scheduled
// retryInterval in the future; unless the lease is completed (Done)
// or extended (Keepalive) before then, the retry copy becomes
// visible and the task runs again.
//
// NOTE(review): the retry-row add, the lease write and Queue.Ack are
// three separate db operations rather than one atomic batch — a
// crash in between could presumably leave an orphan row; verify.
func (q *LeaseQueue) Ack(item Item) (uint64, error) {
	qitem := item.(*queueItem)
	retryKey, err := q.Queue.Add(qitem.qname(), qitem.data, time.Now().Add(q.retryInterval))
	if err != nil {
		return 0, err
	}
	leaseID := newLeaseID()
	// Map the lease ID to the retry row so Keepalive/Done can find it.
	if err := q.db.Put(makeLeaseKey(leaseID), retryKey, nil); err != nil {
		return 0, err
	}
	return leaseID, q.Queue.Ack(item)
}
// Keepalive extends a lease when the consumer is still working on an
// item: the existing retry row is replaced by a new one scheduled
// retryInterval from now, and the lease is re-pointed at it.
func (q *LeaseQueue) Keepalive(leaseID uint64) error {
	oldRetryKey, err := q.db.Get(makeLeaseKey(leaseID), nil)
	if err != nil {
		return fmt.Errorf("lease %x does not exist: %v", leaseID, err)
	}
	data, err := q.db.Get(oldRetryKey, nil)
	if err != nil {
		return fmt.Errorf("row pointed by lease (%q) does not exist: %v", oldRetryKey, err)
	}
	qname := qnameFromKey(oldRetryKey)
	// Add the replacement retry row before removing the old one, so
	// a failure in between cannot lose the task entirely.
	retryKey, err := q.Queue.Add(qname, data, time.Now().Add(q.retryInterval))
	if err != nil {
		return err
	}
	if err := q.db.Put(makeLeaseKey(leaseID), retryKey, nil); err != nil {
		return err
	}
	return q.Queue.remove(oldRetryKey)
}
// Done is called when a consumer is done working on the item: the
// lease row and its pending retry row are both removed, so the task
// will not be scheduled again.
func (q *LeaseQueue) Done(leaseID uint64) error {
	oldRetryKey, err := q.db.Get(makeLeaseKey(leaseID), nil)
	if err != nil {
		return fmt.Errorf("lease %x does not exist: %v", leaseID, err)
	}
	if err := q.db.Delete(makeLeaseKey(leaseID), nil); err != nil {
		return err
	}
	return q.Queue.remove(oldRetryKey)
}
This diff is collapsed.
package queue
import "time"
// Ratelimiter paces event scheduling. Inc() records that one event
// has been scheduled; NextDeadline() returns the earliest time at
// which the next event may run. NextDeadline() may be called any
// number of times: it only reads state that Inc() modifies.
type Ratelimiter interface {
	Inc()
	NextDeadline() time.Time
}

// RatelimiterFunc builds a new Ratelimiter for a specific domain.
// There is no need to cache results: the function is only called
// once per domain.
type RatelimiterFunc func([]byte) Ratelimiter

// nullRatelimiter imposes no limit at all.
type nullRatelimiter struct{}

func (nullRatelimiter) Inc()                    {}
func (nullRatelimiter) NextDeadline() time.Time { return time.Time{} }

// A single shared nullRatelimiter object, reused everywhere.
var defaultRateLimiter = new(nullRatelimiter)

// simpleRatelimiter spaces events 1/qps seconds apart.
type simpleRatelimiter struct {
	period time.Duration
	next   time.Time
}

// NewSimpleRatelimiter returns a Ratelimiter that emits a continuous
// stream of entries at the desired qps. Probably loses accuracy at
// very high qps.
func NewSimpleRatelimiter(qps float64) Ratelimiter {
	// 1e9/qps nanoseconds between consecutive events.
	return &simpleRatelimiter{period: time.Duration(1e9/qps) * time.Nanosecond}
}

// Inc advances the next deadline by one period, never scheduling it
// in the past.
func (r *simpleRatelimiter) Inc() {
	candidate := r.next.Add(r.period)
	if now := time.Now(); candidate.Before(now) {
		candidate = now
	}
	r.next = candidate
}

// NextDeadline reports the earliest allowed time for the next event.
func (r *simpleRatelimiter) NextDeadline() time.Time { return r.next }
package queue
import (
"bytes"
"container/heap"
"fmt"
"log"
"strings"
"time"
"github.com/syndtr/goleveldb/leveldb"
ldbopt "github.com/syndtr/goleveldb/leveldb/opt"
ldbutil "github.com/syndtr/goleveldb/leveldb/util"
)
// This needs to be small, there might be millions of such queues.
//
// Some of the fields are meant to be used by the priority queue
// implementation sqPriorityQueue.
type subQueue struct {
	head []byte // key of the first pending element, nil when empty
	rl   Ratelimiter

	qname string

	// Managed by sqPriorityQueue / sqManager.
	index        int
	nextDeadline time.Time
}
// newSubqueue builds the in-memory state for queue qname, scanning
// the database to locate the current head element (if any).
func newSubqueue(db *leveldb.DB, qname []byte, rl Ratelimiter) (*subQueue, error) {
	q := &subQueue{
		rl:    rl,
		qname: string(qname),
	}

	// Find the head of the queue.
	qr := ldbutil.BytesPrefix(
		appendBytes(keyPrefixMain, keySeparator, qname, keySeparator))
	iter := db.NewIterator(qr, nil)
	defer iter.Release()
	if iter.First() {
		// Copy the key: the iterator buffer is not valid after Release.
		q.head = copyBuffer(iter.Key())
	}
	return q, iter.Error()
}
// pop removes and returns the current head element (key, value). The
// returned bool is false when the subqueue is empty. The deletion is
// only staged on wb; the caller commits the batch.
func (q *subQueue) pop(db *leveldb.DB, wb *leveldb.Batch, qname []byte) ([]byte, []byte, bool) {
	if q.head == nil {
		return nil, nil, false
	}
	headKey, headValue := q.incrementHead(db, qname)
	wb.Delete(headKey)
	q.rl.Inc()
	return headKey, headValue, true
}
// incrementHead pops the head row and advances q.head to the
// following element (or nil if the queue becomes empty). Returns the
// popped key and a copy of its value. Panics if the head row is
// missing from the database (a broken invariant).
func (q *subQueue) incrementHead(db *leveldb.DB, qname []byte) ([]byte, []byte) {
	// Use an iterator to retrieve the head row, as well as finding the
	// following one.
	iter := db.NewIterator(&ldbutil.Range{
		Start: q.head,
		Limit: appendBytes(
			keyPrefixMain, keySeparator, qname, keySeparatorPlus1),
	}, nil)
	// Defer the release so the iterator is not leaked on the panic
	// path below (the original only released it on normal return).
	defer iter.Release()
	if !iter.Next() || !bytes.Equal(iter.Key(), q.head) {
		// VERY BAD right now: the head row must exist.
		panic("can't fetch the head row")
	}

	// Pop the head value.
	key := q.head
	value := copyBuffer(iter.Value())

	// Find the next head (element after the current head).
	if iter.Next() {
		q.head = copyBuffer(iter.Key())
	} else {
		q.head = nil
	}
	return key, value
}
// getNextDeadline computes when the head element may be scheduled:
// the later of its key timestamp and the rate limiter's deadline.
// Returns the zero time when the subqueue is empty.
func (q *subQueue) getNextDeadline() time.Time {
	if q.head == nil {
		return time.Time{}
	}
	deadline := timestampFromKey(q.head)
	if rlDeadline := q.rl.NextDeadline(); rlDeadline.After(deadline) {
		deadline = rlDeadline
	}
	return deadline
}
// add stages a new element on wb and updates the in-memory head if
// the new key sorts before the current one. Reports the new key and
// whether the head changed.
func (q *subQueue) add(wb *leveldb.Batch, qname, data []byte, when time.Time) ([]byte, bool) {
	key := makeElementKey(qname, when)
	headUpdated := q.head == nil || bytes.Compare(key, q.head) < 0
	if headUpdated {
		q.head = key
	}
	wb.Put(key, data)
	return key, headUpdated
}
// Remove an item from the subqueue, when we already know the full key.
// The deletion is staged on wb; reports whether the removed item was
// the current head (in which case the head has been advanced).
func (q *subQueue) remove(db *leveldb.DB, wb *leveldb.Batch, qname, key []byte) bool {
	wb.Delete(key)
	if q.head != nil && bytes.Equal(q.head, key) {
		// We are removing the current head: advance it so the
		// in-memory view stays consistent with the database.
		q.incrementHead(db, qname)
		return true
	}
	return false
}
// The subqueue manager keeps a list of all known isolation domains
// and the associated subQueues. It knows which queue has the 'next'
// item.
//
// Internally it is implemented as a priority queue using
// container/heap. The additional map is used to find subqueues by
// name.
//
// The type isn't goroutine-safe, you need an external lock to use it
// safely.
type sqManager struct {
	db  *leveldb.DB
	pq  sqPriorityQueue      // heap ordered by nextDeadline
	sub map[string]*subQueue // lookup by domain name
}
// newSqManager creates a subqueue manager backed by db, restoring all
// previously-known domains from the on-disk queue list.
func newSqManager(db *leveldb.DB, rlFn RatelimiterFunc) (*sqManager, error) {
	m := &sqManager{
		db:  db,
		sub: map[string]*subQueue{},
	}
	if err := m.load(rlFn); err != nil {
		return nil, err
	}
	return m, nil
}
// load populates the manager from the on-disk queue list, creating a
// subQueue (and rate limiter) for every known domain.
func (p *sqManager) load(rlFn RatelimiterFunc) error {
	iter := p.db.NewIterator(ldbutil.BytesPrefix(
		appendBytes(keyPrefixQueueList, keySeparator)),
		&ldbopt.ReadOptions{
			// One-shot scan: don't pollute the block cache.
			DontFillCache: true,
		})
	defer iter.Release()
	for iter.Next() {
		// NOTE(review): iter.Value() buffers are typically reused on
		// the next Next() call; newSubqueue copies qname into a
		// string, but rlFn must not retain the slice — verify
		// implementations.
		qname := iter.Value()
		sq, err := newSubqueue(p.db, qname, rlFn(qname))
		if err != nil {
			return err
		}
		p.addMem(iter.Value(), sq)
	}
	if len(p.pq) > 0 {
		log.Printf("queue manager: loaded %d domains", len(p.pq))
	}
	return iter.Error()
}
// String renders a debugging summary of every subqueue.
func (p *sqManager) String() string {
	parts := make([]string, 0, len(p.pq))
	for _, sq := range p.pq {
		parts = append(parts, fmt.Sprintf("head=%q next_deadline=%v", sq.head, sq.nextDeadline))
	}
	return strings.Join(parts, ", ")
}
// List of subqueues that satisfies the Heap interface. The element
// with the smallest (earliest) nextDeadline sits at index 0.
type sqPriorityQueue []*subQueue

func (l sqPriorityQueue) Len() int { return len(l) }

func (l sqPriorityQueue) Swap(i, j int) {
	l[i], l[j] = l[j], l[i]
	// Keep each element's back-pointer in sync, needed by heap.Fix.
	l[i].index = i
	l[j].index = j
}

// Push appends x. Part of the heap.Interface contract: callers use
// heap.Push, not this method directly.
func (l *sqPriorityQueue) Push(x interface{}) {
	n := len(*l)
	item := x.(*subQueue)
	item.index = n
	*l = append(*l, item)
}

// Pop removes and returns the last element. Part of heap.Interface.
func (l *sqPriorityQueue) Pop() interface{} {
	old := *l
	n := len(old)
	item := old[n-1]
	item.index = -1 // for safety
	*l = old[0 : n-1]
	return item
}

func (l sqPriorityQueue) Less(i, j int) bool {
	// Elements with nil heads (empty queues) come last.
	if l[i].head == nil {
		return false
	}
	if l[j].head == nil {
		return true
	}
	// Order by next execution deadline.
	di := l[i].nextDeadline
	dj := l[j].nextDeadline
	return di.Before(dj)
}
// Add a subQueue to the in-memory registry (heap and name map).
func (p *sqManager) addMem(qname []byte, sq *subQueue) {
	sq.nextDeadline = sq.getNextDeadline()
	heap.Push(&p.pq, sq)
	p.sub[string(qname)] = sq
}

// Add a new subQueue to the in-memory registry and to the on-disk
// queue list, so it is rediscovered by load() after a restart.
func (p *sqManager) add(qname []byte, sq *subQueue) error {
	p.addMem(qname, sq)
	return p.db.Put(makeQueueListKey(qname), qname, nil)
}
// Get the subQueue given its name.
func (p *sqManager) get(qname []byte) (*subQueue, bool) {
	sq, ok := p.sub[string(qname)]
	return sq, ok
}
// Returns information on the next available queue (the head of the priority
// queue): name, subQueue, and next deadline. Panics when the manager
// is empty; callers must check empty() first.
func (p *sqManager) head() ([]byte, *subQueue, time.Time) {
	sq := p.pq[0]
	return []byte(sq.qname), sq, sq.nextDeadline
}

// Update a subQueue, reshuffling the priority queue given its next deadline.
// Returns a boolean indicating whether the head element nextDeadline has
// changed or not.
func (p *sqManager) update(qname []byte, sq *subQueue) bool {
	oldHeadDeadline := p.pq[0].nextDeadline
	sq.nextDeadline = sq.getNextDeadline()
	heap.Fix(&p.pq, sq.index)
	return p.pq[0].nextDeadline != oldHeadDeadline
}
func (p *sqManager) empty() bool { return len(p.pq) == 0 }
// Package jobqueue offers a simple interface to a (persistent) queue
// that can transparently scale, in API terms, from the embedded case
// to the remote distributed service scenario.
//
package jobqueue
import "context"
// Queue is a client interface to a (local or remote) queue. The
// interface is targeted towards job-based workflows.
type Queue interface {
	// Add a new tagged element to the queue. The first []byte is the
	// queue (domain) name, the second the opaque job payload.
	Add(context.Context, []byte, []byte) error

	// Retrieve an element from the queue. This call will block.
	Next(context.Context) (Job, error)

	// Close this client and all associated resources.
	Close()
}
// Job is a single unit of work retrieved from a Queue.
type Job interface {
	// Job data.
	Data() []byte

	// Call when processing is done. All errors are temporary: a
	// non-nil error presumably causes the job to be rescheduled.
	Done(context.Context, error) error
}
......@@ -37,7 +37,6 @@ Please note that because of the net/html dependency, goquery requires Go1.1+.
**Note that goquery's API is now stable, and will not break.**
* **2018-11-15 (v1.5.0)** : Go module support (thanks @Zaba505).
* **2018-06-07 (v1.4.1)** : Add `NewDocumentFromReader` examples.
* **2018-03-24 (v1.4.0)** : Deprecate `NewDocument(url)` and `NewDocumentFromResponse(response)`.
* **2018-01-28 (v1.3.0)** : Add `ToEnd` constant to `Slice` until the end of the selection (thanks to @davidjwilkins for raising the issue).
......@@ -140,7 +139,6 @@ func main() {
- [suntong/cascadia][cascadiacli], a command-line interface to the cascadia CSS selector library, useful to test selectors.
- [asciimoo/colly](https://github.com/asciimoo/colly), a lightning fast and elegant Scraping Framework
- [gnulnx/goperf](https://github.com/gnulnx/goperf), a website performance test tool that also fetches static assets.
- [MontFerret/ferret](https://github.com/MontFerret/ferret), declarative web scraping.
## Support
......
module github.com/PuerkitoBio/goquery
require (
github.com/andybalholm/cascadia v1.0.0
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a
)
github.com/andybalholm/cascadia v1.0.0 h1:hOCXnnZ5A+3eVDX8pvgl4kofXv2ELss0bKcqRySc45o=
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......@@ -5,5 +5,3 @@
The Cascadia package implements CSS selectors for use with the parse trees produced by the html package.
To test CSS selectors without writing Go code, check out [cascadia](https://github.com/suntong/cascadia) the command line tool, a thin wrapper around this package.
[Refer to godoc here](https://godoc.org/github.com/andybalholm/cascadia).
# How to contribute
We definitely welcome patches and contribution to this project!
### Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](https://cla.developers.google.com/clas).
You may have already signed it for other Google projects.
Paul Borman <borman@google.com>
bmatsuo
shawnps
theory
jboverfelt
dsymonds
cd1
wallclockbuilder
dansouza
Copyright (c) 2009,2014 Google Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# uuid ![build status](https://travis-ci.org/google/uuid.svg?branch=master)
The uuid package generates and inspects UUIDs based on
[RFC 4122](http://tools.ietf.org/html/rfc4122)
and DCE 1.1: Authentication and Security Services.
This package is based on the github.com/pborman/uuid package (previously named
code.google.com/p/go-uuid). It differs from these earlier packages in that
a UUID is a 16 byte array rather than a byte slice. One loss due to this
change is the ability to represent an invalid UUID (vs a NIL UUID).
###### Install
`go get github.com/google/uuid`
###### Documentation
[![GoDoc](https://godoc.org/github.com/google/uuid?status.svg)](http://godoc.org/github.com/google/uuid)
Full `go doc` style documentation for the package can be viewed online without
installing this package by using the GoDoc site here:
http://godoc.org/github.com/google/uuid
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"encoding/binary"
"fmt"
"os"
)
// A Domain represents a Version 2 domain (person, group or org, as
// used by DCE Security UUIDs — see NewDCESecurity).
type Domain byte

// Domain constants for DCE Security (Version 2) UUIDs.
const (
	Person = Domain(0)
	Group  = Domain(1)
	Org    = Domain(2)
)
// NewDCESecurity returns a DCE Security (Version 2) UUID.
//
// The domain should be one of Person, Group or Org.
// On a POSIX system the id should be the users UID for the Person
// domain and the users GID for the Group. The meaning of id for
// the domain Org or on non-POSIX systems is site defined.
//
// For a given domain/id pair the same token may be returned for up to
// 7 minutes and 10 seconds.
func NewDCESecurity(domain Domain, id uint32) (UUID, error) {
	uuid, err := NewUUID()
	if err == nil {
		uuid[6] = (uuid[6] & 0x0f) | 0x20 // Version 2
		uuid[9] = byte(domain)            // domain stored in byte 9
		// id overwrites the first 4 bytes of the time-based UUID.
		binary.BigEndian.PutUint32(uuid[0:], id)
	}
	return uuid, err
}
// NewDCEPerson returns a DCE Security (Version 2) UUID in the person
// domain with the id returned by os.Getuid. Equivalent to:
//
//	NewDCESecurity(Person, uint32(os.Getuid()))
//
// Any error comes from the underlying NewUUID call.
func NewDCEPerson() (UUID, error) {
	return NewDCESecurity(Person, uint32(os.Getuid()))
}
// NewDCEGroup returns a DCE Security (Version 2) UUID in the group
// domain with the id returned by os.Getgid. Equivalent to:
//
//	NewDCESecurity(Group, uint32(os.Getgid()))
//
// Any error comes from the underlying NewUUID call.
func NewDCEGroup() (UUID, error) {
	return NewDCESecurity(Group, uint32(os.Getgid()))
}
// Domain returns the domain for a Version 2 UUID. Domains are only defined
// for Version 2 UUIDs (byte 9 is read regardless of the actual version).
func (uuid UUID) Domain() Domain {
	return Domain(uuid[9])
}
// ID returns the id for a Version 2 UUID. IDs are only defined for Version 2
// UUIDs (the first 4 bytes are read regardless of the actual version).
func (uuid UUID) ID() uint32 {
	return binary.BigEndian.Uint32(uuid[0:4])
}
// String returns the symbolic name of a known Domain, or "DomainN"
// for any other value.
func (d Domain) String() string {
	switch d {
	case Person:
		return "Person"
	case Group:
		return "Group"
	case Org:
		return "Org"
	default:
		return fmt.Sprintf("Domain%d", int(d))
	}
}
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package uuid generates and inspects UUIDs.
//
// UUIDs are based on RFC 4122 and DCE 1.1: Authentication and Security
// Services.
//
// A UUID is a 16 byte (128 bit) array. UUIDs may be used as keys to
// maps or compared directly.
package uuid
module github.com/google/uuid
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"crypto/md5"
"crypto/sha1"
"hash"
)
// Well known namespace IDs and UUIDs, for use with NewMD5/NewSHA1.
var (
	NameSpaceDNS  = Must(Parse("6ba7b810-9dad-11d1-80b4-00c04fd430c8"))
	NameSpaceURL  = Must(Parse("6ba7b811-9dad-11d1-80b4-00c04fd430c8"))
	NameSpaceOID  = Must(Parse("6ba7b812-9dad-11d1-80b4-00c04fd430c8"))
	NameSpaceX500 = Must(Parse("6ba7b814-9dad-11d1-80b4-00c04fd430c8"))
	Nil           UUID // empty UUID, all zeros
)
// NewHash returns a new UUID derived from the hash of space concatenated with
// data generated by h. The hash should be at least 16 byte in length. The
// first 16 bytes of the hash are used to form the UUID. The version of the
// UUID will be the lower 4 bits of version. NewHash is used to implement
// NewMD5 and NewSHA1.
func NewHash(h hash.Hash, space UUID, data []byte, version int) UUID {
	h.Reset()
	h.Write(space[:])
	h.Write(data)
	digest := h.Sum(nil)

	var out UUID
	copy(out[:], digest)
	// Stamp the version into the upper nibble of byte 6.
	out[6] = (out[6] & 0x0f) | uint8((version&0xf)<<4)
	// Set the RFC 4122 variant bits in byte 8.
	out[8] = (out[8] & 0x3f) | 0x80
	return out
}
// NewMD5 returns a new MD5 (Version 3) UUID based on the
// supplied name space and data. It is the same as calling:
//
//	NewHash(md5.New(), space, data, 3)
func NewMD5(space UUID, data []byte) UUID {
	h := md5.New()
	return NewHash(h, space, data, 3)
}
// NewSHA1 returns a new SHA1 (Version 5) UUID based on the
// supplied name space and data. It is the same as calling:
//
//	NewHash(sha1.New(), space, data, 5)
func NewSHA1(space UUID, data []byte) UUID {
	h := sha1.New()
	return NewHash(h, space, data, 5)
}
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import "fmt"
// MarshalText implements encoding.TextMarshaler, producing the canonical
// 36-byte "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" form. It never fails.
func (uuid UUID) MarshalText() ([]byte, error) {
	buf := make([]byte, 36)
	encodeHex(buf, uuid)
	return buf, nil
}
// UnmarshalText implements encoding.TextUnmarshaler. On parse failure the
// receiver is left unmodified.
func (uuid *UUID) UnmarshalText(data []byte) error {
	id, err := ParseBytes(data)
	if err != nil {
		return err
	}
	*uuid = id
	return nil
}
// MarshalBinary implements encoding.BinaryMarshaler, returning the raw
// 16 bytes of the UUID. It never fails.
func (uuid UUID) MarshalBinary() ([]byte, error) {
	b := uuid[:]
	return b, nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler. The input must be
// exactly 16 raw bytes.
func (uuid *UUID) UnmarshalBinary(data []byte) error {
	if n := len(data); n != 16 {
		return fmt.Errorf("invalid UUID (got %d bytes)", n)
	}
	copy(uuid[:], data)
	return nil
}
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"sync"
)
// Package-level node-ID state for Version 1 UUIDs. All access must be
// serialized through nodeMu.
var (
	nodeMu sync.Mutex
	ifname string // name of interface being used ("user" when set via SetNodeID)
	nodeID [6]byte // hardware for version 1 UUIDs
	zeroID [6]byte // nodeID with only 0's
)
// NodeInterface returns the name of the interface from which the NodeID was
// derived. The interface "user" is returned if the NodeID was set by
// SetNodeID.
func NodeInterface() string {
	nodeMu.Lock()
	defer nodeMu.Unlock()
	return ifname
}
// SetNodeInterface selects the hardware address to be used for Version 1 UUIDs.
// If name is "" then the first usable interface found will be used or a random
// Node ID will be generated. If a named interface cannot be found then false
// is returned.
//
// SetNodeInterface never fails when name is "".
func SetNodeInterface(name string) bool {
	nodeMu.Lock()
	defer nodeMu.Unlock()
	return setNodeInterface(name)
}
// setNodeInterface is the lock-free core of SetNodeInterface; callers must
// hold nodeMu.
func setNodeInterface(name string) bool {
	iname, addr := getHardwareInterface(name) // null implementation for js
	switch {
	case iname != "" && addr != nil:
		ifname = iname
		copy(nodeID[:], addr)
		return true
	case name == "":
		// No interface with a valid hardware address was found and no
		// specific interface was requested, so fall back to a random
		// Node ID (section 4.1.6).
		randomBits(nodeID[:])
		return true
	default:
		return false
	}
}
// NodeID returns a slice of a copy of the current Node ID, setting the Node ID
// if not already set.
func NodeID() []byte {
	nodeMu.Lock()
	defer nodeMu.Unlock()
	if nodeID == zeroID {
		setNodeInterface("")
	}
	// Return a detached copy so callers cannot mutate package state.
	out := make([]byte, 6)
	copy(out, nodeID[:])
	return out
}
// SetNodeID sets the Node ID to be used for Version 1 UUIDs. The first 6 bytes
// of id are used. If id is less than 6 bytes then false is returned and the
// Node ID is not set.
func SetNodeID(id []byte) bool {
	if len(id) < 6 {
		return false
	}
	nodeMu.Lock()
	defer nodeMu.Unlock()
	copy(nodeID[:], id)
	ifname = "user"
	return true
}
// NodeID returns the 6 byte node id encoded in uuid. It returns nil if uuid is
// not valid. The NodeID is only well defined for version 1 and 2 UUIDs.
func (uuid UUID) NodeID() []byte {
	// Copy out octets 10-15 so the result does not alias the UUID.
	out := make([]byte, 6)
	copy(out, uuid[10:])
	return out
}
......@@ -26,10 +26,7 @@ func getHardwareInterface(name string) (string, []byte) {
}
for _, ifs := range interfaces {
if len(ifs.HardwareAddr) >= 6 && (name == "" || name == ifs.Name) {
if setNodeID(ifs.HardwareAddr) {
ifname = ifs.Name
return ifname, nodeID
}
return ifs.Name, ifs.HardwareAddr
}
}
return "", nil
......
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"database/sql/driver"
"fmt"
)
// Scan implements sql.Scanner so UUIDs can be read from databases transparently
// Currently, database types that map to string and []byte are supported. Please
// consult database-specific driver documentation for matching types.
func (uuid *UUID) Scan(src interface{}) error {
	switch v := src.(type) {
	case nil:
		return nil

	case string:
		// An empty string coming from a table leaves the receiver
		// untouched (a null UUID).
		if v == "" {
			return nil
		}
		// see Parse for required string format
		parsed, err := Parse(v)
		if err != nil {
			return fmt.Errorf("Scan: %v", err)
		}
		*uuid = parsed

	case []byte:
		// An empty byte slice coming from a table leaves the receiver
		// untouched (a null UUID).
		if len(v) == 0 {
			return nil
		}
		// Exactly 16 bytes are taken as the raw UUID; any other
		// length is re-parsed as its textual representation.
		if len(v) == 16 {
			copy((*uuid)[:], v)
		} else {
			return uuid.Scan(string(v))
		}

	default:
		return fmt.Errorf("Scan: unable to scan type %T into UUID", src)
	}
	return nil
}
// Value implements sql.Valuer so that UUIDs can be written to databases
// transparently. Currently, UUIDs map to strings. Please consult
// database-specific driver documentation for matching types.
func (uuid UUID) Value() (driver.Value, error) {
	s := uuid.String()
	return s, nil
}
// Copyright 2016 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"encoding/binary"
"sync"
"time"
)
// A Time represents a time as the number of 100's of nanoseconds since 15 Oct
// 1582 (the start of the Gregorian calendar, the epoch used by RFC 4122
// Version 1 UUID timestamps).
type Time int64

// Conversion constants between the Gregorian epoch and the Unix epoch.
const (
	lillian = 2299160 // Julian day of 15 Oct 1582
	unix = 2440587 // Julian day of 1 Jan 1970
	epoch = unix - lillian // Days between epochs
	g1582 = epoch * 86400 // seconds between epochs
	g1582ns100 = g1582 * 10000000 // 100s of a nanoseconds between epochs
)
// Shared timestamp/clock-sequence state for GetTime and ClockSequence;
// all access must be serialized through timeMu.
var (
	timeMu sync.Mutex
	lasttime uint64 // last time we returned
	clockSeq uint16 // clock sequence for this run
	timeNow = time.Now // for testing
)
// UnixTime converts t the number of seconds and nanoseconds using the Unix
// epoch of 1 Jan 1970.
func (t Time) UnixTime() (sec, nsec int64) {
	// Shift from the Gregorian epoch to the Unix epoch, still counted
	// in 100ns ticks, then split into whole seconds and nanoseconds.
	ticks := int64(t - g1582ns100)
	nsec = (ticks % 10000000) * 100
	sec = ticks / 10000000
	return sec, nsec
}
// GetTime returns the current Time (100s of nanoseconds since 15 Oct 1582) and
// clock sequence as well as adjusting the clock sequence as needed. An error
// is returned if the current time cannot be determined.
func GetTime() (Time, uint16, error) {
	timeMu.Lock()
	defer timeMu.Unlock()
	return getTime()
}
// getTime is the lock-free core of GetTime; callers must hold timeMu.
func getTime() (Time, uint16, error) {
	wall := timeNow()

	// Lazily initialize the clock sequence on first use.
	if clockSeq == 0 {
		setClockSequence(-1)
	}

	now := uint64(wall.UnixNano()/100) + g1582ns100
	// If the clock has not advanced past the last issued timestamp,
	// bump the clock sequence to keep generated UUIDs distinct.
	if now <= lasttime {
		clockSeq = ((clockSeq + 1) & 0x3fff) | 0x8000
	}
	lasttime = now
	return Time(now), clockSeq, nil
}
// ClockSequence returns the current clock sequence, generating one if not
// already set. The clock sequence is only used for Version 1 UUIDs.
//
// The uuid package does not use global static storage for the clock sequence or
// the last time a UUID was generated. Unless SetClockSequence is used, a new
// random clock sequence is generated the first time a clock sequence is
// requested by ClockSequence, GetTime, or NewUUID. (section 4.2.1.1)
func ClockSequence() int {
	timeMu.Lock()
	defer timeMu.Unlock()
	return clockSequence()
}
// clockSequence is the lock-free core of ClockSequence; callers must hold
// timeMu.
func clockSequence() int {
	seq := clockSeq
	if seq == 0 {
		setClockSequence(-1)
		seq = clockSeq
	}
	// Only the low 14 bits form the clock sequence proper.
	return int(seq & 0x3fff)
}
// SetClockSequence sets the clock sequence to the lower 14 bits of seq. Setting to
// -1 causes a new sequence to be generated.
func SetClockSequence(seq int) {
	timeMu.Lock()
	defer timeMu.Unlock()
	setClockSequence(seq)
}
// setClockSequence is the lock-free core of SetClockSequence; callers must
// hold timeMu.
func setClockSequence(seq int) {
	if seq == -1 {
		// Generate a fresh random sequence from two random bytes.
		var b [2]byte
		randomBits(b[:]) // clock sequence
		seq = int(b[0])<<8 | int(b[1])
	}
	old := clockSeq
	clockSeq = uint16(seq&0x3fff) | 0x8000 // Set our variant
	// A changed clock sequence invalidates the last-timestamp guard.
	if clockSeq != old {
		lasttime = 0
	}
}
// Time returns the time in 100s of nanoseconds since 15 Oct 1582 encoded in
// uuid. The time is only defined for version 1 and 2 UUIDs.
func (uuid UUID) Time() Time {
	// Reassemble the 60-bit timestamp from its low, mid, and high
	// (version-masked) fields.
	lo := int64(binary.BigEndian.Uint32(uuid[0:4]))
	mid := int64(binary.BigEndian.Uint16(uuid[4:6]))
	hi := int64(binary.BigEndian.Uint16(uuid[6:8]) & 0xfff)
	return Time(lo | mid<<32 | hi<<48)
}
// ClockSequence returns the clock sequence encoded in uuid.
// The clock sequence is only well defined for version 1 and 2 UUIDs.
func (uuid UUID) ClockSequence