Select Git revision
server.go 3.28 KiB
package server
import (
"context"
"log"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"git.autistici.org/ai3/tools/iprep/db"
"git.autistici.org/ai3/tools/iprep/ext"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"git.autistici.org/ai3/tools/iprep/script"
)
type Server struct {
*script.Manager
ippb.UnimplementedIpRepServer
ext map[string]ext.ExternalSource
horizon time.Duration
db db.DB
stop chan struct{}
}
var (
emptyResponse = new(empty.Empty)
defaultHorizon = 6 * time.Hour
dataMaxAge = 168 * time.Hour // one week
defaultScript = `score = 0`
)
func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
scriptMgr, err := script.NewManager(scriptPath, defaultScript)
if err != nil {
return nil, err
}
database, err := db.Open(dbPath, mask)
if err != nil {
return nil, err
}
s := &Server{
Manager: scriptMgr,
ext: extSrcs,
horizon: defaultHorizon,
db: database,
stop: make(chan struct{}),
}
go s.cleaner()
return s, nil
}
func (s *Server) Close() {
close(s.stop)
s.db.Close()
}
func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
// Reduce the request to a single Aggregate.
var aggr *ippb.Aggregate
for _, a := range req.Aggregates {
if aggr == nil {
aggr = a
} else {
aggr = aggr.Merge(a)
}
}
for _, e := range req.Events {
if aggr == nil {
aggr = new(ippb.Aggregate)
}
aggr.AddEvent(e)
}
// Load it into the database if it's not empty.
if len(aggr.ByType) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "empty input")
}
err := s.db.AddAggregate(aggr)
return emptyResponse, err
}
func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
counts, err := s.db.ScanIP(time.Now().Add(-horizon), req.Ip)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%v", err)
}
score, err := s.Script().RunIP(ctx, req.Ip, counts, horizon.Seconds(), s.ext)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
return &ippb.GetScoreResponse{
Ip: req.Ip,
Score: float32(score),
}, nil
}
func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
script := s.Script()
return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
if err != nil {
log.Printf("script error on ip %s: %v", ip, err)
return nil
}
if float32(score) < req.Threshold {
return nil
}
return stream.Send(&ippb.GetScoreResponse{
Ip: ip,
Score: float32(score),
})
})
}
func (s *Server) cleaner() {
t := time.NewTicker(6 * time.Hour)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
n, err := s.db.WipeOldData(dataMaxAge)
if err != nil {
log.Printf("error cleaning up database: %v", err)
} else if n > 0 {
log.Printf("cleaned up %d stale entries", n)
}
}
}
}