Select Git revision
-
ale authored
Make it possible to apply an IP mask (different for v4 and v6) to all addresses stored and retrieved from the database. This is transparent to the API (you can still send and query individual IPs). The default behavior becomes to aggregate away IPv6 addresses to a /64 network, while there is no change to IPv4 handling.
ale authoredMake it possible to apply an IP mask (different for v4 and v6) to all addresses stored and retrieved from the database. This is transparent to the API (you can still send and query individual IPs). The default behavior becomes to aggregate away IPv6 addresses to a /64 network, while there is no change to IPv4 handling.
server.go 3.28 KiB
package server
import (
"context"
"log"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"git.autistici.org/ai3/tools/iprep/db"
"git.autistici.org/ai3/tools/iprep/ext"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"git.autistici.org/ai3/tools/iprep/script"
)
type Server struct {
*script.Manager
ippb.UnimplementedIpRepServer
ext map[string]ext.ExternalSource
horizon time.Duration
db db.DB
stop chan struct{}
}
var (
emptyResponse = new(empty.Empty)
defaultHorizon = 6 * time.Hour
dataMaxAge = 168 * time.Hour // one week
defaultScript = `score = 0`
)
func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
scriptMgr, err := script.NewManager(scriptPath, defaultScript)
if err != nil {
return nil, err
}
database, err := db.Open(dbPath, mask)
if err != nil {
return nil, err
}
s := &Server{
Manager: scriptMgr,
ext: extSrcs,
horizon: defaultHorizon,
db: database,
stop: make(chan struct{}),
}
go s.cleaner()
return s, nil
}
func (s *Server) Close() {
close(s.stop)
s.db.Close()
}
func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
// Reduce the request to a single Aggregate.
var aggr *ippb.Aggregate
for _, a := range req.Aggregates {
if aggr == nil {
aggr = a
} else {
aggr = aggr.Merge(a)
}
}
for _, e := range req.Events {
if aggr == nil {
aggr = new(ippb.Aggregate)
}
aggr.AddEvent(e)
}
// Load it into the database if it's not empty.
if len(aggr.ByType) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "empty input")
}
err := s.db.AddAggregate(aggr)
return emptyResponse, err
}
func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
counts, err := s.db.ScanIP(time.Now().Add(-horizon), req.Ip)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%v", err)
}
score, err := s.Script().RunIP(ctx, req.Ip, counts, horizon.Seconds(), s.ext)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
return &ippb.GetScoreResponse{
Ip: req.Ip,
Score: float32(score),
}, nil
}
func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
script := s.Script()
return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
if err != nil {
log.Printf("script error on ip %s: %v", ip, err)
return nil
}
if float32(score) < req.Threshold {
return nil
}
return stream.Send(&ippb.GetScoreResponse{
Ip: ip,
Score: float32(score),
})
})
}
func (s *Server) cleaner() {
t := time.NewTicker(6 * time.Hour)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
n, err := s.db.WipeOldData(dataMaxAge)
if err != nil {
log.Printf("error cleaning up database: %v", err)
} else if n > 0 {
log.Printf("cleaned up %d stale entries", n)
}
}
}
}