package submission import ( "context" "log" "time" "git.autistici.org/ai3/tools/iprep/client" ippb "git.autistici.org/ai3/tools/iprep/proto" "google.golang.org/grpc" ) var ( defaultMaxDelay = 30 * time.Second defaultMaxStored = 1000 defaultChanBufferSize = 100 submitTimeout = 30 * time.Second ) type Options struct { MaxDelay time.Duration MaxStored int ChanBufferSize int } func (o *Options) withDefaults() *Options { if o == nil { o = new(Options) } if o.MaxDelay == 0 { o.MaxDelay = defaultMaxDelay } if o.MaxStored == 0 { o.MaxStored = defaultMaxStored } if o.ChanBufferSize == 0 { o.ChanBufferSize = defaultChanBufferSize } return o } // A Submitter sends events to a remote iprep collector, trying to // keep the overall request rate under control. To do so, the // Submitter will perform local aggregation of events before sending // them to the remote server. Options can control the maximum time // delay and the overall memory impact of the submission queue. type Submitter interface { AddEvent(*ippb.Event) AddAggregate(*ippb.Aggregate) Close() } type submissionQueue struct { client client.Client opts Options evCh chan *ippb.Event agCh chan *ippb.Aggregate stopCh chan struct{} doneCh chan struct{} } func newSubmitter(client client.Client, opts *Options) Submitter { s := &submissionQueue{ client: client, evCh: make(chan *ippb.Event, opts.ChanBufferSize), agCh: make(chan *ippb.Aggregate, opts.ChanBufferSize), opts: *opts, stopCh: make(chan struct{}), doneCh: make(chan struct{}), } go s.run() return s } // New creates a new Submitter pointing at the specified collector addr. func New(conn *grpc.ClientConn, opts *Options) Submitter { opts = opts.withDefaults() return newSubmitter(client.New(conn), opts) } func (q *submissionQueue) Close() { close(q.stopCh) <-q.doneCh } func (q *submissionQueue) AddEvent(ev *ippb.Event) { select { case q.evCh <- ev: default: } } func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) { select { case q.agCh <- aggr: default: } } func (q *submissionQueue) sendAggregate(aggr *ippb.Aggregate) { ctx, cancel := context.WithTimeout(context.Background(), submitTimeout) if err := q.client.Submit(ctx, nil, aggr); err != nil { log.Printf("failed to submit aggregate: %v", err) } cancel() } func (q *submissionQueue) run() { defer close(q.doneCh) tick := time.NewTicker(q.opts.MaxDelay) defer tick.Stop() // In-memory buffer (as an Aggregate). var curAggr *ippb.Aggregate var stored int flush := func() { if curAggr != nil { q.sendAggregate(curAggr) } curAggr = nil stored = 0 } // Flush the in-memory buffer if it has grown beyond a certain // size. shouldFlush := func() bool { stored++ return stored >= q.opts.MaxStored } maybeFlush := func() { if shouldFlush() { flush() } } handleEvent := func(ev *ippb.Event) { if curAggr == nil { curAggr = new(ippb.Aggregate) } curAggr.AddEvent(ev) maybeFlush() } handleAggregate := func(aggr *ippb.Aggregate) { if curAggr == nil { curAggr = aggr } else { curAggr = curAggr.Merge(aggr) } maybeFlush() } for { select { case ev := <-q.evCh: handleEvent(ev) case aggr := <-q.agCh: handleAggregate(aggr) case <-tick.C: flush() case <-q.stopCh: flush() return } } }