package node

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"git.autistici.org/ale/autoradio"
	"git.autistici.org/ale/autoradio/coordination/presence"
	pb "git.autistici.org/ale/autoradio/proto"
	"git.autistici.org/ale/autoradio/util"
	"go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
)

var (
	// gossipInterval is the period between two gossip rounds.
	gossipInterval = 3 * time.Second
	// gossipTimeout bounds the duration of a single gossip round.
	gossipTimeout = 5 * time.Second
)

// mergeNodes returns the union of curNodes and newNodes, keyed by node
// name. When both lists contain an entry with the same name, the one
// with the strictly greater Timestamp wins (ties keep the current
// entry). The order of the result is unspecified.
func mergeNodes(curNodes, newNodes []*pb.Status) []*pb.Status {
	tmp := make(map[string]*pb.Status, len(curNodes)+len(newNodes))
	for _, node := range curNodes {
		tmp[node.Name] = node
	}
	for _, node := range newNodes {
		if cur, ok := tmp[node.Name]; !ok || cur.Timestamp < node.Timestamp {
			tmp[node.Name] = node
		}
	}
	out := make([]*pb.Status, 0, len(tmp))
	for _, node := range tmp {
		out = append(out, node)
	}
	return out
}

// withoutNode returns a new slice holding every element of nodes whose
// name differs from name. The input slice is not modified.
func withoutNode(nodes []*pb.Status, name string) []*pb.Status {
	out := make([]*pb.Status, 0, len(nodes))
	for _, n := range nodes {
		if n.Name != name {
			out = append(out, n)
		}
	}
	return out
}

// statusManager keeps track of the status of the local node and of the
// statuses learned from peers, and implements the GossipService gRPC
// interface used to exchange them.
type statusManager struct {
	pb.UnimplementedGossipServiceServer

	peers *presence.EndpointSet
	conns *util.ConnCache

	// mx protects self and statuses.
	mx sync.Mutex
	// Invariant: once self is set, statuses[0] is always self.
	self     *pb.Status
	statuses []*pb.Status
}

// newStatusManager returns a new status manager object, that will
// receive status information from nodes, and propagate it among the
// other frontends with a gossip-like protocol. A periodic gossip task
// is started in the background and runs with the given context.
func newStatusManager(ctx context.Context, cli *clientv3.Client) *statusManager {
	// Start a watcher on the status endpoints registered by our peers.
	peers := presence.WatchEndpoints(ctx, cli, autoradio.StatusEndpointPrefix)
	m := &statusManager{
		peers: peers,
		conns: util.NewConnCache(grpc.WithInsecure()),
	}
	go util.RunCron(ctx, gossipInterval, m.tick) // nolint
	return m
}

// update sets the status of the local node. A nil status is ignored,
// since storing it would break every later traversal of the list.
func (m *statusManager) update(self *pb.Status) {
	if self == nil {
		return
	}

	m.mx.Lock()
	defer m.mx.Unlock()

	switch {
	case len(m.statuses) == 0:
		m.statuses = []*pb.Status{self}
	case m.self == nil:
		// A peer contacted us before our first local update, so the
		// list only holds remote nodes: prepend ourselves instead of
		// overwriting whatever happens to be in the first slot, and
		// drop any stale entry peers may have had about us.
		m.statuses = append([]*pb.Status{self}, withoutNode(m.statuses, self.Name)...)
	default:
		m.statuses[0] = self
	}
	m.self = self
}

// getStatus returns a copy of the list of known node statuses.
func (m *statusManager) getStatus() []*pb.Status {
	m.mx.Lock()
	defer m.mx.Unlock()
	return append([]*pb.Status{}, m.statuses...)
}

// buildStatusUpdate picks the target of the next gossip round and
// returns it along with a copy of the statuses to send. The endpoint
// is nil if the local status has not been set yet, or if the endpoint
// set returned no candidate.
func (m *statusManager) buildStatusUpdate() (endpoint *pb.Endpoint, statuses []*pb.Status) {
	m.mx.Lock()
	defer m.mx.Unlock()

	if m.self != nil {
		endpoint = m.peers.RandomEndpointExcluding(m.self.Name)
	}
	statuses = append([]*pb.Status{}, m.statuses...)
	return endpoint, statuses
}

// propagateStatus runs a single gossip round: it sends the known
// statuses to one peer and merges the statuses received in response.
// Errors wrap their cause, so callers can inspect it with errors.Is.
func (m *statusManager) propagateStatus(ctx context.Context) error {
	// Pick a randomly selected peer to send our update to.
	endpoint, statuses := m.buildStatusUpdate()
	if endpoint == nil {
		return errors.New("no targets available")
	}
	if len(endpoint.Addrs) == 0 {
		return errors.New("selected target has no addresses")
	}

	// Get a cached GRPC connection for our target. If there is an
	// RPC error, invalidate the connection and reconnect on our
	// next attempt.
	target := endpoint.Addrs[0]
	conn, err := m.conns.Get(ctx, target)
	if err != nil {
		return fmt.Errorf("connect: %s: %w", target, err)
	}

	client := pb.NewGossipServiceClient(conn)
	resp, err := client.Exchange(ctx, &pb.ExchangeRequest{
		Nodes: statuses,
	})
	if err != nil {
		m.conns.Drop(target, conn)
		return fmt.Errorf("Exchange: %s: %w", target, err)
	}

	// Merge the node info provided in the response.
	m.mx.Lock()
	m.mergeRemoteStatuses(resp.Nodes)
	m.mx.Unlock()

	return nil
}

// mergeRemoteStatuses merges the statuses received from a peer into
// the local list. Whatever the peer reports about the local node is
// discarded in favor of our own status, which is kept as the first
// element. Must be called with m.mx held.
func (m *statusManager) mergeRemoteStatuses(remote []*pb.Status) {
	if len(remote) == 0 {
		return
	}

	s := mergeNodes(m.statuses, remote)
	if m.self != nil {
		s = withoutNode(s, m.self.Name)
		s = append([]*pb.Status{m.self}, s...)
	}
	m.statuses = s
}

// Export the number of known nodes, and the timestamp of the oldest
// update seen, to monitoring, so we can have a rough idea of when the
// gossip protocol isn't working.
func (m *statusManager) updateMetrics() {
	m.mx.Lock()
	defer m.mx.Unlock()

	var oldest uint64
	for _, s := range m.statuses {
		if t := s.Timestamp; oldest == 0 || t < oldest {
			oldest = t
		}
	}
	gossipNumNodes.Set(float64(len(m.statuses)))
	gossipOldestTS.Set(float64(oldest))
}

// tick runs one gossip round, bounded by gossipTimeout, and then
// refreshes the monitoring metrics. Gossip errors are logged, except
// for those caused by context cancellation.
func (m *statusManager) tick(ctx context.Context) {
	pctx, cancel := context.WithTimeout(ctx, gossipTimeout)
	defer cancel()

	// The error is wrapped by propagateStatus, so a plain equality
	// check against context.Canceled would never match.
	if err := m.propagateStatus(pctx); err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("status: gossip error: %v", err)
	}

	m.updateMetrics()
}

// Exchange implements the GossipService Exchange RPC: it merges the
// statuses sent by the peer and replies with everything we know.
func (m *statusManager) Exchange(ctx context.Context, req *pb.ExchangeRequest) (*pb.ExchangeResponse, error) {
	m.mx.Lock()
	defer m.mx.Unlock()

	// Ignore our peer's idea about ourselves.
	m.mergeRemoteStatuses(req.Nodes)

	// Return a copy: the response is serialized after the lock has
	// been released, while update() modifies m.statuses in place.
	return &pb.ExchangeResponse{
		Nodes: append([]*pb.Status{}, m.statuses...),
	}, nil
}

// statusServer wraps a grpc.Server running the statusManager service.
type statusServer struct {
	*grpc.Server
	addr string
}

// newStatusServer returns a statusServer that will serve statusMgr on
// the given address once started.
func newStatusServer(addr string, statusMgr *statusManager) *statusServer {
	grpcServer := grpc.NewServer()
	pb.RegisterGossipServiceServer(grpcServer, statusMgr)
	return &statusServer{
		Server: grpcServer,
		addr:   addr,
	}
}

// Name returns the name of the server.
func (s *statusServer) Name() string { return "status" }

// Start listens on the server's TCP address and serves requests until
// the context is canceled or the server fails.
func (s *statusServer) Start(ctx context.Context) error {
	l, err := net.Listen("tcp", s.addr)
	if err != nil {
		return err
	}
	defer l.Close() //nolint

	return runGRPCServerWithContext(ctx, s.Server, l)
}

// runGRPCServerWithContext serves srv on l, and stops the server when
// the context is canceled. It returns the result of srv.Serve.
func runGRPCServerWithContext(ctx context.Context, srv *grpc.Server, l net.Listener) error {
	// Closing done on return releases the watcher goroutine when Serve
	// terminates on its own, so it does not outlive the server.
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-ctx.Done():
			srv.Stop()
		case <-done:
		}
	}()

	return srv.Serve(l)
}