Commit 6c0caf16 authored by Vojtech Vitek (V-Teq)'s avatar Vojtech Vitek (V-Teq)

Implement Wait(job)

parent 50460515
......@@ -46,7 +46,7 @@ for {
jobs.Nack(job)
}
// Acknowledge that we processed the job successfully.
// Acknowledge (dequeue) the job. Success.
jobs.Ack(job)
}
```
......
......@@ -144,15 +144,18 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
}, nil
}
func (conn *Conn) Get(queue string, extraQueue ...string) (*Job, error) {
func (conn *Conn) Get(queues ...string) (*Job, error) {
if len(queues) == 0 {
return nil, errors.New("expected at least one queue")
}
args := []interface{}{
"GETJOB",
"TIMEOUT",
int(conn.conf.Timeout.Nanoseconds() / 1000000),
"FROM",
queue,
}
for _, arg := range extraQueue {
for _, arg := range queues {
args = append(args, arg)
}
......@@ -202,6 +205,7 @@ func (conn *Conn) Ack(job *Job) error {
return nil
}
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Nack(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
......@@ -211,3 +215,23 @@ func (conn *Conn) Nack(job *Job) error {
}
return nil
}
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Wait(job *Job) error {
sess := conn.pool.Get()
defer sess.Close()
for {
reply, err := sess.Do("SHOW", job.ID)
if err != nil {
return err
}
if reply == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return nil
}
......@@ -98,13 +98,13 @@ func TestTimeoutRetryAfter(t *testing.T) {
// Don't Ack() to pretend consumer failure.
// Try to dequeue job again..
// We should hit time-out fot the first time..
_, err = jobs.Timeout(750 * time.Millisecond).Get("test:retry")
// We should hit time-out for the first time..
_, err = jobs.Timeout(250 * time.Millisecond).Get("test:retry")
if err == nil {
t.Fatal("expected error")
}
// and we should be successful for the second time..
job, err := jobs.Timeout(750 * time.Millisecond).Get("test:retry")
job, err := jobs.Timeout(time.Second).Get("test:retry")
if err != nil {
t.Fatal(err)
}
......@@ -202,3 +202,39 @@ func TestPriorityQueue(t *testing.T) {
t.Fatalf("expected %s, got %s", e, job.Data)
}
}
func TestWait(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()
// Enqueue job.
start := time.Now()
job, err := jobs.Add("data1", "test:wait")
if err != nil {
t.Error(err)
}
go func() {
// Dequeue the job.
job, err := jobs.Get("test:wait")
if err != nil {
t.Fatal(err)
}
// Sleep for 1 second before ACK.
time.Sleep(time.Second)
jobs.Ack(job)
}()
// Wait for the job to finish. Should take more than 1 second.
jobs.Wait(job)
duration := time.Since(start)
if duration < time.Second || duration > 1500*time.Millisecond {
t.Fatalf("expected 1.0s - 1.5s, got %v", time.Since(start))
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment