diff --git a/proto/iprep.go b/proto/iprep.go index 85717362cba0d2e0be114fe24f1166db3c66bee7..f279d54e46e21ec54eb05970d5163d1a4f65841f 100644 --- a/proto/iprep.go +++ b/proto/iprep.go @@ -88,7 +88,7 @@ func (a *Aggregate) AddEvent(e *Event) { ByIp: []*AggregateIPEntry{ &AggregateIPEntry{ Ip: e.Ip, - Count: 1, + Count: e.Count, }, }, }) diff --git a/submission/queue.go b/submission/queue.go index 2d7893ada84e1e135e9742fb8cfe038909116b4b..75bc5de621d7c369778817b3184226db81f8a16d 100644 --- a/submission/queue.go +++ b/submission/queue.go @@ -13,6 +13,8 @@ var ( defaultMaxDelay = 30 * time.Second defaultMaxStored = 1000 defaultChanBufferSize = 100 + + submitTimeout = 30 * time.Second ) type Options struct { @@ -55,27 +57,26 @@ type submissionQueue struct { opts Options evCh chan *ippb.Event agCh chan *ippb.Aggregate - cancel context.CancelFunc - done chan struct{} + stopCh chan struct{} + doneCh chan struct{} } func newSubmitter(client client.Client, opts *Options) Submitter { - opts = opts.withDefaults() - ctx, cancel := context.WithCancel(context.Background()) s := &submissionQueue{ client: client, - cancel: cancel, evCh: make(chan *ippb.Event, opts.ChanBufferSize), agCh: make(chan *ippb.Aggregate, opts.ChanBufferSize), opts: *opts, - done: make(chan struct{}), + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), } - go s.run(ctx) + go s.run() return s } // New creates a new Submitter pointing at the specified collector addr. func New(addr string, opts *Options) (Submitter, error) { + opts = opts.withDefaults() c, err := client.New(addr, opts.ClientOptions) if err != nil { return nil, err @@ -84,8 +85,8 @@ func New(addr string, opts *Options) (Submitter, error) { } func (q *submissionQueue) Close() { - q.cancel() - <-q.done + close(q.stopCh) + <-q.doneCh } func (q *submissionQueue) AddEvent(ev *ippb.Event) { @@ -102,14 +103,16 @@ func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) { } } -func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregate) { +func (q *submissionQueue) sendAggregate(aggr *ippb.Aggregate) { + ctx, cancel := context.WithTimeout(context.Background(), submitTimeout) if err := q.client.Submit(ctx, nil, aggr); err != nil { log.Printf("failed to submit aggregate: %v", err) } + cancel() } -func (q *submissionQueue) run(ctx context.Context) { - defer close(q.done) +func (q *submissionQueue) run() { + defer close(q.doneCh) tick := time.NewTicker(q.opts.MaxDelay) defer tick.Stop() @@ -120,7 +123,7 @@ func (q *submissionQueue) run(ctx context.Context) { flush := func() { if curAggr != nil { - q.sendAggregate(ctx, curAggr) + q.sendAggregate(curAggr) } curAggr = nil stored = 0 @@ -158,13 +161,13 @@ func (q *submissionQueue) run(ctx context.Context) { for { select { - case <-tick.C: - flush() case ev := <-q.evCh: handleEvent(ev) case aggr := <-q.agCh: handleAggregate(aggr) - case <-ctx.Done(): + case <-tick.C: + flush() + case <-q.stopCh: flush() return } diff --git a/submission/queue_test.go b/submission/queue_test.go index 174552d779614b25d30f44a0307e2069cd6c560e..7f093e3146800a8018520bf177999fba3fe0bac4 100644 --- a/submission/queue_test.go +++ b/submission/queue_test.go @@ -3,6 +3,7 @@ package submission import ( "context" "errors" + "sync" "testing" "time" @@ -10,19 +11,25 @@ import ( ) type fakeClient struct { + mx sync.Mutex calls int - rcvd int + count int64 } func (f *fakeClient) Close() {} func (f *fakeClient) Submit(_ context.Context, evs []*ippb.Event, aggr *ippb.Aggregate) error { + f.mx.Lock() + defer f.mx.Unlock() + f.calls++ - f.rcvd += len(evs) + for _, ev := range evs { + f.count += ev.Count + } if aggr != nil { for _, bt := range aggr.ByType { for _, bi := range bt.ByIp { - f.rcvd += int(bi.Count) + f.count += bi.Count } } } @@ -75,11 +82,19 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) { s.Close() - if fc.rcvd != n { - t.Fatalf("%s: mismatch between events sent (%d) and received (%d)", tag, n, fc.rcvd) + fc.mx.Lock() + rcvd := int(fc.count) + calls := fc.calls + fc.mx.Unlock() + + // Since there may be pending events in the channel buffer + // when we close the queue, so we trust the counter of + // received events. + if rcvd > n { + t.Fatalf("%s: received too many events: sent=%d, received=%d", tag, n, rcvd) } - return n, fc.calls + return rcvd, calls } func TestSubmitter_LowRate(t *testing.T) { @@ -104,9 +119,6 @@ func TestSubmitter_HighRate(t *testing.T) { } func TestSubmitter_HighRate_Buffered(t *testing.T) { - // TODO: Keeps failing on CI. - t.SkipNow() - // Same as above but with smaller MaxStored. sent, calls := runTest(t, "qps=1000/buffered", &Options{ MaxStored: 100,