package node

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"git.autistici.org/ale/autoradio"
	"git.autistici.org/ale/autoradio/coordination/presence"
	pb "git.autistici.org/ale/autoradio/proto"
	"git.autistici.org/ale/autoradio/util"
	"go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
)

var (
	// gossipInterval is passed to util.RunCron by newStatusManager as
	// the interval of the periodic gossip loop (tick).
	gossipInterval = 3 * time.Second

	// gossipTimeout bounds each gossip round: tick derives a context
	// with this timeout before propagating our status to a peer.
	gossipTimeout = 5 * time.Second
)

// mergeNodes combines two lists of node statuses, keyed by Name.
//
// Every entry of curNodes is taken as a starting point. An entry from
// newNodes replaces it only when its Timestamp is strictly greater, so
// ties keep the current entry. Names present in only one list are kept
// as-is. The order of the returned slice is unspecified, because it is
// built from a map.
func mergeNodes(curNodes, newNodes []*pb.Status) []*pb.Status {
	byName := make(map[string]*pb.Status, len(curNodes)+len(newNodes))
	for _, n := range curNodes {
		byName[n.Name] = n
	}
	for _, n := range newNodes {
		// Skip remote entries that are not newer than what we have.
		if existing, found := byName[n.Name]; found && existing.Timestamp >= n.Timestamp {
			continue
		}
		byName[n.Name] = n
	}

	merged := make([]*pb.Status, 0, len(byName))
	for _, n := range byName {
		merged = append(merged, n)
	}
	return merged
}

// withoutNode returns a newly allocated slice holding every entry of
// nodes except those whose Name equals name. The input slice is not
// modified. The result is non-nil even when it is empty.
func withoutNode(nodes []*pb.Status, name string) []*pb.Status {
	kept := make([]*pb.Status, 0, len(nodes))
	for i := range nodes {
		if nodes[i].Name == name {
			continue
		}
		kept = append(kept, nodes[i])
	}
	return kept
}

// statusManager keeps this node's view of the status of every known
// node and exchanges it with peers through the GossipService Exchange
// RPC, which it serves itself.
type statusManager struct {
	pb.UnimplementedGossipServiceServer

	// peers tracks the gossip endpoints of other nodes, watched under
	// autoradio.StatusEndpointPrefix (see newStatusManager).
	peers *presence.EndpointSet
	// conns caches outgoing GRPC connections to peers; it is used
	// without holding mx.
	conns *util.ConnCache

	// mx guards self and statuses.
	mx sync.Mutex
	// self is the latest status passed to update (nil until then).
	// Invariant: once update has been called, statuses[0] is self.
	// Before that, statuses may be empty or hold only remote entries
	// merged from peers.
	self     *pb.Status
	statuses []*pb.Status
}

// newStatusManager returns a new status manager. The manager receives
// this node's own status through update and spreads status information
// among its peers with a gossip-like protocol. Peers are the endpoints
// registered under autoradio.StatusEndpointPrefix. The manager serves
// the GossipService GRPC interface through its Exchange method.
//
// Both the peer watcher and the periodic gossip loop (tick, run via
// util.RunCron every gossipInterval) are started with ctx.
func newStatusManager(ctx context.Context, cli *clientv3.Client) *statusManager {
	m := &statusManager{
		// Watch the peers' gossip endpoints so we can pick targets.
		peers: presence.WatchEndpoints(ctx, cli, autoradio.StatusEndpointPrefix),
		conns: util.NewConnCache(grpc.WithInsecure()),
	}
	go util.RunCron(ctx, gossipInterval, m.tick) // nolint
	return m
}

// update records self as this node's own status and keeps it at
// statuses[0].
//
// Before the first call, statuses may already hold entries merged from
// peers by Exchange. That can include a stale copy of ourselves that
// was gossiped back to us. Those entries are preserved, and any entry
// with our name is dropped, rather than blindly overwriting index 0.
//
// A fresh slice is built each time, never written in place, so copies
// previously handed out to readers are never mutated.
//
// A nil status is ignored: storing it would break the invariant that
// statuses[0] is self.
func (m *statusManager) update(self *pb.Status) {
	if self == nil {
		return
	}

	m.mx.Lock()
	defer m.mx.Unlock()

	others := m.statuses
	if m.self != nil && len(others) > 0 {
		// statuses[0] holds our previous status; replace it.
		others = others[1:]
	}
	others = withoutNode(others, self.Name)

	m.self = self
	m.statuses = append([]*pb.Status{self}, others...)
}

// getStatus returns a snapshot copy of all known statuses. Once update
// has been called, our own status comes first. The caller may freely
// modify the returned slice; the *pb.Status values themselves are
// shared, not copied.
func (m *statusManager) getStatus() []*pb.Status {
	m.mx.Lock()
	snapshot := make([]*pb.Status, len(m.statuses))
	copy(snapshot, m.statuses)
	m.mx.Unlock()
	return snapshot
}

// buildStatusUpdate captures, under m.mx, what one gossip round needs:
//   - endpoint: a random peer other than ourselves, chosen by
//     peers.RandomEndpointExcluding. It is nil when our own status is
//     not yet known, and presumably also when no other peer exists.
//   - statuses: a copy of the currently known statuses.
func (m *statusManager) buildStatusUpdate() (endpoint *pb.Endpoint, statuses []*pb.Status) {
	m.mx.Lock()
	defer m.mx.Unlock()

	statuses = make([]*pb.Status, len(m.statuses))
	copy(statuses, m.statuses)
	if self := m.self; self != nil {
		endpoint = m.peers.RandomEndpointExcluding(self.Name)
	}
	return endpoint, statuses
}

// propagateStatus performs one gossip exchange. It sends a snapshot of
// our known statuses to a randomly selected peer, then merges the
// peer's reply into our own view.
//
// It returns an error in these cases:
//   - no target endpoint is available;
//   - the chosen endpoint advertises no address;
//   - connecting to the peer fails;
//   - the Exchange RPC fails.
//
// Underlying errors are wrapped with %w, so callers can inspect them
// with errors.Is.
func (m *statusManager) propagateStatus(ctx context.Context) error {
	// Pick a randomly selected peer to send our update to.
	endpoint, statuses := m.buildStatusUpdate()
	if endpoint == nil {
		return errors.New("no targets available")
	}
	// Indexing Addrs[0] on an endpoint without addresses would panic
	// the gossip goroutine; report it as an ordinary error instead.
	if len(endpoint.Addrs) == 0 {
		return errors.New("selected endpoint has no addresses")
	}

	// Get a cached GRPC connection for our target. If there is an
	// RPC error, invalidate the connection and reconnect on our
	// next attempt.
	target := endpoint.Addrs[0]
	conn, err := m.conns.Get(ctx, target)
	if err != nil {
		return fmt.Errorf("connect: %s: %w", target, err)
	}
	client := pb.NewGossipServiceClient(conn)
	resp, err := client.Exchange(ctx, &pb.ExchangeRequest{
		Nodes: statuses,
	})
	if err != nil {
		m.conns.Drop(target, conn)
		return fmt.Errorf("Exchange: %s: %w", target, err)
	}

	// Merge the node info provided in the response.
	m.mx.Lock()
	m.mergeRemoteStatuses(resp.Nodes)
	m.mx.Unlock()

	return nil
}

// mergeRemoteStatuses folds statuses received from a peer into
// m.statuses, keeping the newer entry per node name (see mergeNodes).
// An empty remote list leaves m.statuses untouched.
//
// Once our own status is known, any remote entry carrying our name is
// discarded and our own status is placed first. Our view of ourselves
// always wins.
//
// The caller must hold m.mx; both call sites in this file do.
func (m *statusManager) mergeRemoteStatuses(remote []*pb.Status) {
	if len(remote) > 0 {
		merged := mergeNodes(m.statuses, remote)
		if self := m.self; self != nil {
			merged = append([]*pb.Status{self}, withoutNode(merged, self.Name)...)
		}
		m.statuses = merged
	}
}

// updateMetrics exports two values to monitoring:
//   - the number of known nodes;
//   - the smallest (oldest) Timestamp among them, or 0 when none are
//     known.
//
// Together they give a rough idea of when the gossip protocol isn't
// working.
//
// The minimum is seeded from the first entry rather than from a
// 0 sentinel. With a sentinel, a zero Timestamp would make the result
// depend on slice order. The gauges are set after releasing the lock
// to keep the critical section short.
func (m *statusManager) updateMetrics() {
	m.mx.Lock()
	numNodes := len(m.statuses)
	var oldest uint64
	for i, s := range m.statuses {
		if i == 0 || s.Timestamp < oldest {
			oldest = s.Timestamp
		}
	}
	m.mx.Unlock()

	gossipNumNodes.Set(float64(numNodes))
	gossipOldestTS.Set(float64(oldest))
}

// tick runs one gossip round. It propagates our status to a random
// peer, bounded by gossipTimeout, and then refreshes the monitoring
// metrics. tick returns nothing, so propagation errors are only logged.
// Cancellation is not logged.
func (m *statusManager) tick(ctx context.Context) {
	pctx, cancel := context.WithTimeout(ctx, gossipTimeout)
	defer cancel()

	// errors.Is also recognizes context.Canceled when it arrives
	// wrapped (e.g. via fmt.Errorf with %w), which == would miss.
	if err := m.propagateStatus(pctx); err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("status: gossip error: %v", err)
	}

	m.updateMetrics()
}

// Exchange implements the GossipService RPC. It merges the statuses
// sent by the calling peer into our view and replies with our full
// list of known statuses.
func (m *statusManager) Exchange(ctx context.Context, req *pb.ExchangeRequest) (*pb.ExchangeResponse, error) {
	m.mx.Lock()
	defer m.mx.Unlock()

	// Ignore our peer's idea about ourselves: once our own status is
	// known, mergeRemoteStatuses drops remote entries with our name.
	m.mergeRemoteStatuses(req.Nodes)

	// Return a copy. The response is marshaled after this method
	// returns, i.e. after the mutex is released. Handing out m.statuses
	// itself would race with later in-place writes to that slice.
	return &pb.ExchangeResponse{
		Nodes: append([]*pb.Status{}, m.statuses...),
	}, nil
}

// statusServer wraps a grpc.Server that serves the statusManager's
// GossipService (registered in newStatusServer), together with the TCP
// address it listens on. Its Name and Start methods presumably satisfy
// the package's genericServer interface; confirm against that
// interface's definition.
type statusServer struct {
	*grpc.Server
	addr string
}

// newStatusServer creates a GRPC server with statusMgr registered as
// its GossipService implementation. The server listens on addr once
// Start is called.
func newStatusServer(addr string, statusMgr *statusManager) *statusServer {
	srv := &statusServer{
		Server: grpc.NewServer(),
		addr:   addr,
	}
	pb.RegisterGossipServiceServer(srv.Server, statusMgr)
	return srv
}

// Name returns the fixed identifier of this server, "status".
func (s *statusServer) Name() string {
	return "status"
}

// Start listens on s.addr over TCP and serves GRPC requests until ctx
// is canceled or serving fails. It returns the listen error, or the
// result of serving. The listener is closed before Start returns.
func (s *statusServer) Start(ctx context.Context) error {
	lis, err := net.Listen("tcp", s.addr)
	if err == nil {
		defer lis.Close() //nolint
		return runGRPCServerWithContext(ctx, s.Server, lis)
	}
	return err
}

// runGRPCServerWithContext serves GRPC requests from srv on l. It stops
// in one of two ways: when ctx is canceled, srv is stopped with
// srv.Stop; otherwise Serve returns on its own. Either way, it returns
// the result of srv.Serve.
func runGRPCServerWithContext(ctx context.Context, srv *grpc.Server, l net.Listener) error {
	// done lets the watcher goroutine exit when Serve returns for any
	// other reason, e.g. a listener error. Without it, the goroutine
	// would block on ctx.Done() and leak for as long as ctx stays live.
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-ctx.Done():
			srv.Stop()
		case <-done:
		}
	}()
	return srv.Serve(l)
}