Skip to content
Snippets Groups Projects
Commit da1e89da authored by Vojtech Vitek (V-Teq)'s avatar Vojtech Vitek (V-Teq)
Browse files

API change - disque.New() returns *disque.Pool

Feedback by: @mistobaan.
parent ec769f29
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ import (
func main() {
// Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments.
jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
// Enqueue three jobs with different priorities.
job1, _ := jobs.Add(data1, "high")
......@@ -42,7 +42,7 @@ import (
func main() {
// Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments.
jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
for {
// Get job from highest priority queue possible. Blocks by default.
......@@ -74,7 +74,7 @@ func main() {
## Custom configuration
```go
jobs, _ := disque.Connect("127.0.0.1:7711")
jobs, _ := disque.New("127.0.0.1:7711")
config := disque.Config{
Timeout: time.Second, // Each operation fails after 1s timeout elapses.
......
......@@ -13,48 +13,48 @@ type Config struct {
}
// Use applies given config to every subsequent operation of this connection.
func (conn *Conn) Use(conf Config) *Conn {
conn.conf = conf
return conn
func (pool *Pool) Use(conf Config) *Pool {
pool.conf = conf
return pool
}
// With applies given config to a single operation.
func (conn *Conn) With(conf Config) *Conn {
return &Conn{pool: conn.pool, conf: conf}
func (pool *Pool) With(conf Config) *Pool {
return &Pool{redis: pool.redis, conf: conf}
}
// Timeout option applied to a single operation.
func (conn *Conn) Timeout(timeout time.Duration) *Conn {
conn.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) Timeout(timeout time.Duration) *Pool {
pool.conf.Timeout = timeout
return &Pool{redis: pool.redis, conf: pool.conf}
}
// Replicate option applied to a single operation.
func (conn *Conn) Replicate(replicate int) *Conn {
conn.conf.Replicate = replicate
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) Replicate(replicate int) *Pool {
pool.conf.Replicate = replicate
return &Pool{redis: pool.redis, conf: pool.conf}
}
// Delay option applied to a single operation.
func (conn *Conn) Delay(delay time.Duration) *Conn {
conn.conf.Delay = delay
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) Delay(delay time.Duration) *Pool {
pool.conf.Delay = delay
return &Pool{redis: pool.redis, conf: pool.conf}
}
// RetryAfter option applied to a single operation.
func (conn *Conn) RetryAfter(after time.Duration) *Conn {
conn.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) RetryAfter(after time.Duration) *Pool {
pool.conf.RetryAfter = after
return &Pool{redis: pool.redis, conf: pool.conf}
}
// TTL option applied to a single operation.
func (conn *Conn) TTL(ttl time.Duration) *Conn {
conn.conf.TTL = ttl
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) TTL(ttl time.Duration) *Pool {
pool.conf.TTL = ttl
return &Pool{redis: pool.redis, conf: pool.conf}
}
// MaxLen option applied to a single operation.
func (conn *Conn) MaxLen(maxlen int) *Conn {
conn.conf.MaxLen = maxlen
return &Conn{pool: conn.pool, conf: conn.conf}
func (pool *Pool) MaxLen(maxlen int) *Pool {
pool.conf.MaxLen = maxlen
return &Pool{redis: pool.redis, conf: pool.conf}
}
......@@ -9,14 +9,15 @@ import (
"github.com/garyburd/redigo/redis"
)
// Conn represent connection to a Disque Pool.
type Conn struct {
pool *redis.Pool
// Pool represent Redis connection to a Disque Pool
// with a certain Disque configuration.
type Pool struct {
redis *redis.Pool
conf Config
}
// Connect creates new connection to a given Disque Pool.
func Connect(address string, extra ...string) (*Conn, error) {
// New creates a new connection to a given Disque Pool.
func New(address string, extra ...string) (*Pool, error) {
pool := &redis.Pool{
MaxIdle: 1024,
MaxActive: 1024,
......@@ -35,17 +36,17 @@ func Connect(address string, extra ...string) (*Conn, error) {
},
}
return &Conn{pool: pool}, nil
return &Pool{redis: pool}, nil
}
// Close closes the connection to a Disque Pool.
func (conn *Conn) Close() error {
return conn.pool.Close()
func (pool *Pool) Close() error {
return pool.redis.Close()
}
// Ping returns nil if Disque Pool is alive, error otherwise.
func (conn *Conn) Ping() error {
sess := conn.pool.Get()
func (pool *Pool) Ping() error {
sess := pool.redis.Get()
defer sess.Close()
if _, err := sess.Do("PING"); err != nil {
......@@ -67,8 +68,8 @@ func (conn *Conn) Ping() error {
// > Build error: "too many arguments in call to sess.Do"
// > Runtime error: "ERR wrong number of arguments for '...' command"
//
func (conn *Conn) do(args []interface{}) (interface{}, error) {
sess := conn.pool.Get()
func (pool *Pool) do(args []interface{}) (interface{}, error) {
sess := pool.redis.Get()
defer sess.Close()
fn := reflect.ValueOf(sess.Do)
......@@ -98,43 +99,43 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
}
// Add enqueues new job with a specified data to a given queue.
func (conn *Conn) Add(data string, queue string) (*Job, error) {
func (pool *Pool) Add(data string, queue string) (*Job, error) {
args := []interface{}{
"ADDJOB",
queue,
data,
int(conn.conf.Timeout.Nanoseconds() / 1000000),
int(pool.conf.Timeout.Nanoseconds() / 1000000),
}
if conn.conf.Replicate > 0 {
args = append(args, "REPLICATE", conn.conf.Replicate)
if pool.conf.Replicate > 0 {
args = append(args, "REPLICATE", pool.conf.Replicate)
}
if conn.conf.Delay > 0 {
delay := int(conn.conf.Delay.Seconds())
if pool.conf.Delay > 0 {
delay := int(pool.conf.Delay.Seconds())
if delay == 0 {
delay = 1
}
args = append(args, "DELAY", delay)
}
if conn.conf.RetryAfter > 0 {
retry := int(conn.conf.RetryAfter.Seconds())
if pool.conf.RetryAfter > 0 {
retry := int(pool.conf.RetryAfter.Seconds())
if retry == 0 {
retry = 1
}
args = append(args, "RETRY", retry)
}
if conn.conf.TTL > 0 {
ttl := int(conn.conf.TTL.Seconds())
if pool.conf.TTL > 0 {
ttl := int(pool.conf.TTL.Seconds())
if ttl == 0 {
ttl = 1
}
args = append(args, "TTL", ttl)
}
if conn.conf.MaxLen > 0 {
args = append(args, "MAXLEN", conn.conf.MaxLen)
if pool.conf.MaxLen > 0 {
args = append(args, "MAXLEN", pool.conf.MaxLen)
}
reply, err := conn.do(args)
reply, err := pool.do(args)
if err != nil {
return nil, err
}
......@@ -153,7 +154,7 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
// Get returns first available job from a highest priority
// queue possible (left-to-right priority).
func (conn *Conn) Get(queues ...string) (*Job, error) {
func (pool *Pool) Get(queues ...string) (*Job, error) {
if len(queues) == 0 {
return nil, errors.New("expected at least one queue")
}
......@@ -161,14 +162,14 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
args := []interface{}{
"GETJOB",
"TIMEOUT",
int(conn.conf.Timeout.Nanoseconds() / 1000000),
int(pool.conf.Timeout.Nanoseconds() / 1000000),
"FROM",
}
for _, arg := range queues {
args = append(args, arg)
}
reply, err := conn.do(args)
reply, err := pool.do(args)
if err != nil {
return nil, err
}
......@@ -205,8 +206,8 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
}
// Ack acknowledges (dequeues/removes) a job from its queue.
func (conn *Conn) Ack(job *Job) error {
sess := conn.pool.Get()
func (pool *Pool) Ack(job *Job) error {
sess := pool.redis.Get()
defer sess.Close()
if _, err := sess.Do("ACKJOB", job.ID); err != nil {
......@@ -217,8 +218,8 @@ func (conn *Conn) Ack(job *Job) error {
// Nack re-queues a job back into its queue.
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Nack(job *Job) error {
sess := conn.pool.Get()
func (pool *Pool) Nack(job *Job) error {
sess := pool.redis.Get()
defer sess.Close()
if _, err := sess.Do("ENQUEUE", job.ID); err != nil {
......@@ -229,8 +230,8 @@ func (conn *Conn) Nack(job *Job) error {
// Wait waits for a job to finish (blocks until it's ACKed).
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Wait(job *Job) error {
sess := conn.pool.Get()
func (pool *Pool) Wait(job *Job) error {
sess := pool.redis.Get()
defer sess.Close()
for {
......@@ -249,8 +250,8 @@ func (conn *Conn) Wait(job *Job) error {
}
// Len returns length of a given queue.
func (conn *Conn) Len(queue string) (int, error) {
sess := conn.pool.Get()
func (pool *Pool) Len(queue string) (int, error) {
sess := pool.redis.Get()
defer sess.Close()
length, err := redis.Int(sess.Do("QLEN", queue))
......
......@@ -9,7 +9,7 @@ import (
func TestPing(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -23,7 +23,7 @@ func TestPing(t *testing.T) {
func TestDelay(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -54,7 +54,7 @@ func TestDelay(t *testing.T) {
func TestTTL(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -77,7 +77,7 @@ func TestTTL(t *testing.T) {
func TestTimeoutRetryAfter(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -118,7 +118,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
func TestPriorityQueue(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -205,7 +205,7 @@ func TestPriorityQueue(t *testing.T) {
func TestWait(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -241,7 +241,7 @@ func TestWait(t *testing.T) {
func TestConfig(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......@@ -267,7 +267,7 @@ func TestConfig(t *testing.T) {
func TestQueueLength(t *testing.T) {
// Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711")
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment