Commit e06542e4 authored by Vojtech Vitek (V-Teq)'s avatar Vojtech Vitek (V-Teq)

Implement ActiveLen(), native NACK

parent 7c53cf7f
MIT License
Copyright (c) 2014 Pressly Inc. www.pressly.com
Copyright (c) 2016 Pressly Inc. www.pressly.com
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
......
......@@ -217,19 +217,18 @@ func (pool *Pool) Ack(job *Job) error {
}
// Nack re-queues a job back into its queue.
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (pool *Pool) Nack(job *Job) error {
sess := pool.redis.Get()
defer sess.Close()
if _, err := sess.Do("ENQUEUE", job.ID); err != nil {
if _, err := sess.Do("NACK", job.ID); err != nil {
return err
}
return nil
}
// Wait waits for a job to finish (blocks until it's ACKed).
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
// Wait blocks until the given job is ACKed.
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/168.
func (pool *Pool) Wait(job *Job) error {
sess := pool.redis.Get()
defer sess.Close()
......@@ -243,7 +242,7 @@ func (pool *Pool) Wait(job *Job) error {
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
}
return nil
......@@ -261,3 +260,23 @@ func (pool *Pool) Len(queue string) (int, error) {
return length, nil
}
// ActiveLen returns length of active jobs taken from a given queue.
func (pool *Pool) ActiveLen(queue string) (int, error) {
sess := pool.redis.Get()
defer sess.Close()
reply, err := sess.Do("JSCAN", "QUEUE", queue, "STATE", "active")
if err != nil {
return 0, err
}
replyArr, ok := reply.([]interface{})
if !ok || len(replyArr) != 2 {
return 0, errors.New("unexpected reply #1")
}
jobs, ok := replyArr[1].([]interface{})
if !ok {
return 0, errors.New("unexpected reply #2")
}
return len(jobs), nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment