package submission

import (
	"context"
	"log"
	"time"

	"git.autistici.org/ai3/tools/iprep/client"
	ippb "git.autistici.org/ai3/tools/iprep/proto"
	"google.golang.org/grpc"
)

var (
	// Fallback values that Options.withDefaults substitutes for
	// fields the caller left unset.
	defaultMaxDelay       = 30 * time.Second
	defaultMaxStored      = 1000
	defaultChanBufferSize = 100

	// submitTimeout bounds each individual Submit call issued by
	// sendAggregate.
	submitTimeout = 30 * time.Second
)

// Options tunes how a Submitter batches data before sending it. A field
// left at zero is replaced by the corresponding package default.
//
// MaxDelay is the period of the ticker that flushes the in-memory buffer.
// MaxStored is the number of items (events or aggregates) accumulated in
// the buffer before it is flushed early. ChanBufferSize is the capacity
// of each of the two input channels (events and aggregates).
type Options struct {
	MaxDelay       time.Duration
	MaxStored      int
	ChanBufferSize int
}

// withDefaults returns a copy of o in which every unset field has been
// replaced by its package default. A field counts as unset when it is
// zero or negative: negative delays and buffer sizes are never valid
// (time.NewTicker and make(chan) panic on them), so they are treated
// like zero instead of being passed through.
//
// The receiver may be nil, in which case the result holds only defaults.
// The receiver is never modified, so an Options value can be shared
// between several calls to New without being changed behind the
// caller's back.
func (o *Options) withDefaults() *Options {
	out := new(Options)
	if o != nil {
		*out = *o
	}
	if out.MaxDelay <= 0 {
		out.MaxDelay = defaultMaxDelay
	}
	if out.MaxStored <= 0 {
		out.MaxStored = defaultMaxStored
	}
	if out.ChanBufferSize <= 0 {
		out.ChanBufferSize = defaultChanBufferSize
	}
	return out
}

// A Submitter sends events to a remote iprep collector, trying to
// keep the overall request rate under control. To do so, the
// Submitter will perform local aggregation of events before sending
// them to the remote server. Options can control the maximum time
// delay and the overall memory impact of the submission queue.
type Submitter interface {
	// AddEvent queues a single event for submission. The
	// implementation in this package never blocks: when its input
	// buffer is full the event is dropped.
	AddEvent(*ippb.Event)

	// AddAggregate queues a pre-built aggregate for submission. The
	// implementation in this package never blocks: when its input
	// buffer is full the aggregate is dropped.
	AddAggregate(*ippb.Aggregate)

	// Close stops the Submitter. The implementation in this package
	// flushes its in-memory buffer and waits for its background
	// goroutine to exit before returning.
	Close()
}

// submissionQueue is the Submitter implementation of this package. A
// background goroutine (run) owns the aggregation state; the other
// methods only communicate with it through channels.
//
// client is used to submit aggregates to the collector. opts is a
// private copy of the options given at construction time. evCh and agCh
// are the buffered input channels written by AddEvent and AddAggregate
// and read by run. stopCh is closed by Close to ask run to exit, and
// doneCh is closed by run once it has exited.
type submissionQueue struct {
	client client.Client
	opts   Options
	evCh   chan *ippb.Event
	agCh   chan *ippb.Aggregate
	stopCh chan struct{}
	doneCh chan struct{}
}

// newSubmitter builds a submissionQueue around the given client, takes
// a private copy of opts, and starts the background goroutine that
// does the aggregation. opts must be non-nil.
func newSubmitter(client client.Client, opts *Options) Submitter {
	bufSize := opts.ChanBufferSize

	q := new(submissionQueue)
	q.client = client
	q.evCh = make(chan *ippb.Event, bufSize)
	q.agCh = make(chan *ippb.Aggregate, bufSize)
	q.opts = *opts
	q.stopCh = make(chan struct{})
	q.doneCh = make(chan struct{})

	go q.run()
	return q
}

// New returns a Submitter that sends its data to the collector reachable
// through the gRPC connection conn. opts may be nil; unset fields are
// filled in with the package defaults.
func New(conn *grpc.ClientConn, opts *Options) Submitter {
	effective := opts.withDefaults()
	return newSubmitter(client.New(conn), effective)
}

// Close asks the background goroutine to stop and waits until it has
// exited; the goroutine flushes its in-memory buffer on the way out.
//
// Calling Close again after a previous call is harmless: stopCh is only
// closed once, so the second call does not panic and returns as soon as
// the goroutine is gone. Simultaneous calls from several goroutines are
// not supported.
func (q *submissionQueue) Close() {
	select {
	case <-q.stopCh:
		// Nothing is ever sent on stopCh, so a successful receive
		// means an earlier Close already closed it.
	default:
		close(q.stopCh)
	}
	<-q.doneCh
}

// AddEvent hands ev to the background goroutine without ever blocking
// the caller. If the event channel is full, ev is silently dropped.
func (q *submissionQueue) AddEvent(ev *ippb.Event) {
	select {
	case q.evCh <- ev:
	default:
		// Channel full: drop the event rather than block.
	}
}

// AddAggregate hands aggr to the background goroutine without ever
// blocking the caller. If the aggregate channel is full, aggr is
// silently dropped.
func (q *submissionQueue) AddAggregate(aggr *ippb.Aggregate) {
	select {
	case q.agCh <- aggr:
	default:
		// Channel full: drop the aggregate rather than block.
	}
}

// sendAggregate submits aggr to the collector, giving the call at most
// submitTimeout to complete. Failures are logged and otherwise ignored:
// the data is not retried.
func (q *submissionQueue) sendAggregate(aggr *ippb.Aggregate) {
	ctx, cancel := context.WithTimeout(context.Background(), submitTimeout)
	// Deferred so the context's timer is released on every exit path,
	// including a panic inside Submit.
	defer cancel()

	if err := q.client.Submit(ctx, nil, aggr); err != nil {
		log.Printf("failed to submit aggregate: %v", err)
	}
}

// run is the background loop started by newSubmitter. It folds incoming
// events and aggregates into a single in-memory Aggregate and submits
// that buffer when one of the following happens:
//
//   - MaxStored items have been folded in since the last flush,
//   - the MaxDelay ticker fires,
//   - Close is called.
//
// On Close, whatever is still waiting in the input channels is folded in
// before the final flush, so data accepted by AddEvent/AddAggregate
// before Close is not lost. doneCh is closed when the loop exits.
func (q *submissionQueue) run() {
	defer close(q.doneCh)

	tick := time.NewTicker(q.opts.MaxDelay)
	defer tick.Stop()

	// In-memory buffer (as an Aggregate), and the number of items
	// folded into it since the last flush.
	var curAggr *ippb.Aggregate
	var stored int

	// flush submits the buffer, if there is one, and resets the state.
	flush := func() {
		if curAggr != nil {
			q.sendAggregate(curAggr)
		}
		curAggr = nil
		stored = 0
	}

	// maybeFlush accounts for one more stored item and flushes the
	// buffer once it has grown to MaxStored items.
	maybeFlush := func() {
		stored++
		if stored >= q.opts.MaxStored {
			flush()
		}
	}

	handleEvent := func(ev *ippb.Event) {
		if curAggr == nil {
			curAggr = new(ippb.Aggregate)
		}
		curAggr.AddEvent(ev)
		maybeFlush()
	}

	handleAggregate := func(aggr *ippb.Aggregate) {
		if curAggr == nil {
			curAggr = aggr
		} else {
			curAggr = curAggr.Merge(aggr)
		}
		maybeFlush()
	}

	// drainPending folds in the items currently buffered in the input
	// channels. This goroutine is the only receiver, so len() is a
	// lower bound on what can be received without blocking; using that
	// snapshot also keeps shutdown bounded if producers are still
	// calling AddEvent/AddAggregate.
	drainPending := func() {
		for n := len(q.evCh); n > 0; n-- {
			handleEvent(<-q.evCh)
		}
		for n := len(q.agCh); n > 0; n-- {
			handleAggregate(<-q.agCh)
		}
	}

	for {
		select {
		case ev := <-q.evCh:
			handleEvent(ev)
		case aggr := <-q.agCh:
			handleAggregate(aggr)
		case <-tick.C:
			flush()
		case <-q.stopCh:
			// select picks at random among ready cases, so the
			// input channels may still hold items at this point.
			drainPending()
			flush()
			return
		}
	}
}