Commit e1745701 authored by Vojtech Vitek's avatar Vojtech Vitek

Merge pull request #3 from goware/waitjob

Implement Wait(job) and the remaining Add() Config options
parents ec6c2aa4 78b615f2
......@@ -15,33 +15,75 @@
## Producer
```go
jobs, _ := disque.Connect("127.0.0.1:7711")
import (
"github.com/goware/disque"
)
func main() {
// Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments.
// Enqueue some jobs.
job1, _ := jobs.Add(data1, "low")
job2, _ := jobs.Add(data2, "urgent")
job3, _ := jobs.Add(data3, "high")
// Enqueue three jobs with different priorities.
job1, _ := jobs.Add(data1, "high")
job2, _ := jobs.Add(data2, "low")
job3, _ := jobs.Add(data3, "urgent")
// Block until job3 is done.
jobs.Wait(job3)
}
```
## Consumer (worker)
```go
jobs, _ := disque.Connect("127.0.0.1:7711")
import (
"github.com/goware/disque"
)
func main() {
// Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments.
for {
// Dequeue a job (from higher priority queues first).
job, _ := jobs.Get("urgent", "high", "low")
for {
// Get job from highest priority queue possible. Blocks by default.
job, _ := jobs.Get("urgent", "high", "low") // Left-right priority.
// Do some hard work with the job data.
err := Process(job.Data)
if err != nil {
// Re-queue job.
jobs.Nack(job)
// Do some hard work with the job data.
if err := Process(job.Data); err != nil {
// Failed. Re-queue the job.
jobs.Nack(job)
}
// Acknowledge (dequeue) the job.
jobs.Ack(job)
}
}
```
## Custom config (Timeout, Replicate, Delay, Retry, TTL, MaxLen)
```go
jobs, _ := disque.Connect("127.0.0.1:7711")
// Acknowledge that we processed the job successfully.
jobs.Ack(job)
config := disque.Config{
Timeout: 500 * time.Second, // Each operation will fail after 1s. It blocks by default.
Replicate: 2, // Add(): Replicate job to at least two nodes before return.
Delay: time.Hour, // Add(): Schedule the job - enqueue after one hour.
RetryAfter: time.Minute, // Add(): Re-queue job after 1min (time between Get() and Ack()).
TTL: 24 * time.Hour, // Add(): Remove the job from queue after one day.
MaxLen: 1000, // Add(): Fail if there are more than 1000 jobs in the queue.
}
// Apply globally.
jobs.Use(config)
// Apply to a single operation.
jobs.With(config).Add(data, "queue")
// Apply single option to a single operation.
jobs.Timeout(time.Second).Get("queue", "queue2")
jobs.MaxLen(1000).RetryAfter(time.Minute).Add(data, "queue")
jobs.Timeout(time.Second).Add(data, "queue")
```
## License
......
package disque
import "time"
type Config struct {
Timeout time.Duration
Replicate int
Delay time.Duration
RetryAfter time.Duration
TTL time.Duration
MaxLen int
}
func (conn *Conn) Use(conf Config) *Conn {
conn.conf = conf
return conn
}
func (conn *Conn) With(conf Config) *Conn {
return &Conn{pool: conn.pool, conf: conf}
}
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Replicate(replicate int) *Conn {
conn.conf.Replicate = replicate
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Delay(delay time.Duration) *Conn {
conn.conf.Delay = delay
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) TTL(ttl time.Duration) *Conn {
conn.conf.TTL = ttl
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) MaxLen(maxlen int) *Conn {
conn.conf.MaxLen = maxlen
return &Conn{pool: conn.pool, conf: conn.conf}
}
......@@ -11,7 +11,7 @@ import (
type Conn struct {
pool *redis.Pool
conf JobConfig
conf Config
}
func Connect(address string, extra ...string) (*Conn, error) {
......@@ -33,32 +33,13 @@ func Connect(address string, extra ...string) (*Conn, error) {
},
}
return &Conn{pool: pool, conf: defaultJobConfig}, nil
return &Conn{pool: pool}, nil
}
func (conn *Conn) Close() {
conn.pool.Close()
}
func (conn *Conn) Use(conf JobConfig) *Conn {
conn.conf = conf
return conn
}
func (conn *Conn) With(conf JobConfig) *Conn {
return &Conn{pool: conn.pool, conf: conf}
}
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Ping() error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -111,13 +92,39 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
}
func (conn *Conn) Add(data string, queue string) (*Job, error) {
args := []interface{}{
"ADDJOB", queue, data, conn.conf.Timeout.Nanoseconds() / 1000000,
"ADDJOB",
queue,
data,
int(conn.conf.Timeout.Nanoseconds() / 1000000),
}
if conn.conf.Replicate > 0 {
args = append(args, "REPLICATE", conn.conf.Replicate)
}
if conn.conf.Delay > 0 {
delay := int(conn.conf.Delay.Seconds())
if delay == 0 {
delay = 1
}
args = append(args, "DELAY", delay)
}
if conn.conf.RetryAfter > 0 {
args = append(args, "RETRY", conn.conf.RetryAfter.Seconds())
retry := int(conn.conf.RetryAfter.Seconds())
if retry == 0 {
retry = 1
}
args = append(args, "RETRY", retry)
}
if conn.conf.TTL > 0 {
ttl := int(conn.conf.TTL.Seconds())
if ttl == 0 {
ttl = 1
}
args = append(args, "TTL", ttl)
}
if conn.conf.MaxLen > 0 {
args = append(args, "MAXLEN", conn.conf.MaxLen)
}
reply, err := conn.do(args)
......@@ -137,17 +144,21 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
}, nil
}
func (conn *Conn) Get(queue string, extra ...string) (*Job, error) {
func (conn *Conn) Get(queues ...string) (*Job, error) {
if len(queues) == 0 {
return nil, errors.New("expected at least one queue")
}
args := []interface{}{
"GETJOB",
"TIMEOUT",
int(conn.conf.Timeout.Nanoseconds() / 1000000),
"FROM",
queue,
}
for _, arg := range extra {
args = append(args, reflect.ValueOf(arg))
for _, arg := range queues {
args = append(args, arg)
}
reply, err := conn.do(args)
if err != nil {
return nil, err
......@@ -194,6 +205,7 @@ func (conn *Conn) Ack(job *Job) error {
return nil
}
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Nack(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -203,3 +215,23 @@ func (conn *Conn) Nack(job *Job) error {
}
return nil
}
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Wait(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
for {
reply, err := sess.Do("SHOW", job.ID)
if err != nil {
return err
}
if reply == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return nil
}
......@@ -21,6 +21,60 @@ func TestPing(t *testing.T) {
}
}
func TestDelay(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job after one second.
_, err = jobs.Delay(time.Second).Add("data1", "test:delay")
if err != nil {
t.Error(err)
}
// The job should not exist yet.
_, err = jobs.Timeout(time.Millisecond).Get("test:delay")
if err == nil {
t.Fatal("expected error")
}
time.Sleep(time.Second)
// The job should exist now.
job, err := jobs.Timeout(500 * time.Millisecond).Get("test:delay")
if err != nil {
t.Fatal(err)
}
jobs.Ack(job)
}
func TestTTL(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job with TTL one second.
_, err = jobs.Timeout(time.Millisecond).TTL(time.Second).Add("data1", "test:ttl")
if err != nil {
t.Error(err)
}
time.Sleep(1500 * time.Millisecond)
// The job should no longer exist.
_, err = jobs.Get("test:ttl")
if err == nil {
t.Fatal("expected error")
}
}
func TestTimeoutRetryAfter(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
......@@ -35,7 +89,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
t.Error(err)
}
// Dequeue job.
// Get the job.
_, err = jobs.Get("test:retry")
if err != nil {
t.Fatal(err)
......@@ -43,14 +97,14 @@ func TestTimeoutRetryAfter(t *testing.T) {
// Don't Ack() to pretend consumer failure.
// Try to dequeue job again..
// We should hit time-out fot the first time..
_, err = jobs.Timeout(750 * time.Millisecond).Get("test:retry")
// Try to get the job again..
// We should hit time-out for the first time..
_, err = jobs.Timeout(250 * time.Millisecond).Get("test:retry")
if err == nil {
t.Fatal("expected timeout error")
t.Fatal("expected error")
}
// and we should be successful for the second time..
job, err := jobs.Timeout(750 * time.Millisecond).Get("test:retry")
job, err := jobs.Timeout(time.Second).Get("test:retry")
if err != nil {
t.Fatal(err)
}
......@@ -84,7 +138,7 @@ func TestPriorityQueue(t *testing.T) {
t.Error(err)
}
// Dequeue first job.
// Get first job.
job, err := jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Fatal(err)
......@@ -100,7 +154,7 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
// Dequeue second job.
// Get second job.
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Fatal(err)
......@@ -116,7 +170,7 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
// Dequeue third job and re-queue it again.
// Get third job and re-queue it again.
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Fatal(err)
......@@ -132,7 +186,7 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
// Dequeue third job again.
// Get third job again.
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Fatal(err)
......@@ -148,3 +202,65 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
}
func TestWait(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job.
start := time.Now()
job, err := jobs.Add("data1", "test:wait")
if err != nil {
t.Error(err)
}
go func() {
// Get the job.
job, err := jobs.Get("test:wait")
if err != nil {
t.Fatal(err)
}
// Sleep for 1 second before ACK.
time.Sleep(time.Second)
jobs.Ack(job)
}()
// Wait for the job to finish. Should take more than 1 second.
jobs.Wait(job)
duration := time.Since(start)
if duration < time.Second || duration > 1500*time.Millisecond {
t.Fatalf("expected 1.0s - 1.5s, got %v", time.Since(start))
}
}
func TestConfig(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
config := disque.Config{
Timeout: time.Millisecond,
}
// Should fail on timeout.
_, err = jobs.With(config).Get("test:non-existant-queue")
if err == nil {
t.Fatal("expected error")
}
// Should fail on timeout.
jobs.Use(config)
_, err = jobs.Get("test:non-existant-queue")
if err == nil {
t.Fatal("expected error")
}
}
package disque
import "time"
type Job struct {
ID string
Data string
Queue string
}
type JobConfig struct {
Timeout time.Duration `redis:"TIMEOUT"`
Replicate int `redis:"REPLICATE"`
Delay time.Duration `redis:"DELAY"`
RetryAfter time.Duration `redis:"RETRY"`
TTL time.Duration `redis:"TTL"`
MaxLen int `redis:"MAXLEN"`
}
var defaultJobConfig = JobConfig{
Timeout: 500 * time.Millisecond,
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment