Commit 7c53cf7f authored by Vojtech Vitek's avatar Vojtech Vitek
Browse files

Merge pull request #8 from goware/slack_feedback

Slack feedback
parents c85ca5e2 175f5f00
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
[![GoDoc](https://godoc.org/github.com/goware/disque?status.png)](https://godoc.org/github.com/goware/disque) [![GoDoc](https://godoc.org/github.com/goware/disque?status.png)](https://godoc.org/github.com/goware/disque)
[![Travis](https://travis-ci.org/goware/disque.svg?branch=master)](https://travis-ci.org/goware/disque) [![Travis](https://travis-ci.org/goware/disque.svg?branch=master)](https://travis-ci.org/goware/disque)
*Note: The examples below ignore error handling for readability.*
## Producer ## Producer
```go ```go
...@@ -19,7 +21,8 @@ import ( ...@@ -19,7 +21,8 @@ import (
func main() { func main() {
// Connect to Disque pool. // Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments. jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
defer jobs.Close()
// Enqueue three jobs with different priorities. // Enqueue three jobs with different priorities.
job1, _ := jobs.Add(data1, "high") job1, _ := jobs.Add(data1, "high")
...@@ -40,7 +43,8 @@ import ( ...@@ -40,7 +43,8 @@ import (
func main() { func main() {
// Connect to Disque pool. // Connect to Disque pool.
jobs, _ := disque.Connect("127.0.0.1:7711") // Accepts more arguments. jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
defer jobs.Close()
for { for {
// Get job from highest priority queue possible. Blocks by default. // Get job from highest priority queue possible. Blocks by default.
...@@ -72,7 +76,7 @@ func main() { ...@@ -72,7 +76,7 @@ func main() {
## Custom configuration ## Custom configuration
```go ```go
jobs, _ := disque.Connect("127.0.0.1:7711") jobs, _ := disque.New("127.0.0.1:7711")
config := disque.Config{ config := disque.Config{
Timeout: time.Second, // Each operation fails after 1s timeout elapses. Timeout: time.Second, // Each operation fails after 1s timeout elapses.
......
...@@ -13,48 +13,48 @@ type Config struct { ...@@ -13,48 +13,48 @@ type Config struct {
} }
// Use applies given config to every subsequent operation of this connection. // Use applies given config to every subsequent operation of this connection.
func (conn *Conn) Use(conf Config) *Conn { func (pool *Pool) Use(conf Config) *Pool {
conn.conf = conf pool.conf = conf
return conn return pool
} }
// With applies given config to a single operation. // With applies given config to a single operation.
func (conn *Conn) With(conf Config) *Conn { func (pool *Pool) With(conf Config) *Pool {
return &Conn{pool: conn.pool, conf: conf} return &Pool{redis: pool.redis, conf: conf}
} }
// Timeout option applied to a single operation. // Timeout option applied to a single operation.
func (conn *Conn) Timeout(timeout time.Duration) *Conn { func (pool *Pool) Timeout(timeout time.Duration) *Pool {
conn.conf.Timeout = timeout pool.conf.Timeout = timeout
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
// Replicate option applied to a single operation. // Replicate option applied to a single operation.
func (conn *Conn) Replicate(replicate int) *Conn { func (pool *Pool) Replicate(replicate int) *Pool {
conn.conf.Replicate = replicate pool.conf.Replicate = replicate
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
// Delay option applied to a single operation. // Delay option applied to a single operation.
func (conn *Conn) Delay(delay time.Duration) *Conn { func (pool *Pool) Delay(delay time.Duration) *Pool {
conn.conf.Delay = delay pool.conf.Delay = delay
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
// RetryAfter option applied to a single operation. // RetryAfter option applied to a single operation.
func (conn *Conn) RetryAfter(after time.Duration) *Conn { func (pool *Pool) RetryAfter(after time.Duration) *Pool {
conn.conf.RetryAfter = after pool.conf.RetryAfter = after
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
// TTL option applied to a single operation. // TTL option applied to a single operation.
func (conn *Conn) TTL(ttl time.Duration) *Conn { func (pool *Pool) TTL(ttl time.Duration) *Pool {
conn.conf.TTL = ttl pool.conf.TTL = ttl
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
// MaxLen option applied to a single operation. // MaxLen option applied to a single operation.
func (conn *Conn) MaxLen(maxlen int) *Conn { func (pool *Pool) MaxLen(maxlen int) *Pool {
conn.conf.MaxLen = maxlen pool.conf.MaxLen = maxlen
return &Conn{pool: conn.pool, conf: conn.conf} return &Pool{redis: pool.redis, conf: pool.conf}
} }
...@@ -9,17 +9,18 @@ import ( ...@@ -9,17 +9,18 @@ import (
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
) )
// Conn represent connection to a Disque Pool. // Pool represent Redis connection to a Disque Pool
type Conn struct { // with a certain Disque configuration.
pool *redis.Pool type Pool struct {
conf Config redis *redis.Pool
conf Config
} }
// Connect creates new connection to a given Disque Pool. // New creates a new connection to a given Disque Pool.
func Connect(address string, extra ...string) (*Conn, error) { func New(address string, extra ...string) (*Pool, error) {
pool := &redis.Pool{ pool := &redis.Pool{
MaxIdle: 64, MaxIdle: 1024,
MaxActive: 64, MaxActive: 1024,
IdleTimeout: 300 * time.Second, IdleTimeout: 300 * time.Second,
Wait: true, Wait: true,
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
...@@ -35,17 +36,17 @@ func Connect(address string, extra ...string) (*Conn, error) { ...@@ -35,17 +36,17 @@ func Connect(address string, extra ...string) (*Conn, error) {
}, },
} }
return &Conn{pool: pool}, nil return &Pool{redis: pool}, nil
} }
// Close closes the connection to a Disque Pool. // Close closes the connection to a Disque Pool.
func (conn *Conn) Close() { func (pool *Pool) Close() error {
conn.pool.Close() return pool.redis.Close()
} }
// Ping returns nil if Disque Pool is alive, error otherwise. // Ping returns nil if Disque Pool is alive, error otherwise.
func (conn *Conn) Ping() error { func (pool *Pool) Ping() error {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
if _, err := sess.Do("PING"); err != nil { if _, err := sess.Do("PING"); err != nil {
...@@ -67,8 +68,8 @@ func (conn *Conn) Ping() error { ...@@ -67,8 +68,8 @@ func (conn *Conn) Ping() error {
// > Build error: "too many arguments in call to sess.Do" // > Build error: "too many arguments in call to sess.Do"
// > Runtime error: "ERR wrong number of arguments for '...' command" // > Runtime error: "ERR wrong number of arguments for '...' command"
// //
func (conn *Conn) do(args []interface{}) (interface{}, error) { func (pool *Pool) do(args []interface{}) (interface{}, error) {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
fn := reflect.ValueOf(sess.Do) fn := reflect.ValueOf(sess.Do)
...@@ -98,43 +99,43 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) { ...@@ -98,43 +99,43 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
} }
// Add enqueues new job with a specified data to a given queue. // Add enqueues new job with a specified data to a given queue.
func (conn *Conn) Add(data string, queue string) (*Job, error) { func (pool *Pool) Add(data string, queue string) (*Job, error) {
args := []interface{}{ args := []interface{}{
"ADDJOB", "ADDJOB",
queue, queue,
data, data,
int(conn.conf.Timeout.Nanoseconds() / 1000000), int(pool.conf.Timeout.Nanoseconds() / 1000000),
} }
if conn.conf.Replicate > 0 { if pool.conf.Replicate > 0 {
args = append(args, "REPLICATE", conn.conf.Replicate) args = append(args, "REPLICATE", pool.conf.Replicate)
} }
if conn.conf.Delay > 0 { if pool.conf.Delay > 0 {
delay := int(conn.conf.Delay.Seconds()) delay := int(pool.conf.Delay.Seconds())
if delay == 0 { if delay == 0 {
delay = 1 delay = 1
} }
args = append(args, "DELAY", delay) args = append(args, "DELAY", delay)
} }
if conn.conf.RetryAfter > 0 { if pool.conf.RetryAfter > 0 {
retry := int(conn.conf.RetryAfter.Seconds()) retry := int(pool.conf.RetryAfter.Seconds())
if retry == 0 { if retry == 0 {
retry = 1 retry = 1
} }
args = append(args, "RETRY", retry) args = append(args, "RETRY", retry)
} }
if conn.conf.TTL > 0 { if pool.conf.TTL > 0 {
ttl := int(conn.conf.TTL.Seconds()) ttl := int(pool.conf.TTL.Seconds())
if ttl == 0 { if ttl == 0 {
ttl = 1 ttl = 1
} }
args = append(args, "TTL", ttl) args = append(args, "TTL", ttl)
} }
if conn.conf.MaxLen > 0 { if pool.conf.MaxLen > 0 {
args = append(args, "MAXLEN", conn.conf.MaxLen) args = append(args, "MAXLEN", pool.conf.MaxLen)
} }
reply, err := conn.do(args) reply, err := pool.do(args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -153,7 +154,7 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) { ...@@ -153,7 +154,7 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
// Get returns first available job from a highest priority // Get returns first available job from a highest priority
// queue possible (left-to-right priority). // queue possible (left-to-right priority).
func (conn *Conn) Get(queues ...string) (*Job, error) { func (pool *Pool) Get(queues ...string) (*Job, error) {
if len(queues) == 0 { if len(queues) == 0 {
return nil, errors.New("expected at least one queue") return nil, errors.New("expected at least one queue")
} }
...@@ -161,14 +162,14 @@ func (conn *Conn) Get(queues ...string) (*Job, error) { ...@@ -161,14 +162,14 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
args := []interface{}{ args := []interface{}{
"GETJOB", "GETJOB",
"TIMEOUT", "TIMEOUT",
int(conn.conf.Timeout.Nanoseconds() / 1000000), int(pool.conf.Timeout.Nanoseconds() / 1000000),
"FROM", "FROM",
} }
for _, arg := range queues { for _, arg := range queues {
args = append(args, arg) args = append(args, arg)
} }
reply, err := conn.do(args) reply, err := pool.do(args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -205,8 +206,8 @@ func (conn *Conn) Get(queues ...string) (*Job, error) { ...@@ -205,8 +206,8 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
} }
// Ack acknowledges (dequeues/removes) a job from its queue. // Ack acknowledges (dequeues/removes) a job from its queue.
func (conn *Conn) Ack(job *Job) error { func (pool *Pool) Ack(job *Job) error {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
if _, err := sess.Do("ACKJOB", job.ID); err != nil { if _, err := sess.Do("ACKJOB", job.ID); err != nil {
...@@ -217,8 +218,8 @@ func (conn *Conn) Ack(job *Job) error { ...@@ -217,8 +218,8 @@ func (conn *Conn) Ack(job *Job) error {
// Nack re-queues a job back into its queue. // Nack re-queues a job back into its queue.
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43. // Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Nack(job *Job) error { func (pool *Pool) Nack(job *Job) error {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
if _, err := sess.Do("ENQUEUE", job.ID); err != nil { if _, err := sess.Do("ENQUEUE", job.ID); err != nil {
...@@ -229,8 +230,8 @@ func (conn *Conn) Nack(job *Job) error { ...@@ -229,8 +230,8 @@ func (conn *Conn) Nack(job *Job) error {
// Wait waits for a job to finish (blocks until it's ACKed). // Wait waits for a job to finish (blocks until it's ACKed).
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43. // Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func (conn *Conn) Wait(job *Job) error { func (pool *Pool) Wait(job *Job) error {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
for { for {
...@@ -249,8 +250,8 @@ func (conn *Conn) Wait(job *Job) error { ...@@ -249,8 +250,8 @@ func (conn *Conn) Wait(job *Job) error {
} }
// Len returns length of a given queue. // Len returns length of a given queue.
func (conn *Conn) Len(queue string) (int, error) { func (pool *Pool) Len(queue string) (int, error) {
sess := conn.pool.Get() sess := pool.redis.Get()
defer sess.Close() defer sess.Close()
length, err := redis.Int(sess.Do("QLEN", queue)) length, err := redis.Int(sess.Do("QLEN", queue))
......
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -23,7 +23,7 @@ func TestPing(t *testing.T) { ...@@ -23,7 +23,7 @@ func TestPing(t *testing.T) {
func TestDelay(t *testing.T) { func TestDelay(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -54,7 +54,7 @@ func TestDelay(t *testing.T) { ...@@ -54,7 +54,7 @@ func TestDelay(t *testing.T) {
func TestTTL(t *testing.T) { func TestTTL(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -77,7 +77,7 @@ func TestTTL(t *testing.T) { ...@@ -77,7 +77,7 @@ func TestTTL(t *testing.T) {
func TestTimeoutRetryAfter(t *testing.T) { func TestTimeoutRetryAfter(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -118,7 +118,7 @@ func TestTimeoutRetryAfter(t *testing.T) { ...@@ -118,7 +118,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
func TestPriorityQueue(t *testing.T) { func TestPriorityQueue(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -205,7 +205,7 @@ func TestPriorityQueue(t *testing.T) { ...@@ -205,7 +205,7 @@ func TestPriorityQueue(t *testing.T) {
func TestWait(t *testing.T) { func TestWait(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -241,7 +241,7 @@ func TestWait(t *testing.T) { ...@@ -241,7 +241,7 @@ func TestWait(t *testing.T) {
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -267,7 +267,7 @@ func TestConfig(t *testing.T) { ...@@ -267,7 +267,7 @@ func TestConfig(t *testing.T) {
func TestQueueLength(t *testing.T) { func TestQueueLength(t *testing.T) {
// Connect to Disque. // Connect to Disque.
jobs, err := disque.Connect("127.0.0.1:7711") jobs, err := disque.New("127.0.0.1:7711")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment