Skip to content
Snippets Groups Projects
Commit 2267db72 authored by ale's avatar ale
Browse files

Simplify the submission queue

parent 341fff20
No related branches found
No related tags found
No related merge requests found
Pipeline #15018 failed
......@@ -11,14 +11,12 @@ import (
var (
defaultMaxDelay = 30 * time.Second
defaultBurstSize = 32
defaultMaxStored = 1000
defaultChanBufferSize = 100
)
type Options struct {
MaxDelay time.Duration
BurstSize int
MaxStored int
ChanBufferSize int
......@@ -32,9 +30,6 @@ func (o *Options) withDefaults() *Options {
if o.MaxDelay == 0 {
o.MaxDelay = defaultMaxDelay
}
if o.BurstSize == 0 {
o.BurstSize = defaultBurstSize
}
if o.MaxStored == 0 {
o.MaxStored = defaultMaxStored
}
......@@ -45,9 +40,10 @@ func (o *Options) withDefaults() *Options {
}
// A Submitter sends events to a remote iprep collector, trying to
// keep the overall request rate under control: above a certain rate
// of requests (defined via Options), the Submitter will start
// aggregating and batching events.
// keep the overall request rate under control. To do so, the
// Submitter will perform local aggregation of events before sending
// them to the remote server. Options can control the maximum time
// delay and the overall memory impact of the submission queue.
type Submitter interface {
AddEvent(*ippb.Event)
AddAggregate(*ippb.Aggregate)
......@@ -60,6 +56,7 @@ type submissionQueue struct {
evCh chan *ippb.Event
agCh chan *ippb.Aggregate
cancel context.CancelFunc
done chan struct{}
}
func newSubmitter(client client.Client, opts *Options) Submitter {
......@@ -71,6 +68,7 @@ func newSubmitter(client client.Client, opts *Options) Submitter {
evCh: make(chan *ippb.Event, opts.ChanBufferSize),
agCh: make(chan *ippb.Aggregate, opts.ChanBufferSize),
opts: *opts,
done: make(chan struct{}),
}
go s.run(ctx)
return s
......@@ -87,14 +85,21 @@ func New(addr string, opts *Options) (Submitter, error) {
func (q *submissionQueue) Close() {
q.cancel()
<-q.done
}
func (q *submissionQueue) AddEvent(ev *ippb.Event) {
q.evCh <- ev
select {
case q.evCh <- ev:
default:
}
}
func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) {
q.agCh <- aggr
select {
case q.agCh <- aggr:
default:
}
}
func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregate) {
......@@ -103,95 +108,64 @@ func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregat
}
}
func (q *submissionQueue) sendEvent(ctx context.Context, ev *ippb.Event) {
if err := q.client.Submit(ctx, []*ippb.Event{ev}, nil); err != nil {
log.Printf("failed to submit event: %v", err)
}
}
func (q *submissionQueue) run(ctx context.Context) {
defer close(q.done)
tick := time.NewTicker(q.opts.MaxDelay)
defer tick.Stop()
// Rate limiter.
var stamp time.Time
var count int
// In-memory buffer (as an Aggregate).
var curAggr *ippb.Aggregate
var stored int
allow := func(now time.Time) bool {
count++
if now.Sub(stamp) > q.opts.MaxDelay {
count = 1
stamp = now
return true
}
if count < q.opts.BurstSize {
return true
}
return false
}
flush := func(now time.Time) {
flush := func() {
if curAggr != nil {
q.sendAggregate(ctx, curAggr)
// Mess with the ratelimiter.
stamp = now
count = stored
}
curAggr = nil
stored = 0
}
maybeFlush := func(now time.Time) {
// Flush the in-memory buffer if it has grown beyond a certain
// size.
shouldFlush := func() bool {
stored++
if stored > q.opts.MaxStored {
flush(now)
return stored >= q.opts.MaxStored
}
maybeFlush := func() {
if shouldFlush() {
flush()
}
}
addEvent := func(now time.Time, ev *ippb.Event) {
handleEvent := func(ev *ippb.Event) {
if curAggr == nil {
curAggr = new(ippb.Aggregate)
}
curAggr.AddEvent(ev)
maybeFlush(now)
maybeFlush()
}
addAggregate := func(now time.Time, aggr *ippb.Aggregate) {
handleAggregate := func(aggr *ippb.Aggregate) {
if curAggr == nil {
curAggr = aggr
} else {
curAggr = curAggr.Merge(aggr)
}
maybeFlush(now)
}
handleEvent := func(now time.Time, ev *ippb.Event) {
if allow(now) {
q.sendEvent(ctx, ev)
} else {
addEvent(now, ev)
}
}
handleAggregate := func(now time.Time, aggr *ippb.Aggregate) {
if allow(now) {
q.sendAggregate(ctx, aggr)
} else {
addAggregate(now, aggr)
}
maybeFlush()
}
for {
select {
case t := <-tick.C:
flush(t)
case <-tick.C:
flush()
case ev := <-q.evCh:
handleEvent(time.Now(), ev)
handleEvent(ev)
case aggr := <-q.agCh:
handleAggregate(time.Now(), aggr)
handleAggregate(aggr)
case <-ctx.Done():
flush()
return
}
}
......
......@@ -56,7 +56,7 @@ func rateDo(timeout time.Duration, qps float64, f func()) int {
}
func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
t.Parallel()
//t.Parallel()
fc := new(fakeClient)
if opts == nil {
......@@ -66,8 +66,6 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
s := newSubmitter(fc, opts)
defer s.Close()
// Verify that a low-rate of events gets sent through largely
// unmolested.
n := rateDo(3*time.Second, qps, func() {
s.AddEvent(&ippb.Event{
Ip: "1.2.3.4",
......@@ -85,6 +83,8 @@ func runTest(t *testing.T, tag string, opts *Options, qps float64) (int, int) {
}
func TestSubmitter_LowRate(t *testing.T) {
// Verify that a low-rate of events gets sent through largely
// unmolested.
sent, calls := runTest(t, "qps=10", nil, 1)
if sent != calls {
t.Fatalf("wrong number of calls: %d (expected %d)", calls, sent)
......@@ -92,21 +92,23 @@ func TestSubmitter_LowRate(t *testing.T) {
}
func TestSubmitter_HighRate(t *testing.T) {
// A high qps rate gets batched into MaxDelay time chunks (set
// MaxStored very high so it does not factor in).
sent, calls := runTest(t, "qps=1000", &Options{
BurstSize: 2,
MaxStored: 100000,
}, 1000)
expected := 2 + sent/1000
expected := 3
if calls != expected {
t.Fatalf("sent=%d calls=%d, expected=%d", sent, calls, expected)
}
}
func TestSubmitter_HighRate_Buffered(t *testing.T) {
// Same as above but with smaller MaxStored.
sent, calls := runTest(t, "qps=1000/buffered", &Options{
BurstSize: 2,
MaxStored: 100,
}, 1000)
expected := 2 + sent/100
expected := (sent + 99) / 100
if calls != expected {
t.Fatalf("sent=%d calls=%d, expected=%d", sent, calls, expected)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment