Commit 7ea5c85c authored by Vojtech Vitek (V-Teq)

Add Godoc

parent e1745701
````diff
@@ -3,15 +3,13 @@
 [Golang](http://golang.org/) client for [Disque](https://github.com/antirez/disque), the Persistent Distributed Job Priority Queue.
 
 - **Persistent** - Jobs can be either in-memory or persisted on disk<sup>[[1]](https://github.com/antirez/disque#disque-and-disk-persistence)</sup>.
-- **Distributed** - Multiple producers, multiple consumers.
-- **Job Priority Queue** - Multiple queues. Consumers Dequeue() from higher priority queues first.
-- **Fault tolerant** - Jobs must be replicated to N nodes before Enqueue() returns. Jobs must be ACKed or they'll be re-queued automatically within a specified Retry Timeout.
+- **Distributed** - Disque pool. Multiple producers, multiple consumers.
+- **Job Priority Queue** - Multiple queues. Consumers `Get()` from higher priority queues first.
+- **Fault tolerant** - Jobs must be replicated to N nodes before `Add()` returns. Jobs must be `ACK()`ed after `Get()` or they'll be re-queued automatically within a specified `RetryAfter` timeout.
 
 [![GoDoc](https://godoc.org/github.com/goware/disque?status.png)](https://godoc.org/github.com/goware/disque)
 [![Travis](https://travis-ci.org/goware/disque.svg?branch=master)](https://travis-ci.org/goware/disque)
 
-**This project is in early development stage. You can expect changes to both functionality and the API. Feedback welcome!**
-
 ## Producer
 
 ```go
@@ -60,18 +58,18 @@ func main() {
 }
 ```
 
-## Custom config (Timeout, Replicate, Delay, Retry, TTL, MaxLen)
+## Config (Timeout, Replicate, Delay, Retry, TTL, MaxLen)
 
 ```go
 jobs, _ := disque.Connect("127.0.0.1:7711")
 
 config := disque.Config{
-    Timeout:    500 * time.Second, // Each operation will fail after 1s. It blocks by default.
+    Timeout:    time.Second,    // Each operation will fail after 1s. It blocks by default.
     Replicate:  2,              // Add(): Replicate job to at least two nodes before return.
     Delay:      time.Hour,      // Add(): Schedule the job - enqueue after one hour.
     RetryAfter: time.Minute,    // Add(): Re-queue job after 1min (time between Get() and Ack()).
     TTL:        24 * time.Hour, // Add(): Remove the job from queue after one day.
     MaxLen:     1000,           // Add(): Fail if there are more than 1000 jobs in the queue.
 }
 
 // Apply globally.
...
````
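The Producer and Consumer example bodies are collapsed in the diff above. As a rough sketch of the round trip those README sections describe, using only the `Connect()`, `Add()`, `Get()`, `Ack()` and `Close()` signatures shown in the Go sources further down (queue names and payload are illustrative, not part of the original examples):

```go
package main

import (
    "log"

    "github.com/goware/disque"
)

func main() {
    // Connect to a Disque pool.
    jobs, err := disque.Connect("127.0.0.1:7711")
    if err != nil {
        log.Fatal(err)
    }
    defer jobs.Close()

    // Producer: enqueue data into the "high" priority queue.
    job, err := jobs.Add("example payload", "high")
    if err != nil {
        log.Fatal(err)
    }
    log.Println("enqueued job", job.ID)

    // Consumer: fetch a job, higher priority queues listed first,
    // then acknowledge it so it is not re-queued.
    job, err = jobs.Get("high", "low")
    if err != nil {
        log.Fatal(err)
    }
    if err := jobs.Ack(job); err != nil {
        log.Fatal(err)
    }
    log.Println("processed", job.Data)
}
```

A consumer that fails mid-way can simply not `Ack()` the job; per the feature list above, Disque re-queues it automatically after the configured `RetryAfter` timeout.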
````diff
@@ -2,49 +2,58 @@ package disque
 
 import "time"
 
+// Config represents Disque configuration for certain operations.
 type Config struct {
-    Timeout    time.Duration
-    Replicate  int
-    Delay      time.Duration
-    RetryAfter time.Duration
-    TTL        time.Duration
-    MaxLen     int
+    Timeout    time.Duration // Each operation will fail after a specified timeout. Blocks by default.
+    Replicate  int           // Add(): Replicate job to at least N nodes before return.
+    Delay      time.Duration // Add(): Schedule the job - enqueue after a specified time.
+    RetryAfter time.Duration // Add(): Re-queue job after a specified time (between Get() and Ack()).
+    TTL        time.Duration // Add(): Remove the job from queue after a specified time.
+    MaxLen     int           // Add(): Fail if there are more than N jobs in the queue.
 }
 
+// Use applies the given config to every subsequent operation of this connection.
 func (conn *Conn) Use(conf Config) *Conn {
     conn.conf = conf
     return conn
 }
 
+// With applies the given config to a single operation.
 func (conn *Conn) With(conf Config) *Conn {
     return &Conn{pool: conn.pool, conf: conf}
 }
 
+// Timeout applies the given timeout to a single operation.
 func (conn *Conn) Timeout(timeout time.Duration) *Conn {
     conn.conf.Timeout = timeout
     return &Conn{pool: conn.pool, conf: conn.conf}
 }
 
+// Replicate applies the given replication level to a single operation.
 func (conn *Conn) Replicate(replicate int) *Conn {
     conn.conf.Replicate = replicate
     return &Conn{pool: conn.pool, conf: conn.conf}
 }
 
+// Delay applies the given delay to a single operation.
 func (conn *Conn) Delay(delay time.Duration) *Conn {
     conn.conf.Delay = delay
     return &Conn{pool: conn.pool, conf: conn.conf}
 }
 
+// RetryAfter applies the given retry timeout to a single operation.
 func (conn *Conn) RetryAfter(after time.Duration) *Conn {
     conn.conf.RetryAfter = after
     return &Conn{pool: conn.pool, conf: conn.conf}
 }
 
+// TTL applies the given TTL to a single operation.
 func (conn *Conn) TTL(ttl time.Duration) *Conn {
     conn.conf.TTL = ttl
     return &Conn{pool: conn.pool, conf: conn.conf}
 }
 
+// MaxLen applies the given maximum queue length to a single operation.
 func (conn *Conn) MaxLen(maxlen int) *Conn {
     conn.conf.MaxLen = maxlen
     return &Conn{pool: conn.pool, conf: conn.conf}
...
````
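To make the `Use()`/`With()` distinction documented above concrete: `Use()` changes the config of the connection itself, while `With()` and the single-field helpers (`Timeout()`, `RetryAfter()`, and so on) derive a connection for one operation and can be chained. A minimal sketch assuming the API exactly as shown; the durations, queue names and payloads are illustrative:

```go
package main

import (
    "time"

    "github.com/goware/disque"
)

func main() {
    jobs, _ := disque.Connect("127.0.0.1:7711")
    defer jobs.Close()

    // Use(): apply a config to every subsequent operation
    // on this connection.
    jobs.Use(disque.Config{
        RetryAfter: time.Minute,
        TTL:        24 * time.Hour,
    })

    // With(): apply a config to a single operation only.
    jobs.With(disque.Config{Delay: time.Hour}).Add("data", "low")

    // Chained helpers set one field at a time and return a
    // derived connection, so they can be combined per call.
    jobs.Timeout(time.Second).RetryAfter(30 * time.Second).Add("data", "high")
}
```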
````diff
@@ -9,11 +9,13 @@ import (
     "github.com/garyburd/redigo/redis"
 )
 
+// Conn represents a connection to a Disque Pool.
 type Conn struct {
     pool *redis.Pool
     conf Config
 }
 
+// Connect creates a connection to a given Disque Pool.
 func Connect(address string, extra ...string) (*Conn, error) {
     pool := &redis.Pool{
         MaxIdle: 64,
@@ -36,10 +38,12 @@ func Connect(address string, extra ...string) (*Conn, error) {
     return &Conn{pool: pool}, nil
 }
 
+// Close closes the connection to the Disque Pool.
 func (conn *Conn) Close() {
     conn.pool.Close()
 }
 
+// Ping returns nil if the Disque Pool is reachable.
 func (conn *Conn) Ping() error {
     sess := conn.pool.Get()
     defer sess.Close()
@@ -50,17 +54,18 @@ func (conn *Conn) Ping() error {
     return nil
 }
 
-// Eh, none of the following builds successfully:
+// do is a helper function that works around redigo/redis API flaws
+// with reflect pkg.
+//
+// None of the following builds successfully:
 //
 // reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{})
 // reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{}...)
 // reply, err := sess.Do("GETJOB", "FROM", queue, extraQueues)
 // reply, err := sess.Do("GETJOB", "FROM", queue, extraQueues...)
 //
-// > build error:
-// > too many arguments in call to sess.Do
-//
-// So.. let's work around this with reflect pkg.
+// > Error: "too many arguments in call to sess.Do"
+//
 func (conn *Conn) do(args []interface{}) (interface{}, error) {
     sess := conn.pool.Get()
     defer sess.Close()
@@ -91,6 +96,7 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
     return reply, nil
 }
 
+// Add enqueues a new Job with the specified data to a given queue.
 func (conn *Conn) Add(data string, queue string) (*Job, error) {
     args := []interface{}{
         "ADDJOB",
@@ -144,6 +150,8 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
     }, nil
 }
 
+// Get returns the first available job from the highest priority
+// queue (left-to-right).
 func (conn *Conn) Get(queues ...string) (*Job, error) {
     if len(queues) == 0 {
         return nil, errors.New("expected at least one queue")
@@ -195,6 +203,7 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
     }, nil
 }
 
+// Ack acknowledges (dequeues) the job from its job queue.
 func (conn *Conn) Ack(job *Job) error {
     sess := conn.pool.Get()
     defer sess.Close()
@@ -205,6 +214,7 @@ func (conn *Conn) Ack(job *Job) error {
     return nil
 }
 
+// Nack re-queues the job into its job queue.
 // Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
 func (conn *Conn) Nack(job *Job) error {
     sess := conn.pool.Get()
@@ -216,6 +226,7 @@ func (conn *Conn) Nack(job *Job) error {
     return nil
 }
 
+// Wait waits for the job to finish (blocks until it's ACKed).
 // Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
 func (conn *Conn) Wait(job *Job) error {
     sess := conn.pool.Get()
...
````
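The comment on `do()` above explains why the variadic `sess.Do(...)` call cannot be built from a fixed command name plus a slice of extra arguments, and that the package works around this with the reflect pkg. As a hedged illustration of what such a reflect-based call can look like, here is a hypothetical standalone helper (`doReflect` is invented for this sketch and is not the package's actual `do()` body):

```go
package disque

import (
    "reflect"

    "github.com/garyburd/redigo/redis"
)

// doReflect invokes sess.Do(args[0], args[1:]...) through the reflect
// package, so a command and a variable number of arguments can be
// passed along from a single []interface{} slice.
func doReflect(sess redis.Conn, args []interface{}) (interface{}, error) {
    in := make([]reflect.Value, len(args))
    for i, arg := range args {
        in[i] = reflect.ValueOf(arg)
    }

    // Do is variadic; reflect.Value.Call packs the trailing values
    // into the variadic parameter for us.
    out := reflect.ValueOf(sess).MethodByName("Do").Call(in)

    if err, ok := out[1].Interface().(error); ok && err != nil {
        return nil, err
    }
    return out[0].Interface(), nil
}
```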
````diff
 package disque
 
+// Job represents a job (and its data) belonging to a queue.
 type Job struct {
     ID   string
     Data string
...
````
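Given the `Job` fields above and the `Wait()` helper documented earlier (it blocks until the job is ACKed), a producer can hand off work and wait for its completion; a failed consumer can call `Nack()` to push the job straight back into its queue. A short sketch with an illustrative queue name and payload:

```go
package main

import (
    "log"

    "github.com/goware/disque"
)

func main() {
    jobs, err := disque.Connect("127.0.0.1:7711")
    if err != nil {
        log.Fatal(err)
    }
    defer jobs.Close()

    // Enqueue a job and block until some consumer Ack()s it.
    job, err := jobs.Add("example payload", "default")
    if err != nil {
        log.Fatal(err)
    }
    if err := jobs.Wait(job); err != nil {
        log.Fatal(err)
    }
    log.Printf("job %s (%q) was acknowledged", job.ID, job.Data)
}
```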