Skip to content
Snippets Groups Projects
Select Git revision
  • 9446198149d34d9d3329a89b43eacf2ecd93b343
  • master default protected
  • renovate/github.com-mattn-go-sqlite3-1.x
  • renovate/golang-1.x
  • renovate/github.com-oschwald-maxminddb-golang-1.x
  • renovate/github.com-prometheus-client_golang-1.x
  • renovate/google.golang.org-protobuf-1.x
  • renovate/golang.org-x-sync-digest
  • renovate/github.com-d5-tengo-v2-2.x
  • renovate/google.golang.org-grpc-1.x
  • renovate/gopkg.in-yaml.v3-3.x
  • renovate/github.com-golang-migrate-migrate-v4-4.x
  • renovate/github.com-google-go-cmp-0.x
13 results

server.go

Blame
    • ale's avatar
      94461981
      Implement IP masking in the database · 94461981
      ale authored
      Make it possible to apply an IP mask (different for v4 and v6) to all
      addresses stored and retrieved from the database. This is transparent
      to the API (you can still send and query individual IPs).
      
      The default behavior becomes to aggregate away IPv6 addresses to a /64
      network, while there is no change to IPv4 handling.
      94461981
      History
      Implement IP masking in the database
      ale authored
      Make it possible to apply an IP mask (different for v4 and v6) to all
      addresses stored and retrieved from the database. This is transparent
      to the API (you can still send and query individual IPs).
      
      The default behavior becomes to aggregate away IPv6 addresses to a /64
      network, while there is no change to IPv4 handling.
    server.go 3.28 KiB
    package server
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/golang/protobuf/ptypes/empty"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	"git.autistici.org/ai3/tools/iprep/db"
    	"git.autistici.org/ai3/tools/iprep/ext"
    	ippb "git.autistici.org/ai3/tools/iprep/proto"
    	"git.autistici.org/ai3/tools/iprep/script"
    )
    
    type Server struct {
    	*script.Manager
    	ippb.UnimplementedIpRepServer
    
    	ext     map[string]ext.ExternalSource
    	horizon time.Duration
    	db      db.DB
    	stop    chan struct{}
    }
    
    var (
    	emptyResponse = new(empty.Empty)
    
    	defaultHorizon = 6 * time.Hour
    	dataMaxAge     = 168 * time.Hour // one week
    
    	defaultScript = `score = 0`
    )
    
    func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
    	scriptMgr, err := script.NewManager(scriptPath, defaultScript)
    	if err != nil {
    		return nil, err
    	}
    
    	database, err := db.Open(dbPath, mask)
    	if err != nil {
    		return nil, err
    	}
    
    	s := &Server{
    		Manager: scriptMgr,
    		ext:     extSrcs,
    		horizon: defaultHorizon,
    		db:      database,
    		stop:    make(chan struct{}),
    	}
    	go s.cleaner()
    	return s, nil
    }
    
    func (s *Server) Close() {
    	close(s.stop)
    	s.db.Close()
    }
    
    func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
    	// Reduce the request to a single Aggregate.
    	var aggr *ippb.Aggregate
    	for _, a := range req.Aggregates {
    		if aggr == nil {
    			aggr = a
    		} else {
    			aggr = aggr.Merge(a)
    		}
    	}
    	for _, e := range req.Events {
    		if aggr == nil {
    			aggr = new(ippb.Aggregate)
    		}
    		aggr.AddEvent(e)
    	}
    
    	// Load it into the database if it's not empty.
    	if len(aggr.ByType) == 0 {
    		return nil, status.Errorf(codes.InvalidArgument, "empty input")
    	}
    	err := s.db.AddAggregate(aggr)
    
    	return emptyResponse, err
    }
    
    func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
    	horizon := s.horizon
    	if req.Horizon > 0 {
    		horizon = time.Second * time.Duration(req.Horizon)
    	}
    	counts, err := s.db.ScanIP(time.Now().Add(-horizon), req.Ip)
    	if err != nil {
    		return nil, status.Errorf(codes.Unavailable, "%v", err)
    	}
    
    	score, err := s.Script().RunIP(ctx, req.Ip, counts, horizon.Seconds(), s.ext)
    	if err != nil {
    		return nil, status.Errorf(codes.Internal, "%v", err)
    	}
    
    	return &ippb.GetScoreResponse{
    		Ip:    req.Ip,
    		Score: float32(score),
    	}, nil
    }
    
    func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
    	horizon := s.horizon
    	if req.Horizon > 0 {
    		horizon = time.Second * time.Duration(req.Horizon)
    	}
    	script := s.Script()
    	return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
    		score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
    		if err != nil {
    			log.Printf("script error on ip %s: %v", ip, err)
    			return nil
    		}
    
    		if float32(score) < req.Threshold {
    			return nil
    		}
    
    		return stream.Send(&ippb.GetScoreResponse{
    			Ip:    ip,
    			Score: float32(score),
    		})
    	})
    }
    
    func (s *Server) cleaner() {
    	t := time.NewTicker(6 * time.Hour)
    	defer t.Stop()
    	for {
    		select {
    		case <-s.stop:
    			return
    		case <-t.C:
    			n, err := s.db.WipeOldData(dataMaxAge)
    			if err != nil {
    				log.Printf("error cleaning up database: %v", err)
    			} else if n > 0 {
    				log.Printf("cleaned up %d stale entries", n)
    			}
    		}
    	}
    }