Skip to content
Snippets Groups Projects
Select Git revision
  • a11b46dd8fcf9b4d0cf876376c4d571d82108e87
  • master default protected
  • renovate/bootstrap-5.x
  • renovate/purgecss-webpack-plugin-7.x
  • renovate/mini-css-extract-plugin-2.x
  • renovate/html-webpack-plugin-5.x
  • renovate/golang-1.x
  • renovate/css-loader-6.x
8 results

main.go

Blame
    • ale's avatar
      0def9013
      Switch to a self-hosted binary, add graph-related code · 0def9013
      ale authored
      The app is now self-hosted instead of relying on the static-content
      standalone server, so we can eventually add dynamic code for graph
      serving.
      
      The static content serving has improved, with more consistent cache
      header management, as well as the capability of serving pre-compressed
      content.
      
      Additional code to implement the generation of dependency (flow)
      graphs in dot format was added (not hooked to the HTTP server yet).
      0def9013
      History
      Switch to a self-hosted binary, add graph-related code
      ale authored
      The app is now self-hosted instead of relying on the static-content
      standalone server, so we can eventually add dynamic code for graph
      serving.
      
      The static content serving has improved, with more consistent cache
      header management, as well as the capability of serving pre-compressed
      content.
      
      Additional code to implement the generation of dependency (flow)
      graphs in dot format was added (not hooked to the HTTP server yet).
    server.go 3.28 KiB
    package server
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/golang/protobuf/ptypes/empty"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	"git.autistici.org/ai3/tools/iprep/db"
    	"git.autistici.org/ai3/tools/iprep/ext"
    	ippb "git.autistici.org/ai3/tools/iprep/proto"
    	"git.autistici.org/ai3/tools/iprep/script"
    )
    
    // Server implements the IpRep gRPC service on top of a database of
    // aggregated event counts and a script that turns those counts into a
    // score.
    //
    // It embeds *script.Manager (Server calls its Script method to obtain the
    // script to run) and ippb.UnimplementedIpRepServer.
    type Server struct {
    	*script.Manager
    	ippb.UnimplementedIpRepServer
    
    	ext     map[string]ext.ExternalSource // external sources passed to every script RunIP call
    	horizon time.Duration                 // look-back window used when a request does not set its own
    	db      db.DB                         // storage for submitted aggregates; scanned when scoring
    	stop    chan struct{}                 // closed by Close to terminate the cleaner goroutine
    }
    
    // Package-level defaults used by Server.
    var (
    	// emptyResponse is the shared reply value returned by Submit.
    	emptyResponse = &empty.Empty{}
    
    	// defaultHorizon is the look-back window installed on every new Server.
    	defaultHorizon = 6 * time.Hour
    	// dataMaxAge is the age limit handed to WipeOldData by the cleaner.
    	dataMaxAge = 7 * 24 * time.Hour // one week
    
    	// defaultScript is passed to script.NewManager as the default script.
    	defaultScript = "score = 0"
    )
    
    // New creates a Server backed by the database at dbPath (opened with the
    // given mask) and by a script manager for scriptPath, using defaultScript as
    // the manager's default script. extSrcs is kept and passed to the script on
    // every scoring call.
    //
    // On success the background cleaner goroutine is already running; call
    // Close to stop it. Any error from the script manager or from opening the
    // database is returned unchanged.
    func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
    	mgr, err := script.NewManager(scriptPath, defaultScript)
    	if err != nil {
    		return nil, err
    	}
    
    	store, err := db.Open(dbPath, mask)
    	if err != nil {
    		return nil, err
    	}
    
    	srv := new(Server)
    	srv.Manager = mgr
    	srv.ext = extSrcs
    	srv.horizon = defaultHorizon
    	srv.db = store
    	srv.stop = make(chan struct{})
    
    	// Periodic cleanup runs until Close closes srv.stop.
    	go srv.cleaner()
    
    	return srv, nil
    }
    
    // Close signals the background cleaner goroutine to exit by closing the
    // stop channel, then closes the database.
    //
    // NOTE(review): closing an already-closed channel panics, so Close must be
    // called at most once — confirm callers respect that.
    // NOTE(review): Close does not wait for the cleaner goroutine to return
    // before closing the database — confirm the db tolerates a cleanup that is
    // still in flight.
    func (s *Server) Close() {
    	close(s.stop)
    	s.db.Close()
    }
    
    // Submit merges every aggregate and event carried by req into a single
    // Aggregate and stores it in the database.
    //
    // It returns an InvalidArgument status when the request carries no
    // aggregates and no events, or when the merged aggregate's ByType is empty.
    // Errors from the database are returned unchanged, with a nil response.
    func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
    	// Reduce the request to a single Aggregate.
    	var aggr *ippb.Aggregate
    	for _, a := range req.Aggregates {
    		if aggr == nil {
    			aggr = a
    			continue
    		}
    		aggr = aggr.Merge(a)
    	}
    	for _, e := range req.Events {
    		if aggr == nil {
    			aggr = new(ippb.Aggregate)
    		}
    		aggr.AddEvent(e)
    	}
    
    	// aggr is still nil when the request had neither aggregates nor events;
    	// test for that first so the field access cannot dereference a nil
    	// pointer and panic the handler.
    	if aggr == nil || len(aggr.ByType) == 0 {
    		return nil, status.Errorf(codes.InvalidArgument, "empty input")
    	}
    
    	// Load it into the database.
    	if err := s.db.AddAggregate(aggr); err != nil {
    		return nil, err
    	}
    	return emptyResponse, nil
    }
    
    // GetScore returns the score of req.Ip. It scans the database for that IP
    // starting one horizon before now, then runs the current script on the
    // resulting counts.
    //
    // A positive req.Horizon (interpreted as seconds) replaces the server's
    // default horizon. Database failures are reported as Unavailable, script
    // failures as Internal.
    func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
    	lookback := s.horizon
    	if req.Horizon > 0 {
    		lookback = time.Duration(req.Horizon) * time.Second
    	}
    	since := time.Now().Add(-lookback)
    
    	counts, err := s.db.ScanIP(since, req.Ip)
    	if err != nil {
    		return nil, status.Errorf(codes.Unavailable, "%v", err)
    	}
    
    	score, err := s.Script().RunIP(ctx, req.Ip, counts, lookback.Seconds(), s.ext)
    	if err != nil {
    		return nil, status.Errorf(codes.Internal, "%v", err)
    	}
    
    	resp := &ippb.GetScoreResponse{
    		Ip:    req.Ip,
    		Score: float32(score),
    	}
    	return resp, nil
    }
    
    // GetAllScores scans the whole database starting one horizon before now,
    // runs the script on each IP's counts and sends a GetScoreResponse on stream
    // for every IP whose score is at least req.Threshold.
    //
    // A positive req.Horizon (interpreted as seconds) replaces the server's
    // default horizon. A script error for one IP is logged and that IP is
    // skipped. The error returned is whatever ScanAll returns; the callback
    // hands it the first stream.Send failure or the stream context's error.
    func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
    	horizon := s.horizon
    	if req.Horizon > 0 {
    		horizon = time.Second * time.Duration(req.Horizon)
    	}
    
    	// Run scripts under the RPC's own context rather than a background one,
    	// so cancellation and deadlines set by the client are propagated.
    	ctx := stream.Context()
    
    	// Fetch the script once, outside the scan callback. The local name
    	// avoids shadowing the imported script package.
    	scr := s.Script()
    
    	return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
    		// Nobody is listening once the stream context is done; report it
    		// instead of scoring (and logging a failure for) every remaining IP.
    		if err := ctx.Err(); err != nil {
    			return err
    		}
    
    		result, err := scr.RunIP(ctx, ip, m, horizon.Seconds(), s.ext)
    		if err != nil {
    			log.Printf("script error on ip %s: %v", ip, err)
    			return nil
    		}
    
    		score := float32(result)
    		if score < req.Threshold {
    			return nil
    		}
    
    		return stream.Send(&ippb.GetScoreResponse{
    			Ip:    ip,
    			Score: score,
    		})
    	})
    }
    
    // cleaner is the background loop started by New. Every six hours it asks
    // the database to wipe data older than dataMaxAge and logs either the error
    // or the number of removed entries when that number is positive. It returns
    // once the stop channel is closed.
    func (s *Server) cleaner() {
    	ticker := time.NewTicker(6 * time.Hour)
    	defer ticker.Stop()
    
    	for {
    		select {
    		case <-ticker.C:
    			removed, err := s.db.WipeOldData(dataMaxAge)
    			switch {
    			case err != nil:
    				log.Printf("error cleaning up database: %v", err)
    			case removed > 0:
    				log.Printf("cleaned up %d stale entries", removed)
    			}
    		case <-s.stop:
    			return
    		}
    	}
    }