Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
ale
disque
Commits
5d1310aa
Commit
5d1310aa
authored
May 06, 2015
by
Vojtech Vitek
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #7 from goware/godoc
Godoc refinements
parents
56664cb6
b8c10418
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
36 additions
and
35 deletions
+36
-35
README.md
README.md
+4
-4
config.go
config.go
+12
-12
disque.go
disque.go
+19
-18
job.go
job.go
+1
-1
No files found.
README.md
View file @
5d1310aa
...
...
@@ -62,10 +62,10 @@ func main() {
| Config option | Default value | Description |
| ------------- |:-------------:| ------------ |
| Timeout | 0 | Block
s
on each operation until it returns. |
| Timeout | 0 | Block on each operation until it returns. |
| Replicate | 0 | Job doesn't need to be replicated before Add() returns. |
| Delay | 0 | Job is
add
ed immediately. |
| RetryAfter | 0 |
Job is no
t re-queue
d
automatically. |
| Delay | 0 | Job is
enqueu
ed immediately. |
| RetryAfter | 0 |
Don'
t re-queue
job
automatically. |
| TTL | 0 | Job lives until it's ACKed. |
| MaxLen | 0 | Unlimited queue. |
...
...
@@ -75,7 +75,7 @@ func main() {
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
config
:=
disque
.
Config
{
Timeout
:
time
.
Second
,
// E
very
operation
timeout
s after 1s.
Timeout
:
time
.
Second
,
// E
ach
operation
fail
s after 1s
timeout elapses
.
Replicate
:
2
,
// Replicates job to 2+ nodes before Add() returns.
Delay
:
time
.
Hour
,
// Schedules the job (enqueues after 1h).
RetryAfter
:
time
.
Minute
,
// Re-queues the job after 1min of not being ACKed.
...
...
config.go
View file @
5d1310aa
...
...
@@ -4,12 +4,12 @@ import "time"
// Config represents Disque configuration for certain operations.
type
Config
struct
{
Timeout
time
.
Duration
// Each operation
will
fail after a specified timeout. Blocks by default.
Replicate
int
//
Add():
Replicate job to at least N nodes before return.
Delay
time
.
Duration
//
Add():
Schedule the job - enqueue after a specified time.
RetryAfter
time
.
Duration
//
Add():
Re-queue job after a specified time
(
between Get() and Ack()
)
.
TTL
time
.
Duration
//
Add():
Remove the job from queue after a specified time.
MaxLen
int
//
Add(): Fail
if there are more than N jobs in the queue.
Timeout
time
.
Duration
// Each operation fail
s
after a specified timeout
elapses
. Blocks by default.
Replicate
int
// Replicate job to at least N nodes before
Add()
return
s
.
Delay
time
.
Duration
// Schedule the job
on Add()
- enqueue after a specified time.
RetryAfter
time
.
Duration
// Re-queue job after a specified time
elapses
between Get() and Ack().
TTL
time
.
Duration
// Remove the job from
the
queue after a specified time.
MaxLen
int
//
Fail on Add()
if there are more than N jobs in the queue.
}
// Use applies given config to every subsequent operation of this connection.
...
...
@@ -23,37 +23,37 @@ func (conn *Conn) With(conf Config) *Conn {
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conf
}
}
//
With applies Timeout
to a single operation.
//
Timeout option applied
to a single operation.
func
(
conn
*
Conn
)
Timeout
(
timeout
time
.
Duration
)
*
Conn
{
conn
.
conf
.
Timeout
=
timeout
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
}
//
With applies Replicate
to a single operation.
//
Replicate option applied
to a single operation.
func
(
conn
*
Conn
)
Replicate
(
replicate
int
)
*
Conn
{
conn
.
conf
.
Replicate
=
replicate
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
}
//
With applies Delay
to a single operation.
//
Delay option applied
to a single operation.
func
(
conn
*
Conn
)
Delay
(
delay
time
.
Duration
)
*
Conn
{
conn
.
conf
.
Delay
=
delay
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
}
//
With applies RetryAfter
to a single operation.
//
RetryAfter option applied
to a single operation.
func
(
conn
*
Conn
)
RetryAfter
(
after
time
.
Duration
)
*
Conn
{
conn
.
conf
.
RetryAfter
=
after
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
}
//
With
applie
s TTL
to a single operation.
//
TTL option
applie
d
to a single operation.
func
(
conn
*
Conn
)
TTL
(
ttl
time
.
Duration
)
*
Conn
{
conn
.
conf
.
TTL
=
ttl
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
}
//
With applies MaxLen
to a single operation.
//
MaxLen option applied
to a single operation.
func
(
conn
*
Conn
)
MaxLen
(
maxlen
int
)
*
Conn
{
conn
.
conf
.
MaxLen
=
maxlen
return
&
Conn
{
pool
:
conn
.
pool
,
conf
:
conn
.
conf
}
...
...
disque.go
View file @
5d1310aa
...
...
@@ -9,13 +9,13 @@ import (
"github.com/garyburd/redigo/redis"
)
// Conn represent
a
connection to a Disque Pool.
// Conn represent connection to a Disque Pool.
type
Conn
struct
{
pool
*
redis
.
Pool
conf
Config
}
// Connect creates
a
connection to a given Disque Pool.
// Connect creates
new
connection to a given Disque Pool.
func
Connect
(
address
string
,
extra
...
string
)
(
*
Conn
,
error
)
{
pool
:=
&
redis
.
Pool
{
MaxIdle
:
64
,
...
...
@@ -38,12 +38,12 @@ func Connect(address string, extra ...string) (*Conn, error) {
return
&
Conn
{
pool
:
pool
},
nil
}
// Close closes connection to
the
Disque Pool.
// Close closes
the
connection to
a
Disque Pool.
func
(
conn
*
Conn
)
Close
()
{
conn
.
pool
.
Close
()
}
// Ping returns nil if Disque
// Ping returns nil if Disque
Pool is alive, error otherwise.
func
(
conn
*
Conn
)
Ping
()
error
{
sess
:=
conn
.
pool
.
Get
()
defer
sess
.
Close
()
...
...
@@ -54,17 +54,18 @@ func (conn *Conn) Ping() error {
return
nil
}
// do is a helper function that workarounds redigo/redis API
flaws
//
with
reflect pkg.
// do is a helper function that workarounds redigo/redis API
//
flaws with a magic function Call() from the
reflect pkg.
//
// None of the following builds successfully:
// None of the following builds
or works
successfully:
//
// reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{})
// reply, err := sess.Do("GETJOB", "FROM", queue, redis.Args{}...)
// reply, err := sess.Do("GETJOB", "FROM", queue
, extraQueue
s)
// reply, err := sess.Do("GETJOB", "FROM", queue
, extraQueue
s...)
// reply, err := sess.Do("GETJOB", "FROM", queue
s
, redis.Args{})
// reply, err := sess.Do("GETJOB", "FROM", queue
s
, redis.Args{}...)
// reply, err := sess.Do("GETJOB", "FROM", queues)
// reply, err := sess.Do("GETJOB", "FROM", queues...)
//
// > Error: "too many arguments in call to sess.Do"
// > Build error: "too many arguments in call to sess.Do"
// > Runtime error: "ERR wrong number of arguments for '...' command"
//
func
(
conn
*
Conn
)
do
(
args
[]
interface
{})
(
interface
{},
error
)
{
sess
:=
conn
.
pool
.
Get
()
...
...
@@ -96,7 +97,7 @@ func (conn *Conn) do(args []interface{}) (interface{}, error) {
return
reply
,
nil
}
// Add enqueues new
J
ob with specified data to a given queue.
// Add enqueues new
j
ob with
a
specified data to a given queue.
func
(
conn
*
Conn
)
Add
(
data
string
,
queue
string
)
(
*
Job
,
error
)
{
args
:=
[]
interface
{}{
"ADDJOB"
,
...
...
@@ -150,8 +151,8 @@ func (conn *Conn) Add(data string, queue string) (*Job, error) {
},
nil
}
// Get returns
the
first available job from
the
highest priority
// queue (left-to-right).
// Get returns first available job from
a
highest priority
// queue
possible
(left-to-right
priority
).
func
(
conn
*
Conn
)
Get
(
queues
...
string
)
(
*
Job
,
error
)
{
if
len
(
queues
)
==
0
{
return
nil
,
errors
.
New
(
"expected at least one queue"
)
...
...
@@ -203,7 +204,7 @@ func (conn *Conn) Get(queues ...string) (*Job, error) {
},
nil
}
// Ack acknowledges (dequeues
) the
job from its
job
queue.
// Ack acknowledges (dequeues
/removes) a
job from its queue.
func
(
conn
*
Conn
)
Ack
(
job
*
Job
)
error
{
sess
:=
conn
.
pool
.
Get
()
defer
sess
.
Close
()
...
...
@@ -214,7 +215,7 @@ func (conn *Conn) Ack(job *Job) error {
return
nil
}
// Nack re-queues job into its
job
queue.
// Nack re-queues
a
job
back
into its queue.
// Native NACKJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func
(
conn
*
Conn
)
Nack
(
job
*
Job
)
error
{
sess
:=
conn
.
pool
.
Get
()
...
...
@@ -226,7 +227,7 @@ func (conn *Conn) Nack(job *Job) error {
return
nil
}
// Wait waits for
the
job to finish (blocks until it's ACKed).
// Wait waits for
a
job to finish (blocks until it's ACKed).
// Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/43.
func
(
conn
*
Conn
)
Wait
(
job
*
Job
)
error
{
sess
:=
conn
.
pool
.
Get
()
...
...
job.go
View file @
5d1310aa
package
disque
// Job represents job
(and its data) belonging to a queue
.
// Job represents job
/message returned from a Disque server
.
type
Job
struct
{
ID
string
Data
string
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment