Select Git revision
-
ale authored
The app is now self-hosted instead of relying on the static-content standalone server, so we can eventually add dynamic code for graph serving. The static content serving has improved, with more consistent cache header management, as well as the capability of serving pre-compressed content. Additional code to implement the generation of dependency (flow) graphs in dot format was added (not hooked to the HTTP server yet).
ale authoredThe app is now self-hosted instead of relying on the static-content standalone server, so we can eventually add dynamic code for graph serving. The static content serving has improved, with more consistent cache header management, as well as the capability of serving pre-compressed content. Additional code to implement the generation of dependency (flow) graphs in dot format was added (not hooked to the HTTP server yet).
server.go 3.28 KiB
package server
import (
"context"
"log"
"time"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"git.autistici.org/ai3/tools/iprep/db"
"git.autistici.org/ai3/tools/iprep/ext"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"git.autistici.org/ai3/tools/iprep/script"
)
type Server struct {
*script.Manager
ippb.UnimplementedIpRepServer
ext map[string]ext.ExternalSource
horizon time.Duration
db db.DB
stop chan struct{}
}
var (
emptyResponse = new(empty.Empty)
defaultHorizon = 6 * time.Hour
dataMaxAge = 168 * time.Hour // one week
defaultScript = `score = 0`
)
func New(dbPath, scriptPath string, extSrcs map[string]ext.ExternalSource, mask ippb.IPMask) (*Server, error) {
scriptMgr, err := script.NewManager(scriptPath, defaultScript)
if err != nil {
return nil, err
}
database, err := db.Open(dbPath, mask)
if err != nil {
return nil, err
}
s := &Server{
Manager: scriptMgr,
ext: extSrcs,
horizon: defaultHorizon,
db: database,
stop: make(chan struct{}),
}
go s.cleaner()
return s, nil
}
func (s *Server) Close() {
close(s.stop)
s.db.Close()
}
func (s *Server) Submit(ctx context.Context, req *ippb.SubmitRequest) (*empty.Empty, error) {
// Reduce the request to a single Aggregate.
var aggr *ippb.Aggregate
for _, a := range req.Aggregates {
if aggr == nil {
aggr = a
} else {
aggr = aggr.Merge(a)
}
}
for _, e := range req.Events {
if aggr == nil {
aggr = new(ippb.Aggregate)
}
aggr.AddEvent(e)
}
// Load it into the database if it's not empty.
if len(aggr.ByType) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "empty input")
}
err := s.db.AddAggregate(aggr)
return emptyResponse, err
}
func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb.GetScoreResponse, error) {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
counts, err := s.db.ScanIP(time.Now().Add(-horizon), req.Ip)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%v", err)
}
score, err := s.Script().RunIP(ctx, req.Ip, counts, horizon.Seconds(), s.ext)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
return &ippb.GetScoreResponse{
Ip: req.Ip,
Score: float32(score),
}, nil
}
func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
script := s.Script()
return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
if err != nil {
log.Printf("script error on ip %s: %v", ip, err)
return nil
}
if float32(score) < req.Threshold {
return nil
}
return stream.Send(&ippb.GetScoreResponse{
Ip: ip,
Score: float32(score),
})
})
}
func (s *Server) cleaner() {
t := time.NewTicker(6 * time.Hour)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
n, err := s.db.WipeOldData(dataMaxAge)
if err != nil {
log.Printf("error cleaning up database: %v", err)
} else if n > 0 {
log.Printf("cleaned up %d stale entries", n)
}
}
}
}