From 2267db72183ed8115665e122c60dab163291e90e Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Mon, 19 Apr 2021 09:07:21 +0100
Subject: [PATCH] Simplify the submission queue

---
 submission/queue.go      | 100 +++++++++++++++------------------------
 submission/queue_test.go |  16 ++++---
 2 files changed, 46 insertions(+), 70 deletions(-)

diff --git a/submission/queue.go b/submission/queue.go
index 0f4df51..2d7893a 100644
--- a/submission/queue.go
+++ b/submission/queue.go
@@ -11,14 +11,12 @@ import (
 
 var (
 	defaultMaxDelay       = 30 * time.Second
-	defaultBurstSize      = 32
 	defaultMaxStored      = 1000
 	defaultChanBufferSize = 100
 )
 
 type Options struct {
 	MaxDelay       time.Duration
-	BurstSize      int
 	MaxStored      int
 	ChanBufferSize int
 
@@ -32,9 +30,6 @@ func (o *Options) withDefaults() *Options {
 	if o.MaxDelay == 0 {
 		o.MaxDelay = defaultMaxDelay
 	}
-	if o.BurstSize == 0 {
-		o.BurstSize = defaultBurstSize
-	}
 	if o.MaxStored == 0 {
 		o.MaxStored = defaultMaxStored
 	}
@@ -45,9 +40,10 @@ func (o *Options) withDefaults() *Options {
 }
 
 // A Submitter sends events to a remote iprep collector, trying to
-// keep the overall request rate under control: above a certain rate
-// of requests (defined via Options), the Submitter will start
-// aggregating and batching events.
+// keep the overall request rate under control. To do so, the
+// Submitter will perform local aggregation of events before sending
+// them to the remote server. Options can control the maximum time
+// delay and the overall memory impact of the submission queue.
 type Submitter interface {
 	AddEvent(*ippb.Event)
 	AddAggregate(*ippb.Aggregate)
@@ -60,6 +56,7 @@ type submissionQueue struct {
 	evCh   chan *ippb.Event
 	agCh   chan *ippb.Aggregate
 	cancel context.CancelFunc
+	done   chan struct{}
 }
 
 func newSubmitter(client client.Client, opts *Options) Submitter {
@@ -71,6 +68,7 @@ func newSubmitter(client client.Client, opts *Options) Submitter {
 		evCh:   make(chan *ippb.Event, opts.ChanBufferSize),
 		agCh:   make(chan *ippb.Aggregate, opts.ChanBufferSize),
 		opts:   *opts,
+		done:   make(chan struct{}),
 	}
 	go s.run(ctx)
 	return s
@@ -87,14 +85,21 @@ func New(addr string, opts *Options) (Submitter, error) {
 
 func (q *submissionQueue) Close() {
 	q.cancel()
+	<-q.done
 }
 
 func (q *submissionQueue) AddEvent(ev *ippb.Event) {
-	q.evCh <- ev
+	select {
+	case q.evCh <- ev:
+	default:
+	}
 }
 
 func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) {
-	q.agCh <- aggr
+	select {
+	case q.agCh <- aggr:
+	default:
+	}
 }
 
 func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregate) {
@@ -103,95 +108,64 @@ func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregat
 	}
 }
 
-func (q *submissionQueue) sendEvent(ctx context.Context, ev *ippb.Event) {
-	if err := q.client.Submit(ctx, []*ippb.Event{ev}, nil); err != nil {
-		log.Printf("failed to submit event: %v", err)
-	}
-}
-
 func (q *submissionQueue) run(ctx context.Context) {
+	defer close(q.done)
+
 	tick := time.NewTicker(q.opts.MaxDelay)
 	defer tick.Stop()
 
-	// Rate limiter.
-	var stamp time.Time
-	var count int
-
 	// In-memory buffer (as an Aggregate).
 	var curAggr *ippb.Aggregate
 	var stored int
 
-	allow := func(now time.Time) bool {
-		count++
-		if now.Sub(stamp) > q.opts.MaxDelay {
-			count = 1
-			stamp = now
-			return true
-		}
-		if count < q.opts.BurstSize {
-			return true
-		}
-		return false
-	}
-
-	flush := func(now time.Time) {
+	flush := func() {
 		if curAggr != nil {
 			q.sendAggregate(ctx, curAggr)
-			// Mess with the ratelimiter.
-			stamp = now
-			count = stored
 		}
 		curAggr = nil
 		stored = 0
 	}
-	maybeFlush := func(now time.Time) {
+
+	// Flush the in-memory buffer if it has grown beyond a certain
+	// size.
+	shouldFlush := func() bool {
 		stored++
-		if stored > q.opts.MaxStored {
-			flush(now)
+		return stored >= q.opts.MaxStored
+	}
+
+	maybeFlush := func() {
+		if shouldFlush() {
+			flush()
 		}
 	}
 
-	addEvent := func(now time.Time, ev *ippb.Event) {
+	handleEvent := func(ev *ippb.Event) {
 		if curAggr == nil {
 			curAggr = new(ippb.Aggregate)
 		}
 		curAggr.AddEvent(ev)
-		maybeFlush(now)
+		maybeFlush()
 	}
 
-	addAggregate := func(now time.Time, aggr *ippb.Aggregate) {
+	handleAggregate := func(aggr *ippb.Aggregate) {
 		if curAggr == nil {
 			curAggr = aggr
 		} else {
 			curAggr = curAggr.Merge(aggr)
 		}
-		maybeFlush(now)
-	}
-
-	handleEvent := func(now time.Time, ev *ippb.Event) {
-		if allow(now) {
-			q.sendEvent(ctx, ev)
-		} else {
-			addEvent(now, ev)
-		}
-	}
-	handleAggregate := func(now time.Time, aggr *ippb.Aggregate) {
-		if allow(now) {
-			q.sendAggregate(ctx, aggr)
-		} else {
-			addAggregate(now, aggr)
-		}
+		maybeFlush()
 	}
 
 	for {
 		select {
-		case t := <-tick.C:
-			flush(t)
+		case <-tick.C:
+			flush()
 		case ev := <-q.evCh:
-			handleEvent(time.Now(), ev)
+			handleEvent(ev)
 		case aggr := <-q.agCh:
-			handleAggregate(time.Now(), aggr)
+			handleAggregate(aggr)
 		case <-ctx.Done():
+			flush()
 			return
 		}
 	}
diff --git a/submission/queue_test.go b/submission/queue_test.go
index 9595e25..02b841e 100644
--- a/submission/queue_test.go
+++ b/submission/queue_test.go
@@ -56,7 +56,7 @@ func rateDo(timeout time.Duration, qps float64, f func()) int {
 }
 
 func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
-	t.Parallel()
+	//t.Parallel()
 
 	fc := new(fakeClient)
 	if opts == nil {
@@ -66,8 +66,6 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
 	s := newSubmitter(fc, opts)
 	defer s.Close()
 
-	// Verify that a low-rate of events gets sent through largely
-	// unmolested.
 	n := rateDo(3*time.Second, qps, func() {
 		s.AddEvent(&ippb.Event{
 			Ip:    "1.2.3.4",
@@ -85,6 +83,8 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
 }
 
 func TestSubmitter_LowRate(t *testing.T) {
+	// Verify that a low-rate of events gets sent through largely
+	// unmolested.
 	sent, calls := runTest(t, "qps=10", nil, 1)
 	if sent != calls {
 		t.Fatalf("wrong number of calls: %d (expected %d)", calls, sent)
@@ -92,21 +92,23 @@ func TestSubmitter_LowRate(t *testing.T) {
 }
 
 func TestSubmitter_HighRate(t *testing.T) {
+	// A high qps rate gets batched into MaxDelay time chunks (set
+	// MaxStored very high so it does not factor in).
 	sent, calls := runTest(t, "qps=1000", &Options{
-		BurstSize: 2,
+		MaxStored: 100000,
 	}, 1000)
-	expected := 2 + sent/1000
+	expected := 3
 	if calls != expected {
 		t.Fatalf("sent=%d calls=%d, expected=%d", sent, calls, expected)
 	}
 }
 
 func TestSubmitter_HighRate_Buffered(t *testing.T) {
+	// Same as above but with smaller MaxStored.
 	sent, calls := runTest(t, "qps=1000/buffered", &Options{
-		BurstSize: 2,
 		MaxStored: 100,
 	}, 1000)
-	expected := 2 + sent/100
+	expected := (sent + 99) / 100
 	if calls != expected {
 		t.Fatalf("sent=%d calls=%d, expected=%d", sent, calls, expected)
 	}
-- 
GitLab