Skip to content
Snippets Groups Projects
Commit aee11ca1 authored by ale's avatar ale
Browse files

Do not use a Context to control the queue manager

Context was unnecessary when a simple internal channel will do. Also
fix queue_test.
parent f13e7293
Branches
No related tags found
No related merge requests found
Pipeline #15046 failed
......@@ -88,7 +88,7 @@ func (a *Aggregate) AddEvent(e *Event) {
ByIp: []*AggregateIPEntry{
&AggregateIPEntry{
Ip: e.Ip,
Count: 1,
Count: e.Count,
},
},
})
......
......@@ -13,6 +13,8 @@ var (
defaultMaxDelay = 30 * time.Second
defaultMaxStored = 1000
defaultChanBufferSize = 100
submitTimeout = 30 * time.Second
)
type Options struct {
......@@ -55,27 +57,26 @@ type submissionQueue struct {
opts Options
evCh chan *ippb.Event
agCh chan *ippb.Aggregate
cancel context.CancelFunc
done chan struct{}
stopCh chan struct{}
doneCh chan struct{}
}
func newSubmitter(client client.Client, opts *Options) Submitter {
opts = opts.withDefaults()
ctx, cancel := context.WithCancel(context.Background())
s := &submissionQueue{
client: client,
cancel: cancel,
evCh: make(chan *ippb.Event, opts.ChanBufferSize),
agCh: make(chan *ippb.Aggregate, opts.ChanBufferSize),
opts: *opts,
done: make(chan struct{}),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
go s.run(ctx)
go s.run()
return s
}
// New creates a new Submitter pointing at the specified collector addr.
func New(addr string, opts *Options) (Submitter, error) {
opts = opts.withDefaults()
c, err := client.New(addr, opts.ClientOptions)
if err != nil {
return nil, err
......@@ -84,8 +85,8 @@ func New(addr string, opts *Options) (Submitter, error) {
}
func (q *submissionQueue) Close() {
q.cancel()
<-q.done
close(q.stopCh)
<-q.doneCh
}
func (q *submissionQueue) AddEvent(ev *ippb.Event) {
......@@ -102,14 +103,16 @@ func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) {
}
}
func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregate) {
func (q *submissionQueue) sendAggregate(aggr *ippb.Aggregate) {
ctx, cancel := context.WithTimeout(context.Background(), submitTimeout)
if err := q.client.Submit(ctx, nil, aggr); err != nil {
log.Printf("failed to submit aggregate: %v", err)
}
cancel()
}
func (q *submissionQueue) run(ctx context.Context) {
defer close(q.done)
func (q *submissionQueue) run() {
defer close(q.doneCh)
tick := time.NewTicker(q.opts.MaxDelay)
defer tick.Stop()
......@@ -120,7 +123,7 @@ func (q *submissionQueue) run(ctx context.Context) {
flush := func() {
if curAggr != nil {
q.sendAggregate(ctx, curAggr)
q.sendAggregate(curAggr)
}
curAggr = nil
stored = 0
......@@ -158,13 +161,13 @@ func (q *submissionQueue) run(ctx context.Context) {
for {
select {
case <-tick.C:
flush()
case ev := <-q.evCh:
handleEvent(ev)
case aggr := <-q.agCh:
handleAggregate(aggr)
case <-ctx.Done():
case <-tick.C:
flush()
case <-q.stopCh:
flush()
return
}
......
......@@ -3,6 +3,7 @@ package submission
import (
"context"
"errors"
"sync"
"testing"
"time"
......@@ -10,19 +11,25 @@ import (
)
type fakeClient struct {
mx sync.Mutex
calls int
rcvd int
count int64
}
func (f *fakeClient) Close() {}
func (f *fakeClient) Submit(_ context.Context, evs []*ippb.Event, aggr *ippb.Aggregate) error {
f.mx.Lock()
defer f.mx.Unlock()
f.calls++
f.rcvd += len(evs)
for _, ev := range evs {
f.count += ev.Count
}
if aggr != nil {
for _, bt := range aggr.ByType {
for _, bi := range bt.ByIp {
f.rcvd += int(bi.Count)
f.count += bi.Count
}
}
}
......@@ -75,11 +82,19 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
s.Close()
if fc.rcvd != n {
t.Fatalf("%s: mismatch between events sent (%d) and received (%d)", tag, n, fc.rcvd)
fc.mx.Lock()
rcvd := int(fc.count)
calls := fc.calls
fc.mx.Unlock()
// Since there may be pending events in the channel buffer
// when we close the queue, so we trust the counter of
// received events.
if rcvd > n {
t.Fatalf("%s: received too many events: sent=%d, received=%d", tag, n, rcvd)
}
return n, fc.calls
return rcvd, calls
}
func TestSubmitter_LowRate(t *testing.T) {
......@@ -104,9 +119,6 @@ func TestSubmitter_HighRate(t *testing.T) {
}
func TestSubmitter_HighRate_Buffered(t *testing.T) {
// TODO: Keeps failing on CI.
t.SkipNow()
// Same as above but with smaller MaxStored.
sent, calls := runTest(t, "qps=1000/buffered", &Options{
MaxStored: 100,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment