disque.go 6.25 KB
Newer Older
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
1
package disque
2
3
4

import (
	"errors"
5
	"fmt"
6
7
8
9
10
11
	"reflect"
	"time"

	"github.com/garyburd/redigo/redis"
)

12
13
14
15
16
// Pool represent Redis connection to a Disque Pool
// with a certain Disque configuration.
type Pool struct {
	redis *redis.Pool
	conf  Config
17
18
}

19
20
// New creates a new connection to a given Disque Pool.
func New(address string, extra ...string) (*Pool, error) {
21
	pool := &redis.Pool{
22
23
		MaxIdle:     1024,
		MaxActive:   1024,
24
25
26
27
28
29
30
31
32
33
34
35
36
37
		IdleTimeout: 300 * time.Second,
		Wait:        true,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", address)
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}
38

39
	return &Pool{redis: pool}, nil
40
41
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
42
// Close closes the connection to a Disque Pool.
43
44
func (pool *Pool) Close() error {
	return pool.redis.Close()
45
46
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
47
// Ping returns nil if Disque Pool is alive, error otherwise.
48
49
func (pool *Pool) Ping() error {
	sess := pool.redis.Get()
50
51
52
53
54
55
56
57
	defer sess.Close()

	if _, err := sess.Do("PING"); err != nil {
		return err
	}
	return nil
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
58
59
// do is a helper function that workarounds redigo/redis API
// flaws with a magic function Call() from the reflect pkg.
60
//
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
61
// None of the following builds or works successfully:
62
//
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
63
64
65
66
// reply, err := sess.Do("GETJOB", "FROM", queues, redis.Args{})
// reply, err := sess.Do("GETJOB", "FROM", queues, redis.Args{}...)
// reply, err := sess.Do("GETJOB", "FROM", queues)
// reply, err := sess.Do("GETJOB", "FROM", queues...)
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
67
//
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
68
69
// > Build error: "too many arguments in call to sess.Do"
// > Runtime error: "ERR wrong number of arguments for '...' command"
70
//
71
72
func (pool *Pool) do(args []interface{}) (interface{}, error) {
	sess := pool.redis.Get()
73
74
	defer sess.Close()

75
	fn := reflect.ValueOf(sess.Do)
Maciej Lisiewski's avatar
Maciej Lisiewski committed
76
77
78
	reflectArgs := make([]reflect.Value, len(args))
	for i, arg := range args {
		reflectArgs[i] = reflect.ValueOf(arg)
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
	}
	ret := fn.Call(reflectArgs)
	if len(ret) != 2 {
		return nil, errors.New("expected two return values")
	}
	if !ret[1].IsNil() {
		err, ok := ret[1].Interface().(error)
		if !ok {
			return nil, fmt.Errorf("expected error type, got: %T %#v", ret[1], ret[1])
		}
		return nil, err
	}
	if ret[0].IsNil() {
		return nil, fmt.Errorf("no data available")
	}
	reply, ok := ret[0].Interface().(interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected interface{} error type, got: %T %#v", ret[0], ret[0])
	}
	return reply, nil
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
101
// Add enqueues new job with a specified data to a given queue.
102
func (pool *Pool) Add(data string, queue string) (*Job, error) {
103
	args := []interface{}{
104
105
106
		"ADDJOB",
		queue,
		data,
107
		int(pool.conf.Timeout.Nanoseconds() / 1000000),
108
109
	}

110
111
	if pool.conf.Replicate > 0 {
		args = append(args, "REPLICATE", pool.conf.Replicate)
112
	}
113
114
	if pool.conf.Delay > 0 {
		delay := int(pool.conf.Delay.Seconds())
115
116
117
118
119
		if delay == 0 {
			delay = 1
		}
		args = append(args, "DELAY", delay)
	}
120
121
	if pool.conf.RetryAfter > 0 {
		retry := int(pool.conf.RetryAfter.Seconds())
122
123
124
125
126
		if retry == 0 {
			retry = 1
		}
		args = append(args, "RETRY", retry)
	}
127
128
	if pool.conf.TTL > 0 {
		ttl := int(pool.conf.TTL.Seconds())
129
130
131
132
133
		if ttl == 0 {
			ttl = 1
		}
		args = append(args, "TTL", ttl)
	}
134
135
	if pool.conf.MaxLen > 0 {
		args = append(args, "MAXLEN", pool.conf.MaxLen)
136
137
	}

138
	reply, err := pool.do(args)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
	if err != nil {
		return nil, err
	}

	id, ok := reply.(string)
	if !ok {
		return nil, errors.New("unexpected reply: id")
	}

	return &Job{
		ID:    id,
		Data:  data,
		Queue: queue,
	}, nil
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
155
156
// Get returns first available job from a highest priority
// queue possible (left-to-right priority).
157
func (pool *Pool) Get(queues ...string) (*Job, error) {
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
158
159
160
161
	if len(queues) == 0 {
		return nil, errors.New("expected at least one queue")
	}

162
163
164
	args := []interface{}{
		"GETJOB",
		"TIMEOUT",
165
		int(pool.conf.Timeout.Nanoseconds() / 1000000),
166
167
		"FROM",
	}
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
168
	for _, arg := range queues {
169
		args = append(args, arg)
170
	}
171

172
	reply, err := pool.do(args)
173
	if err != nil {
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
		return nil, err
	}

	replyArr, ok := reply.([]interface{})
	if !ok || len(replyArr) != 1 {
		return nil, errors.New("unexpected reply #1")
	}
	arr, ok := replyArr[0].([]interface{})
	if !ok || len(arr) != 3 {
		return nil, errors.New("unexpected reply #2")
	}

	que, ok := arr[0].([]byte)
	if !ok {
		return nil, errors.New("unexpected reply: queue")
	}

	id, ok := arr[1].([]byte)
	if !ok {
		return nil, errors.New("unexpected reply: id")
	}

	data, ok := arr[2].([]byte)
	if !ok {
		return nil, errors.New("unexpected reply: data")
	}

	return &Job{
		ID:    string(id),
		Data:  string(data),
		Queue: string(que),
	}, nil
}

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
208
// Ack acknowledges (dequeues/removes) a job from its queue.
209
210
func (pool *Pool) Ack(job *Job) error {
	sess := pool.redis.Get()
211
212
213
214
215
216
217
	defer sess.Close()

	if _, err := sess.Do("ACKJOB", job.ID); err != nil {
		return err
	}
	return nil
}
218

Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
219
// Nack re-queues a job back into its queue.
220
221
func (pool *Pool) Nack(job *Job) error {
	sess := pool.redis.Get()
222
223
	defer sess.Close()

224
	if _, err := sess.Do("NACK", job.ID); err != nil {
225
226
227
228
		return err
	}
	return nil
}
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
229

ale's avatar
ale committed
230
231
232
233
234
235
236
237
238
239
240
// Working on a job.
func (pool *Pool) Working(job *Job) error {
	sess := pool.redis.Get()
	defer sess.Close()

	if _, err := sess.Do("WORKING", job.ID); err != nil {
		return err
	}
	return nil
}

241
242
// Wait blocks until the given job is ACKed.
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/168.
243
244
func (pool *Pool) Wait(job *Job) error {
	sess := pool.redis.Get()
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
245
246
247
248
249
250
251
252
253
254
255
	defer sess.Close()

	for {
		reply, err := sess.Do("SHOW", job.ID)
		if err != nil {
			return err
		}
		if reply == nil {
			break
		}

256
		time.Sleep(50 * time.Millisecond)
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
257
258
259
260
	}

	return nil
}
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
261
262

// Len returns length of a given queue.
263
264
func (pool *Pool) Len(queue string) (int, error) {
	sess := pool.redis.Get()
Vojtech Vitek (V-Teq)'s avatar
Vojtech Vitek (V-Teq) committed
265
266
267
268
269
270
271
272
273
	defer sess.Close()

	length, err := redis.Int(sess.Do("QLEN", queue))
	if err != nil {
		return 0, err
	}

	return length, nil
}
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293

// ActiveLen returns length of active jobs taken from a given queue.
func (pool *Pool) ActiveLen(queue string) (int, error) {
	sess := pool.redis.Get()
	defer sess.Close()

	reply, err := sess.Do("JSCAN", "QUEUE", queue, "STATE", "active")
	if err != nil {
		return 0, err
	}
	replyArr, ok := reply.([]interface{})
	if !ok || len(replyArr) != 2 {
		return 0, errors.New("unexpected reply #1")
	}
	jobs, ok := replyArr[1].([]interface{})
	if !ok {
		return 0, errors.New("unexpected reply #2")
	}
	return len(jobs), nil
}