Commit 50460515 authored by Vojtech Vitek (V-Teq)'s avatar Vojtech Vitek (V-Teq)
Browse files

Implement Replicate, Delay, RetryAfter, TTL, MaxLen

parent 01872c6d
......@@ -17,10 +17,17 @@
```go
jobs, _ := disque.Connect("127.0.0.1:7711")
// Enqueue some jobs.
job1, _ := jobs.Add(data1, "low")
job2, _ := jobs.Add(data2, "urgent")
job3, _ := jobs.Add(data3, "high")
// Enqueue "high" priority job.
job1, _ := jobs.Add(data1, "high")
// Enqueue "low" priority jobs.
job2, _ := jobs.TTL(24 * time.Hour).Add(data2, "low")
// Enqueue "urgent" priority job. Re-queue if not ACKed within one minute.
job3, err := jobs.RetryAfter(time.Minute).Add(data3, "urgent")
if err != nil {
jobs.Wait(job3)
}
```
## Consumer (worker)
......@@ -29,13 +36,13 @@ job3, _ := jobs.Add(data3, "high")
jobs, _ := disque.Connect("127.0.0.1:7711")
for {
// Dequeue a job (from higher priority queues first).
// Dequeue a job from highest priority queue (priority left to right).
job, _ := jobs.Get("urgent", "high", "low")
// Do some hard work with the job data.
err := Process(job.Data)
if err != nil {
// Re-queue job.
// Re-queue the job. This may be triggered by Panic/Recover.
jobs.Nack(job)
}
......
package disque
import "time"
type Config struct {
Timeout time.Duration
Replicate int
Delay time.Duration
RetryAfter time.Duration
TTL time.Duration
MaxLen int
}
var defaultConfig = Config{
Timeout: 500 * time.Millisecond,
}
func (conn *Conn) Use(conf Config) *Conn {
conn.conf = conf
return conn
}
func (conn *Conn) With(conf Config) *Conn {
return &Conn{pool: conn.pool, conf: conf}
}
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Replicate(replicate int) *Conn {
conn.conf.Replicate = replicate
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Delay(delay time.Duration) *Conn {
conn.conf.Delay = delay
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) TTL(ttl time.Duration) *Conn {
conn.conf.TTL = ttl
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) MaxLen(maxlen int) *Conn {
conn.conf.MaxLen = maxlen
return &Conn{pool: conn.pool, conf: conn.conf}
}
......@@ -11,7 +11,7 @@ import (
type Conn struct {
pool *redis.Pool
conf JobConfig
conf Config
}
func Connect(address string, extra ...string) (*Conn, error) {
......@@ -33,32 +33,13 @@ func Connect(address string, extra ...string) (*Conn, error) {
},
}
return &Conn{pool: pool, conf: defaultJobConfig}, nil
return &Conn{pool: pool, conf: defaultConfig}, nil
}
func (conn *Conn) Close() {
conn.pool.Close()
}
func (conn *Conn) Use(conf JobConfig) *Conn {
conn.conf = conf
return conn
}
func (conn *Conn) With(conf JobConfig) *Conn {
return &Conn{pool: conn.pool, conf: conf}
}
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Ping() error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -111,13 +92,39 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
}
func (conn *Conn) Add(data string, queue string) (*Job, error) {
args := []interface{}{
"ADDJOB", queue, data, conn.conf.Timeout.Nanoseconds() / 1000000,
"ADDJOB",
queue,
data,
int(conn.conf.Timeout.Nanoseconds() / 1000000),
}
if conn.conf.Replicate > 0 {
args = append(args, "REPLICATE", conn.conf.Replicate)
}
if conn.conf.Delay > 0 {
delay := int(conn.conf.Delay.Seconds())
if delay == 0 {
delay = 1
}
args = append(args, "DELAY", delay)
}
if conn.conf.RetryAfter > 0 {
args = append(args, "RETRY", conn.conf.RetryAfter.Seconds())
retry := int(conn.conf.RetryAfter.Seconds())
if retry == 0 {
retry = 1
}
args = append(args, "RETRY", retry)
}
if conn.conf.TTL > 0 {
ttl := int(conn.conf.TTL.Seconds())
if ttl == 0 {
ttl = 1
}
args = append(args, "TTL", ttl)
}
if conn.conf.MaxLen > 0 {
args = append(args, "MAXLEN", conn.conf.MaxLen)
}
reply, err := conn.do(args)
......@@ -137,7 +144,7 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
}, nil
}
func (conn *Conn) Get(queue string, extra ...string) (*Job, error) {
func (conn *Conn) Get(queue string, extraQueue ...string) (*Job, error) {
args := []interface{}{
"GETJOB",
"TIMEOUT",
......@@ -145,9 +152,10 @@ func (conn *Conn) Get(queue string, extra ...string) (*Job, error) {
"FROM",
queue,
}
for _, arg := range extra {
args = append(args, reflect.ValueOf(arg))
for _, arg := range extraQueue {
args = append(args, arg)
}
reply, err := conn.do(args)
if err != nil {
return nil, err
......
......@@ -21,6 +21,60 @@ func TestPing(t *testing.T) {
}
}
func TestDelay(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job after one second.
_, err = jobs.Delay(time.Second).Add("data1", "test:delay")
if err != nil {
t.Error(err)
}
// The job should not exist yet.
_, err = jobs.Get("test:delay")
if err == nil {
t.Fatal("expected error")
}
time.Sleep(time.Second)
// The job should exist now.
job, err := jobs.Timeout(500 * time.Millisecond).Get("test:delay")
if err != nil {
t.Fatal(err)
}
jobs.Ack(job)
}
func TestTTL(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job with TTL one second.
_, err = jobs.TTL(time.Second).Add("data1", "test:ttl")
if err != nil {
t.Error(err)
}
time.Sleep(1500 * time.Millisecond)
// The job should no longer exist.
_, err = jobs.Get("test:ttl")
if err == nil {
t.Fatal("expected error")
}
}
func TestTimeoutRetryAfter(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
......@@ -47,7 +101,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
// We should hit time-out fot the first time..
_, err = jobs.Timeout(750 * time.Millisecond).Get("test:retry")
if err == nil {
t.Fatal("expected timeout error")
t.Fatal("expected error")
}
// and we should be successful for the second time..
job, err := jobs.Timeout(750 * time.Millisecond).Get("test:retry")
......
package disque
import "time"
type Job struct {
ID string
Data string
Queue string
}
type JobConfig struct {
Timeout time.Duration `redis:"TIMEOUT"`
Replicate int `redis:"REPLICATE"`
Delay time.Duration `redis:"DELAY"`
RetryAfter time.Duration `redis:"RETRY"`
TTL time.Duration `redis:"TTL"`
MaxLen int `redis:"MAXLEN"`
}
var defaultJobConfig = JobConfig{
Timeout: 500 * time.Millisecond,
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment