Skip to content
Snippets Groups Projects
Select Git revision
  • b1b0f1bafe4c8458a11e0b54cbbac849a79e4bc4
  • master default
  • renovate/golang.org-x-crypto-0.x
  • renovate/go-1.x
  • renovate/golang.org-x-sync-0.x
  • renovate/opentelemetry-go-monorepo
  • renovate/github.com-go-webauthn-webauthn-0.x
  • renovate/github.com-mattn-go-sqlite3-1.x
  • renovate/github.com-go-ldap-ldap-v3-3.x
  • renovate/github.com-prometheus-client_golang-1.x
  • renovate/github.com-google-go-cmp-0.x
  • renovate/github.com-lunixbochs-struc-digest
  • renovate/github.com-duo-labs-webauthn-digest
13 results

main.go

Blame
  • server.go 3.28 KiB
    package server
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/golang/protobuf/ptypes/empty"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	"git.autistici.org/ai3/tools/iprep/db"
    	"git.autistici.org/ai3/tools/iprep/ext"
    	ippb "git.autistici.org/ai3/tools/iprep/proto"
    	"git.autistici.org/ai3/tools/iprep/script"
    )
    
    type Server struct {
    	*script.Manager
    	ippb.UnimplementedIpRepServer
    
    	ext     map[string]ext.ExternalSource
    	horizon time.Duration
    	db      db.DB
    	stop    chan struct{}
    }
    
    var (
    	emptyResponse = new(empty.Empty)
    
    	defaultHorizon = 6 * time.Hour
    	dataMaxAge     = 168 * time.Hour // one week
    
    	defaultScript = `score = 0`
    )
    
    func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
    	scriptMgr, err := script.NewManager(scriptPath, defaultScript)
    	if err != nil {
    		return nil, err
    	}
    
    	database, err := db.Open(dbPath, mask)
    	if err != nil {
    		return nil, err
    	}
    
    	s := &Server{
    		Manager: scriptMgr,
    		ext:     extSrcs,
    		horizon: defaultHorizon,
    		db:      database,
    		stop:    make(chan struct{}),
    	}
    	go s.cleaner()
    	return s, nil
    }
    
    func (s *Server) Close() {
    	close(s.stop)
    	s.db.Close()
    }
    
    func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
    	// Reduce the request to a single Aggregate.
    	var aggr *ippb.Aggregate
    	for _, a := range req.Aggregates {
    		if aggr == nil {
    			aggr = a
    		} else {
    			aggr = aggr.Merge(a)
    		}
    	}
    	for _, e := range req.Events {
    		if aggr == nil {
    			aggr = new(ippb.Aggregate)
    		}
    		aggr.AddEvent(e)
    	}
    
    	// Load it into the database if it's not empty.
    	if len(aggr.ByType) == 0 {
    		return nil, status.Errorf(codes.InvalidArgument, "empty input")
    	}
    	err := s.db.AddAggregate(aggr)
    
    	return emptyResponse, err
    }
    
    func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
    	horizon := s.horizon
    	if req.Horizon > 0 {
    		horizon = time.Second * time.Duration(req.Horizon)
    	}
    	counts, err := s.db.ScanIP(time.Now().Add(-horizon), req.Ip)
    	if err != nil {
    		return nil, status.Errorf(codes.Unavailable, "%v", err)
    	}
    
    	score, err := s.Script().RunIP(ctx, req.Ip, counts, horizon.Seconds(), s.ext)
    	if err != nil {
    		return nil, status.Errorf(codes.Internal, "%v", err)
    	}
    
    	return &ippb.GetScoreResponse{
    		Ip:    req.Ip,
    		Score: float32(score),
    	}, nil
    }
    
    func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
    	horizon := s.horizon
    	if req.Horizon > 0 {
    		horizon = time.Second * time.Duration(req.Horizon)
    	}
    	script := s.Script()
    	return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
    		score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
    		if err != nil {
    			log.Printf("script error on ip %s: %v", ip, err)
    			return nil
    		}
    
    		if float32(score) < req.Threshold {
    			return nil
    		}
    
    		return stream.Send(&ippb.GetScoreResponse{
    			Ip:    ip,
    			Score: float32(score),
    		})
    	})
    }
    
    func (s *Server) cleaner() {
    	t := time.NewTicker(6 * time.Hour)
    	defer t.Stop()
    	for {
    		select {
    		case <-s.stop:
    			return
    		case <-t.C:
    			n, err := s.db.WipeOldData(dataMaxAge)
    			if err != nil {
    				log.Printf("error cleaning up database: %v", err)
    			} else if n > 0 {
    				log.Printf("cleaned up %d stale entries", n)
    			}
    		}
    	}
    }