Commit ec6c2aa4 authored by Vojtech Vitek's avatar Vojtech Vitek
Browse files

Merge pull request #2 from goware/api_retry

Change API, add JobConfig, implement RETRY and TIMEOUT
parents b9dab4e1 01872c6d
......@@ -16,4 +16,4 @@ install:
- glock sync -n github.com/goware/disque < Glockfile
script:
- go test
- go test -timeout=10s
......@@ -18,9 +18,9 @@
jobs, _ := disque.Connect("127.0.0.1:7711")
// Enqueue some jobs.
job1, _ := jobs.Enqueue(data1, "low")
job2, _ := jobs.Enqueue(data2, "urgent")
job3, _ := jobs.Enqueue(data3, "high")
job1, _ := jobs.Add(data1, "low")
job2, _ := jobs.Add(data2, "urgent")
job3, _ := jobs.Add(data3, "high")
```
## Consumer (worker)
......@@ -30,10 +30,14 @@ jobs, _ := disque.Connect("127.0.0.1:7711")
for {
// Dequeue a job (from higher priority queues first).
job, _ := jobs.Dequeue("urgent", "high", "low")
job, _ := jobs.Get("urgent", "high", "low")
// Do some hard work with the job data.
Process(job.Data)
err := Process(job.Data)
if err != nil {
// Re-queue job.
jobs.Nack(job)
}
// Acknowledge that we processed the job successfully.
jobs.Ack(job)
......
......@@ -2,6 +2,7 @@ package disque
import (
"errors"
"fmt"
"reflect"
"time"
......@@ -10,9 +11,10 @@ import (
type Conn struct {
pool *redis.Pool
conf JobConfig
}
func Connect(address string, extra ...string) (Conn, error) {
func Connect(address string, extra ...string) (*Conn, error) {
pool := &redis.Pool{
MaxIdle: 64,
MaxActive: 64,
......@@ -30,14 +32,34 @@ func Connect(address string, extra ...string) (Conn, error) {
return err
},
}
return Conn{pool: pool}, nil
return &Conn{pool: pool, conf: defaultJobConfig}, nil
}
func (conn Conn) Close() {
func (conn *Conn) Close() {
conn.pool.Close()
}
func (conn Conn) Ping() error {
func (conn *Conn) Use(conf JobConfig) *Conn {
conn.conf = conf
return conn
}
func (conn *Conn) With(conf JobConfig) *Conn {
return &Conn{pool: conn.pool, conf: conf}
}
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
}
func (conn *Conn) Ping() error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -47,12 +69,58 @@ func (conn Conn) Ping() error {
return nil
}
func (conn Conn) Enqueue(data string, queue string) (*Job, error) {
// Eh, none of the following builds successfully:
//
// reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{})
// reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{}...)
// reply, err := sess.Do("GETJOB", "FROM", queue, extraQueues)
// reply, err := sess.Do("GETJOB", "FROM", queue, extraQueues...)
//
// > build error:
// > too many arguments in call to sess.Do
//
// So.. let's work around this with reflect pkg.
func (conn *Conn) do(args []interface{}) (interface{}, error) {
sess := conn.pool.Get()
defer sess.Close()
timeout := "1000"
reply, err := sess.Do("ADDJOB", queue, data, timeout)
fn := reflect.ValueOf(sess.Do)
reflectArgs := []reflect.Value{}
for _, arg := range args {
reflectArgs = append(reflectArgs, reflect.ValueOf(arg))
}
ret := fn.Call(reflectArgs)
if len(ret) != 2 {
return nil, errors.New("expected two return values")
}
if !ret[1].IsNil() {
err, ok := ret[1].Interface().(error)
if !ok {
return nil, fmt.Errorf("expected error type, got: %T %#v", ret[1], ret[1])
}
return nil, err
}
if ret[0].IsNil() {
return nil, fmt.Errorf("no data available")
}
reply, ok := ret[0].Interface().(interface{})
if !ok {
return nil, fmt.Errorf("unexpected interface{} error type, got: %T %#v", ret[0], ret[0])
}
return reply, nil
}
func (conn *Conn) Add(data string, queue string) (*Job, error) {
args := []interface{}{
"ADDJOB", queue, data, conn.conf.Timeout.Nanoseconds() / 1000000,
}
if conn.conf.RetryAfter > 0 {
args = append(args, "RETRY", conn.conf.RetryAfter.Seconds())
}
reply, err := conn.do(args)
if err != nil {
return nil, err
}
......@@ -69,39 +137,21 @@ func (conn Conn) Enqueue(data string, queue string) (*Job, error) {
}, nil
}
func (conn Conn) Dequeue(queue string, extra ...string) (*Job, error) {
sess := conn.pool.Get()
defer sess.Close()
// Eh, the following doesn't build:
// reply, err := sess.Do("GETJOB", "FROM", queue, extra...)
// if err != nil {
// return nil, err
// }
// I get "too many arguments in call to sess.Do" build error.
// So I have to build the function arguments using reflect pkg.
// <HACK>
fn := reflect.ValueOf(sess.Do)
args := []reflect.Value{reflect.ValueOf("GETJOB"), reflect.ValueOf("FROM"), reflect.ValueOf(queue)}
func (conn *Conn) Get(queue string, extra ...string) (*Job, error) {
args := []interface{}{
"GETJOB",
"TIMEOUT",
int(conn.conf.Timeout.Nanoseconds() / 1000000),
"FROM",
queue,
}
for _, arg := range extra {
args = append(args, reflect.ValueOf(arg))
}
ret := fn.Call(args)
if len(ret) != 2 {
return nil, errors.New("expected return value #1")
}
reply, ok := ret[0].Interface().(interface{})
if !ok {
return nil, errors.New("unexpected return value #2")
}
if !ret[1].IsNil() {
err, ok := ret[1].Interface().(error)
if !ok {
return nil, errors.New("unexpected return value #3")
}
reply, err := conn.do(args)
if err != nil {
return nil, err
}
// </HACK>
replyArr, ok := reply.([]interface{})
if !ok || len(replyArr) != 1 {
......@@ -134,7 +184,7 @@ func (conn Conn) Dequeue(queue string, extra ...string) (*Job, error) {
}, nil
}
func (conn Conn) Ack(job *Job) error {
func (conn *Conn) Ack(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -143,3 +193,13 @@ func (conn Conn) Ack(job *Job) error {
}
return nil
}
func (conn *Conn) Nack(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
if _, err := sess.Do("ENQUEUE", job.ID); err != nil {
return err
}
return nil
}
......@@ -2,43 +2,96 @@ package disque_test
import (
"testing"
"time"
"github.com/goware/disque"
)
func TestPriorityQueue(t *testing.T) {
func TestPing(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Ping.
if jobs.Ping() != nil {
t.Fatal(err)
}
}
func TestTimeoutRetryAfter(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job with retry after one second.
_, err = jobs.RetryAfter(time.Second).Add("data1", "test:retry")
if err != nil {
t.Error(err)
}
// Dequeue job.
_, err = jobs.Get("test:retry")
if err != nil {
t.Fatal(err)
}
// Don't Ack() to pretend consumer failure.
// Try to dequeue job again..
// We should hit time-out fot the first time..
_, err = jobs.Timeout(750 * time.Millisecond).Get("test:retry")
if err == nil {
t.Fatal("expected timeout error")
}
// and we should be successful for the second time..
job, err := jobs.Timeout(750 * time.Millisecond).Get("test:retry")
if err != nil {
t.Fatal(err)
}
// Ack the job.
err = jobs.Ack(job)
if err != nil {
t.Fatal(err)
}
}
func TestPriorityQueue(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue three jobs.
_, err = jobs.Enqueue("data1", "test:low")
_, err = jobs.Add("data1", "test:low")
if err != nil {
t.Error(err)
}
_, err = jobs.Enqueue("data2", "test:urgent")
_, err = jobs.Add("data2", "test:urgent")
if err != nil {
t.Error(err)
}
_, err = jobs.Enqueue("data3", "test:high")
_, err = jobs.Add("data3", "test:high")
if err != nil {
t.Error(err)
}
// Dequeue first job.
job, err := jobs.Dequeue("test:urgent", "test:high", "test:low")
job, err := jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Error(err)
t.Fatal(err)
}
err = jobs.Ack(job)
if err != nil {
t.Error(err)
t.Fatal(err)
}
if e := "test:urgent"; job.Queue != e {
t.Fatalf("expected %s, got %s", e, job.Queue)
......@@ -48,13 +101,13 @@ func TestPriorityQueue(t *testing.T) {
}
// Dequeue second job.
job, err = jobs.Dequeue("test:urgent", "test:high", "test:low")
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Error(err)
t.Fatal(err)
}
err = jobs.Ack(job)
if err != nil {
t.Error(err)
t.Fatal(err)
}
if e := "test:high"; job.Queue != e {
t.Fatalf("expected %s, got %s", e, job.Queue)
......@@ -63,14 +116,14 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
// Dequeue third job.
job, err = jobs.Dequeue("test:urgent", "test:high", "test:low")
// Dequeue third job and re-queue it again.
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Error(err)
t.Fatal(err)
}
err = jobs.Ack(job)
err = jobs.Nack(job)
if err != nil {
t.Error(err)
t.Fatal(err)
}
if e := "test:low"; job.Queue != e {
t.Fatalf("expected %s, got %s", e, job.Queue)
......@@ -79,4 +132,19 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
// Dequeue third job again.
job, err = jobs.Get("test:urgent", "test:high", "test:low")
if err != nil {
t.Fatal(err)
}
err = jobs.Ack(job)
if err != nil {
t.Fatal(err)
}
if e := "test:low"; job.Queue != e {
t.Fatalf("expected %s, got %s", e, job.Queue)
}
if e := "data1"; job.Data != e {
t.Fatalf("expected %s, got %s", e, job.Data)
}
}
package disque
import "time"
type Job struct {
ID string
Data string
Queue string
}
type JobConfig struct {
Timeout time.Duration `redis:"TIMEOUT"`
Replicate int `redis:"REPLICATE"`
Delay time.Duration `redis:"DELAY"`
RetryAfter time.Duration `redis:"RETRY"`
TTL time.Duration `redis:"TTL"`
MaxLen int `redis:"MAXLEN"`
}
var defaultJobConfig = JobConfig{
Timeout: 500 * time.Millisecond,
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment