Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
ale
disque
Commits
78b615f2
Commit
78b615f2
authored
May 05, 2015
by
Vojtech Vitek (V-Teq)
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix default Timeout, add Config to README
parent
a6a49736
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
92 additions
and
38 deletions
+92
-38
README.md
README.md
+56
-24
config.go
config.go
+0
-4
disque.go
disque.go
+1
-1
disque_test.go
disque_test.go
+35
-9
No files found.
README.md
View file @
78b615f2
...
...
@@ -15,43 +15,75 @@
## Producer
```
go
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
// Enqueue job with "high" priority.
jobs
.
Add
(
data1
,
"high"
)
import
(
"github.com/goware/disque"
)
// Enqueue job with "low" priority (and remove it after 24 hours if not ACKed).
jobs
.
TTL
(
24
*
time
.
Hour
)
.
Add
(
data2
,
"low"
)
func
main
()
{
// Connect to Disque pool.
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
// Accepts more arguments.
// Enqueue job with "urgent" priority. Consumers will have one minute to Ack() the job after they Get() it, or it will be re-queued.
jobs
.
RetryAfter
(
time
.
Minute
)
.
Add
(
data3
,
"urgent"
)
// Enqueue three jobs with different priorities.
job1
,
_
:=
jobs
.
Add
(
data1
,
"high"
)
job2
,
_
:=
jobs
.
Add
(
data2
,
"low"
)
job3
,
_
:=
jobs
.
Add
(
data3
,
"urgent"
)
// Enqueue job with "urgent" priority and wait for it to finish.
job4
,
err
:=
jobs
.
Add
(
data4
,
"urgent"
)
if
err
!=
nil
{
jobs
.
Wait
(
job4
)
// Block until job3 is done.
jobs
.
Wait
(
job3
)
}
```
## Consumer (worker)
```
go
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
import
(
"github.com/goware/disque"
)
func
main
()
{
// Connect to Disque pool.
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
// Accepts more arguments.
for
{
// Get job from highest priority queue possible. Blocks by default.
job
,
_
:=
jobs
.
Get
(
"urgent"
,
"high"
,
"low"
)
// Left-right priority.
// Do some hard work with the job data.
if
err
:=
Process
(
job
.
Data
);
err
!=
nil
{
// Failed. Re-queue the job.
jobs
.
Nack
(
job
)
}
// Acknowledge (dequeue) the job.
jobs
.
Ack
(
job
)
}
}
```
for
{
// Dequeue a job from highest priority queue (priority left to right).
job
,
_
:=
jobs
.
Get
(
"urgent"
,
"high"
,
"low"
)
## Custom config (Timeout, Replicate, Delay, Retry, TTL, MaxLen)
// Do some hard work with the job data.
err
:=
Process
(
job
.
Data
)
if
err
!=
nil
{
// Re-queue the job. This may be triggered by Panic/Recover.
jobs
.
Nack
(
job
)
}
```
go
jobs
,
_
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
// Acknowledge (dequeue) the job. Success.
jobs
.
Ack
(
job
)
config
:=
disque
.
Config
{
Timeout
:
500
*
time
.
Second
,
// Each operation will fail after 1s. It blocks by default.
Replicate
:
2
,
// Add(): Replicate job to at least two nodes before return.
Delay
:
time
.
Hour
,
// Add(): Schedule the job - enqueue after one hour.
RetryAfter
:
time
.
Minute
,
// Add(): Re-queue job after 1min (time between Get() and Ack()).
TTL
:
24
*
time
.
Hour
,
// Add(): Remove the job from queue after one day.
MaxLen
:
1000
,
// Add(): Fail if there are more than 1000 jobs in the queue.
}
// Apply globally.
jobs
.
Use
(
config
)
// Apply to a single operation.
jobs
.
With
(
config
)
.
Add
(
data
,
"queue"
)
// Apply single option to a single operation.
jobs
.
Timeout
(
time
.
Second
)
.
Get
(
"queue"
,
"queue2"
)
jobs
.
MaxLen
(
1000
)
.
RetryAfter
(
time
.
Minute
)
.
Add
(
data
,
"queue"
)
jobs
.
Timeout
(
time
.
Second
)
.
Add
(
data
,
"queue"
)
```
## License
...
...
config.go
View file @
78b615f2
...
...
@@ -11,10 +11,6 @@ type Config struct {
MaxLen
int
}
var
defaultConfig
=
Config
{
Timeout
:
500
*
time
.
Millisecond
,
}
func
(
conn
*
Conn
)
Use
(
conf
Config
)
*
Conn
{
conn
.
conf
=
conf
return
conn
...
...
disque.go
View file @
78b615f2
...
...
@@ -33,7 +33,7 @@ func Connect(address string, extra ...string) (*Conn, error) {
},
}
return
&
Conn
{
pool
:
pool
,
conf
:
defaultConfig
},
nil
return
&
Conn
{
pool
:
pool
},
nil
}
func
(
conn
*
Conn
)
Close
()
{
...
...
disque_test.go
View file @
78b615f2
...
...
@@ -36,7 +36,7 @@ func TestDelay(t *testing.T) {
}
// The job should not exist yet.
_
,
err
=
jobs
.
Get
(
"test:delay"
)
_
,
err
=
jobs
.
Timeout
(
time
.
Millisecond
)
.
Get
(
"test:delay"
)
if
err
==
nil
{
t
.
Fatal
(
"expected error"
)
}
...
...
@@ -61,7 +61,7 @@ func TestTTL(t *testing.T) {
defer
jobs
.
Close
()
// Enqueue job with TTL one second.
_
,
err
=
jobs
.
TTL
(
time
.
Second
)
.
Add
(
"data1"
,
"test:ttl"
)
_
,
err
=
jobs
.
Timeout
(
time
.
Millisecond
)
.
TTL
(
time
.
Second
)
.
Add
(
"data1"
,
"test:ttl"
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
...
...
@@ -89,7 +89,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
t
.
Error
(
err
)
}
//
Dequeu
e job.
//
Get th
e job.
_
,
err
=
jobs
.
Get
(
"test:retry"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -97,7 +97,7 @@ func TestTimeoutRetryAfter(t *testing.T) {
// Don't Ack() to pretend consumer failure.
// Try to
dequeu
e job again..
// Try to
get th
e job again..
// We should hit time-out for the first time..
_
,
err
=
jobs
.
Timeout
(
250
*
time
.
Millisecond
)
.
Get
(
"test:retry"
)
if
err
==
nil
{
...
...
@@ -138,7 +138,7 @@ func TestPriorityQueue(t *testing.T) {
t
.
Error
(
err
)
}
//
Dequeue
first job.
//
Get
first job.
job
,
err
:=
jobs
.
Get
(
"test:urgent"
,
"test:high"
,
"test:low"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -154,7 +154,7 @@ func TestPriorityQueue(t *testing.T) {
t
.
Fatalf
(
"expected %s, got %s"
,
e
,
job
.
Data
)
}
//
Dequeue
second job.
//
Get
second job.
job
,
err
=
jobs
.
Get
(
"test:urgent"
,
"test:high"
,
"test:low"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -170,7 +170,7 @@ func TestPriorityQueue(t *testing.T) {
t
.
Fatalf
(
"expected %s, got %s"
,
e
,
job
.
Data
)
}
//
Dequeue
third job and re-queue it again.
//
Get
third job and re-queue it again.
job
,
err
=
jobs
.
Get
(
"test:urgent"
,
"test:high"
,
"test:low"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -186,7 +186,7 @@ func TestPriorityQueue(t *testing.T) {
t
.
Fatalf
(
"expected %s, got %s"
,
e
,
job
.
Data
)
}
//
Dequeue
third job again.
//
Get
third job again.
job
,
err
=
jobs
.
Get
(
"test:urgent"
,
"test:high"
,
"test:low"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -219,7 +219,7 @@ func TestWait(t *testing.T) {
}
go
func
()
{
//
Dequeue
the job.
//
Get
the job.
job
,
err
:=
jobs
.
Get
(
"test:wait"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
...
...
@@ -238,3 +238,29 @@ func TestWait(t *testing.T) {
t
.
Fatalf
(
"expected 1.0s - 1.5s, got %v"
,
time
.
Since
(
start
))
}
}
func
TestConfig
(
t
*
testing
.
T
)
{
// Connect to Disque.
jobs
,
err
:=
disque
.
Connect
(
"127.0.0.1:7711"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
jobs
.
Close
()
config
:=
disque
.
Config
{
Timeout
:
time
.
Millisecond
,
}
// Should fail on timeout.
_
,
err
=
jobs
.
With
(
config
)
.
Get
(
"test:non-existant-queue"
)
if
err
==
nil
{
t
.
Fatal
(
"expected error"
)
}
// Should fail on timeout.
jobs
.
Use
(
config
)
_
,
err
=
jobs
.
Get
(
"test:non-existant-queue"
)
if
err
==
nil
{
t
.
Fatal
(
"expected error"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment