Commit eca1810c authored by ale's avatar ale
Browse files

Add more CLI commands to make use of the RPC API

parent d3b24fed
......@@ -2,6 +2,8 @@ package client
import (
"context"
"io"
"log"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"google.golang.org/grpc"
......@@ -15,8 +17,9 @@ type Options struct {
}
type Client interface {
Submit(context.Context, *ippb.Event, *ippb.Aggregate) error
Submit(context.Context, []*ippb.Event, *ippb.Aggregate) error
GetScore(context.Context, string) (*ippb.GetScoreResponse, error)
GetAllScores(context.Context, float32) (<-chan *ippb.GetScoreResponse, error)
Close()
}
......@@ -56,11 +59,9 @@ func (c *rpcClient) Close() {
c.conn.Close()
}
func (c *rpcClient) Submit(ctx context.Context, ev *ippb.Event, aggr *ippb.Aggregate) error {
func (c *rpcClient) Submit(ctx context.Context, events []*ippb.Event, aggr *ippb.Aggregate) error {
var req ippb.SubmitRequest
if ev != nil {
req.Events = append(req.Events, ev)
}
req.Events = events
if aggr != nil {
req.Aggregates = append(req.Aggregates, aggr)
}
......@@ -71,3 +72,28 @@ func (c *rpcClient) Submit(ctx context.Context, ev *ippb.Event, aggr *ippb.Aggre
func (c *rpcClient) GetScore(ctx context.Context, ip string) (*ippb.GetScoreResponse, error) {
return c.stub.GetScore(ctx, &ippb.GetScoreRequest{Ip: ip})
}
func (c *rpcClient) GetAllScores(ctx context.Context, threshold float32) (<-chan *ippb.GetScoreResponse, error) {
stream, err := c.stub.GetAllScores(ctx, &ippb.GetAllScoresRequest{Threshold: threshold})
if err != nil {
return nil, err
}
ch := make(chan *ippb.GetScoreResponse, 100)
go func() {
defer close(ch)
for {
entry, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
log.Printf("error in GetAllScores stream: %v", err)
break
}
ch <- entry
}
}()
return ch, nil
}
package main
import (
"context"
"flag"
"fmt"
"log"
ipclient "git.autistici.org/ai3/tools/iprep/client"
"github.com/google/subcommands"
)
type dumpCommand struct {
serverAddr string
}
func (c *dumpCommand) Name() string { return "dump" }
func (c *dumpCommand) Synopsis() string { return "dump all scores" }
func (c *dumpCommand) Usage() string {
return `dump [<flags>]:
Dump all the IPs and scores contained in the database.
`
}
func (c *dumpCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
}
func (c *dumpCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
if f.NArg() > 0 {
log.Printf("error: too many arguments")
return subcommands.ExitUsageError
}
if c.serverAddr == "" {
log.Printf("error: must specify --server")
return subcommands.ExitUsageError
}
if err := c.run(ctx); err != nil {
log.Printf("error: %v", err)
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
}
func (c *dumpCommand) run(ctx context.Context) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
if err != nil {
return err
}
ch, err := client.GetAllScores(ctx, 0)
if err != nil {
return err
}
for entry := range ch {
fmt.Printf("%s %g\n", entry.Ip, entry.Score)
}
return nil
}
func init() {
subcommands.Register(&dumpCommand{}, "")
}
package main
import (
"context"
"flag"
"fmt"
"log"
ipclient "git.autistici.org/ai3/tools/iprep/client"
"github.com/google/subcommands"
)
type queryCommand struct {
serverAddr string
}
func (c *queryCommand) Name() string { return "query" }
func (c *queryCommand) Synopsis() string { return "query score for an IP" }
func (c *queryCommand) Usage() string {
return `query [<flags>] <IP>:
Query the score for the specified IP.
`
}
func (c *queryCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
}
func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
if f.NArg() != 1 {
log.Printf("error: wrong number of arguments")
return subcommands.ExitUsageError
}
if c.serverAddr == "" {
log.Printf("error: must specify --server")
return subcommands.ExitUsageError
}
if err := c.run(ctx, flag.Arg(0)); err != nil {
log.Printf("error: %v", err)
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
}
func (c *queryCommand) run(ctx context.Context, ip string) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
if err != nil {
return err
}
resp, err := client.GetScore(ctx, ip)
if err != nil {
return err
}
fmt.Printf("%g\n", resp.Score)
return nil
}
func init() {
subcommands.Register(&queryCommand{}, "")
}
......@@ -17,7 +17,8 @@ import (
"github.com/coreos/go-systemd/v22/daemon"
"github.com/google/subcommands"
_ "golang.org/x/net/trace"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
......@@ -49,7 +50,7 @@ func (c *serverCommand) Usage() string {
}
func (c *serverCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.rpcAddr, "rpc-addr", ":7170", "`address` of GRPC listener")
f.StringVar(&c.rpcAddr, "grpc-addr", ":7170", "`address` of GRPC listener")
f.StringVar(&c.httpAddr, "http-addr", ":7180", "`address` of HTTP debug listener")
f.StringVar(&c.dbURI, "db", "leveldb:///var/lib/iprep/data", "database `uri` (sqlite:// or leveldb://)")
f.StringVar(&c.scriptPath, "scoring-script", "/etc/iprep/score.td", "`path` to a custom scoring script")
......@@ -124,34 +125,31 @@ func (c *serverCommand) run(ctx context.Context) error {
}
defer srv.Close()
var httpServer *http.Server
var grpcServer *grpc.Server
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
httpServer = &http.Server{
http.Handle("/metrics", promhttp.Handler())
server := &http.Server{
Addr: c.httpAddr,
ReadTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
}
log.Printf("starting HTTP server on %s", c.httpAddr)
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
return runHTTPServerWithContext(ctx, server)
})
g.Go(func() error {
l, err := net.Listen("tcp", c.rpcAddr)
if err != nil {
return err
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(
grpc_prometheus.UnaryServerInterceptor,
),
grpc.StreamInterceptor(
grpc_prometheus.StreamServerInterceptor,
),
}
log.Printf("starting GRPC server on %s", c.rpcAddr)
var opts []grpc.ServerOption
if c.tlsCert != "" && c.tlsKey != "" {
creds, err := credentials.NewServerTLSFromFile(c.tlsCert, c.tlsKey)
if err != nil {
......@@ -159,15 +157,20 @@ func (c *serverCommand) run(ctx context.Context) error {
}
opts = append(opts, grpc.Creds(creds))
}
grpcServer = grpc.NewServer(opts...)
ippb.RegisterIpRepServer(grpcServer, srv)
return grpcServer.Serve(l)
server := grpc.NewServer(opts...)
ippb.RegisterIpRepServer(server, srv)
grpc_prometheus.Register(server)
grpc.EnableTracing = true
log.Printf("starting GRPC server on %s", c.rpcAddr)
return runGRPCServerWithContext(ctx, server, c.rpcAddr)
})
// Reload the scoring script on SIGHUP.
reloadSigCh := make(chan os.Signal, 1)
go func() {
for range reloadSigCh {
log.Printf("received SIGHUP, reloading configuration")
srv.Reload()
}
}()
......@@ -175,6 +178,11 @@ func (c *serverCommand) run(ctx context.Context) error {
// Terminate the program on SIGINT/SIGTERM.
stopSigCh := make(chan os.Signal, 1)
go func() {
<-stopSigCh
log.Printf("received termination signal, exiting")
cancel()
}()
signal.Notify(stopSigCh, syscall.SIGINT, syscall.SIGTERM)
// At this point we got nothing else to do but wait for the
......@@ -182,26 +190,39 @@ func (c *serverCommand) run(ctx context.Context) error {
// ready.
daemon.SdNotify(false, daemon.SdNotifyReady) // nolint
select {
case signo := <-stopSigCh:
log.Printf("terminating due to signal %d", signo)
case <-ctx.Done():
return g.Wait()
}
func runGRPCServerWithContext(ctx context.Context, server *grpc.Server, addr string) error {
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
defer l.Close()
// We got here because of an error (or a termination request),
// so orderly shut down all the serving components.
cancel()
go func() {
<-ctx.Done()
server.GracefulStop()
}()
if httpServer != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
httpServer.Shutdown(shutdownCtx) // nolint
}
if grpcServer != nil {
grpcServer.GracefulStop()
}
return server.Serve(l)
}
return g.Wait()
func runHTTPServerWithContext(ctx context.Context, server *http.Server) error {
go func() {
<-ctx.Done()
sctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(sctx); err != nil {
server.Close() // nolint: errcheck
}
}()
err := server.ListenAndServe()
if err == http.ErrServerClosed {
return nil
}
return err
}
func init() {
......
package main
import (
"bufio"
"context"
"flag"
"log"
"os"
"strconv"
"strings"
ipclient "git.autistici.org/ai3/tools/iprep/client"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"github.com/google/subcommands"
)
type submitCommand struct {
serverAddr string
}
func (c *submitCommand) Name() string { return "submit" }
func (c *submitCommand) Synopsis() string { return "submit events" }
func (c *submitCommand) Usage() string {
return `submit [<flags>]:
Submit events from standard input. The format is very simple
and consists of one line per event, with IP address, event
type, and an optional count (default is 1 if omitted) separated
by spaces.
`
}
func (c *submitCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
}
func (c *submitCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
if f.NArg() > 0 {
log.Printf("error: too many arguments")
return subcommands.ExitUsageError
}
if c.serverAddr == "" {
log.Printf("error: must specify --server")
return subcommands.ExitUsageError
}
if err := c.run(ctx); err != nil {
log.Printf("error: %v", err)
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
}
func (c *submitCommand) run(ctx context.Context) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
if err != nil {
return err
}
var events []*ippb.Event
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
line := strings.Fields(scanner.Text())
if len(line) < 2 || len(line) > 3 {
log.Printf("syntax error")
continue
}
event := &ippb.Event{
Ip: line[0],
Type: line[1],
Count: 1,
}
if len(line) == 3 {
var err error
event.Count, err = strconv.ParseInt(line[2], 10, 64)
if err != nil {
log.Printf("syntax error")
continue
}
}
events = append(events, event)
}
return client.Submit(ctx, events, nil)
}
func init() {
subcommands.Register(&submitCommand{}, "")
}
......@@ -62,7 +62,7 @@ func parsePatterns(r io.Reader, filename string) ([]pattern, error) {
}
func (c *tailCommand) Name() string { return "tail" }
func (c *tailCommand) Synopsis() string { return "run the rpc tail" }
func (c *tailCommand) Synopsis() string { return "generate events from logs" }
func (c *tailCommand) Usage() string {
return `tail [<flags>]:
......
package main
import (
"strings"
"testing"
)
const testPatterns = `
### SSH authentication failures
# Silly brute-forcers that do not support our kex:
/^sshd\[\d+\]: Unable to negotiate with ([.0-9]+) port \d+: no matching host key type found./ ssh
### Email-related rules
# Postscreen failures - protocol errors are (in high volume) characteristic of spammers
/^postfix-in\/postscreen\[\d+\]: NOQUEUE: reject: RCPT from \[([.0-9]+)\]:\d+: 550 5.5.1 Protocol error;/ spammer
# Spammers trying to send email via disabled accounts
/^postfix-smtp-auth\/smtpd\[\d+\]: NOQUEUE: reject: RCPT from \[[^[]+(.[0-9]+)\]: 553 5.7.1 <[^>]+>: Sender address rejected: not owned by user/ spammer
# Spammers triggering SPF failures
/^policyd-spf\[\d+\]: 550 5.7.23 Message rejected due to: SPF fail - not authorized. Please see http:\/\/www.openspf.net\/Why?s=mfrom;id=[^;]*;ip=([.0-9]+);/ spammer
### Authentication
# General auth-server errors
/^auth-server\[\d+\]: auth: user=.* service=smtp status=error ip=([.0-9]+) error=/ auth
### Wordpress-specific rules
/^nginx\[\d+\]: .*nginx_access: \S+ \S+ ([.:0-9]+) .*"POST /wp-login\.php HTTP/ wordpress
/^nginx\[\d+\]: .*nginx_access: \S+ \S+ ([.:0-9]+) .*"POST /wp-comments-post\.php HTTP/ wordpress
`
func TestParsePatterns(t *testing.T) {
patterns, err := parsePatterns(strings.NewReader(testPatterns), "test")
if err != nil {
t.Fatal(err)
}
if len(patterns) != 7 {
t.Fatalf("bad result: %+v", patterns)
}
}
......@@ -9,11 +9,16 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.5
github.com/google/subcommands v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mattn/go-sqlite3 v1.14.7
github.com/oschwald/maxminddb-golang v1.8.0
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.20.0 // indirect
github.com/syndtr/goleveldb v1.0.0
golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210415045647-66c3f260301c // indirect
google.golang.org/genproto v0.0.0-20210416161957-9910b6c460de // indirect
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
......
This diff is collapsed.
......@@ -104,7 +104,7 @@ func (q *submissionQueue) sendAggregate(ctx context.Context, aggr *ippb.Aggregat
}
func (q *submissionQueue) sendEvent(ctx context.Context, ev *ippb.Event) {
if err := q.client.Submit(ctx, ev, nil); err != nil {
if err := q.client.Submit(ctx, []*ippb.Event{ev}, nil); err != nil {
log.Printf("failed to submit event: %v", err)
}
}
......
Copyright (C) 2013 Blake Mizerany
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8
5
26
12
5
235
13
6
28
30
3
3
3
3
5
2
33
7
2
4
7
12
14
5
8
3
10
4
5
3
6
6
209
20
3
10
14
3
4
6
8
5
11
7
3
2
3
3
212
5
222
4
10
10
5
6
3
8
3
10
254
220
2
3
5
24
5
4