diff --git a/client/conn.go b/client/conn.go index de16c0ffa20bfe4f66b72fe6cfe104c06443af05..d90fe457903985ab1af173678095e66d61924851 100644 --- a/client/conn.go +++ b/client/conn.go @@ -7,58 +7,28 @@ import ( ippb "git.autistici.org/ai3/tools/iprep/proto" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" ) -type Options struct { - CAFile string `yaml:"cert"` - ServerName string `yaml:"server_name"` - Insecure bool `yaml:"insecure"` -} - type Client interface { Submit(context.Context, []*ippb.Event, *ippb.Aggregate) error GetScore(context.Context, string) (*ippb.GetScoreResponse, error) GetAllScores(context.Context, float32) (<-chan *ippb.GetScoreResponse, error) - Close() } -func New(addr string, opts *Options) (Client, error) { - return newClient(addr, opts) +func New(conn *grpc.ClientConn) (Client, error) { + return newClient(conn) } type rpcClient struct { - conn *grpc.ClientConn stub ippb.IpRepClient } -func newClient(addr string, opts *Options) (*rpcClient, error) { - var grpcOpts []grpc.DialOption - if opts.CAFile != "" { - creds, err := credentials.NewClientTLSFromFile(opts.CAFile, opts.ServerName) - if err != nil { - return nil, err - } - grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(creds)) - } - if opts.Insecure { - grpcOpts = append(grpcOpts, grpc.WithInsecure()) - } - - conn, err := grpc.Dial(addr, grpcOpts...) - if err != nil { - return nil, err - } +func newClient(conn *grpc.ClientConn) (*rpcClient, error) { return &rpcClient{ - conn: conn, stub: ippb.NewIpRepClient(conn), }, nil } -func (c *rpcClient) Close() { - c.conn.Close() -} - func (c *rpcClient) Submit(ctx context.Context, events []*ippb.Event, aggr *ippb.Aggregate) error { var req ippb.SubmitRequest req.Events = events diff --git a/cmd/iprep/client.go b/cmd/iprep/client.go new file mode 100644 index 0000000000000000000000000000000000000000..8a847db1ba711afabdf76ddae409d2274b5bc3dd --- /dev/null +++ b/cmd/iprep/client.go @@ -0,0 +1,40 @@ +package main + +import ( + "flag" + "fmt" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +type clientBase struct { + serverAddr string + sslCA string +} + +func (c *clientBase) SetFlags(f *flag.FlagSet) { + f.StringVar(&c.serverAddr, "server", "", "`address` (host:port) of the iprep server") + f.StringVar(&c.sslCA, "ssl-ca", "", "`path` to a SSL CA certificate, enables SSL") +} + +func (c *clientBase) ClientOpts() ([]grpc.DialOption, error) { + var opts []grpc.DialOption + + if c.sslCA != "" { + servername, _, err := net.SplitHostPort(c.serverAddr) + if err != nil { + return nil, fmt.Errorf("could not parse server addr: %w", err) + } + creds, err := credentials.NewClientTLSFromFile(c.sslCA, servername) + if err != nil { + return nil, err + } + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + return opts, nil +} diff --git a/cmd/iprep/dump.go b/cmd/iprep/dump.go index 4eb9bc82ac1f7db464ddb5fb1965a81932bfd2a9..965be870d6f0f32791da144ea432f60df59adbbb 100644 --- a/cmd/iprep/dump.go +++ b/cmd/iprep/dump.go @@ -8,10 +8,11 @@ import ( ipclient "git.autistici.org/ai3/tools/iprep/client" "github.com/google/subcommands" + "google.golang.org/grpc" ) type dumpCommand struct { - serverAddr string + clientBase } func (c *dumpCommand) Name() string { return "dump" } @@ -24,7 +25,7 @@ func (c *dumpCommand) Usage() string { } func (c *dumpCommand) SetFlags(f *flag.FlagSet) { - f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server") + c.clientBase.SetFlags(f) } func (c *dumpCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { @@ -46,7 +47,17 @@ func (c *dumpCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...inte } func (c *dumpCommand) run(ctx context.Context) error { - client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true}) + opts, err := c.clientBase.ClientOpts() + if err != nil { + return err + } + conn, err := grpc.Dial(c.serverAddr, opts...) + if err != nil { + return err + } + defer conn.Close() + + client, err := ipclient.New(conn) if err != nil { return err } diff --git a/cmd/iprep/query.go b/cmd/iprep/query.go index b47fc70802472ab812f3253224aaf8c07786a158..41bef59a3b5490d038ebdb27be6ee355bc07c433 100644 --- a/cmd/iprep/query.go +++ b/cmd/iprep/query.go @@ -8,10 +8,11 @@ import ( ipclient "git.autistici.org/ai3/tools/iprep/client" "github.com/google/subcommands" + "google.golang.org/grpc" ) type queryCommand struct { - serverAddr string + clientBase } func (c *queryCommand) Name() string { return "query" } @@ -25,7 +26,7 @@ func (c *queryCommand) Usage() string { } func (c *queryCommand) SetFlags(f *flag.FlagSet) { - f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server") + c.clientBase.SetFlags(f) } func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { @@ -47,7 +48,17 @@ func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...int } func (c *queryCommand) run(ctx context.Context, ip string) error { - client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true}) + opts, err := c.clientBase.ClientOpts() + if err != nil { + return err + } + conn, err := grpc.Dial(c.serverAddr, opts...) + if err != nil { + return err + } + defer conn.Close() + + client, err := ipclient.New(conn) if err != nil { return err } diff --git a/cmd/iprep/submit.go b/cmd/iprep/submit.go index cfc6bfd82bea5d6094da10569393adbda904d4fd..5a94809ae6583f9dc351617baded6f3941303d07 100644 --- a/cmd/iprep/submit.go +++ b/cmd/iprep/submit.go @@ -12,10 +12,11 @@ import ( ipclient "git.autistici.org/ai3/tools/iprep/client" ippb "git.autistici.org/ai3/tools/iprep/proto" "github.com/google/subcommands" + "google.golang.org/grpc" ) type submitCommand struct { - serverAddr string + clientBase } func (c *submitCommand) Name() string { return "submit" } @@ -32,7 +33,7 @@ func (c *submitCommand) Usage() string { } func (c *submitCommand) SetFlags(f *flag.FlagSet) { - f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server") + c.clientBase.SetFlags(f) } func (c *submitCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { @@ -54,7 +55,17 @@ func (c *submitCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...in } func (c *submitCommand) run(ctx context.Context) error { - client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true}) + opts, err := c.clientBase.ClientOpts() + if err != nil { + return err + } + conn, err := grpc.Dial(c.serverAddr, opts...) + if err != nil { + return err + } + defer conn.Close() + + client, err := ipclient.New(conn) if err != nil { return err } diff --git a/cmd/iprep/tail.go b/cmd/iprep/tail.go index 77be3896d78316b4494f477f9a549c45be37b07f..171f691aaf322206ee5599a43251684b9ae15989 100644 --- a/cmd/iprep/tail.go +++ b/cmd/iprep/tail.go @@ -10,17 +10,21 @@ import ( "os" "regexp" "strings" + "time" - "git.autistici.org/ai3/tools/iprep/client" ippb "git.autistici.org/ai3/tools/iprep/proto" "git.autistici.org/ai3/tools/iprep/submission" "github.com/google/subcommands" + "google.golang.org/grpc" ) type tailCommand struct { + clientBase + testOnly bool - serverAddr string patternFile string + maxDelay time.Duration + maxStored int } type pattern struct { @@ -88,9 +92,12 @@ func (c *tailCommand) Usage() string { } func (c *tailCommand) SetFlags(f *flag.FlagSet) { - f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server") + c.clientBase.SetFlags(f) + f.StringVar(&c.patternFile, "patterns", "", "`file` with patterns to load") f.BoolVar(&c.testOnly, "test", false, "only print matches, do not submit them") + f.DurationVar(&c.maxDelay, "max-delay", 30*time.Second, "max queue delay") + f.IntVar(&c.maxStored, "max-size", 1000, "max queue size") } func (c *tailCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { @@ -126,12 +133,21 @@ func (c *tailCommand) run(ctx context.Context) error { return err } + opts, err := c.clientBase.ClientOpts() + if err != nil { + return err + } + conn, err := grpc.Dial(c.serverAddr, opts...) + if err != nil { + return err + } + defer conn.Close() + var sub submission.Submitter if !c.testOnly { - sub, err = submission.New(c.serverAddr, &submission.Options{ - ClientOptions: &client.Options{ - Insecure: true, - }, + sub, err = submission.New(conn, &submission.Options{ + MaxDelay: c.maxDelay, + MaxStored: c.maxStored, }) if err != nil { return err diff --git a/submission/queue.go b/submission/queue.go index 75bc5de621d7c369778817b3184226db81f8a16d..1b6385ae6428586361b7f6b781f4ee5d8c6ac4a8 100644 --- a/submission/queue.go +++ b/submission/queue.go @@ -7,6 +7,7 @@ import ( "git.autistici.org/ai3/tools/iprep/client" ippb "git.autistici.org/ai3/tools/iprep/proto" + "google.golang.org/grpc" ) var ( @@ -21,8 +22,6 @@ type Options struct { MaxDelay time.Duration MaxStored int ChanBufferSize int - - ClientOptions *client.Options } func (o *Options) withDefaults() *Options { @@ -75,9 +74,9 @@ func newSubmitter(client client.Client, opts *Options) Submitter { } // New creates a new Submitter pointing at the specified collector addr. -func New(addr string, opts *Options) (Submitter, error) { +func New(conn *grpc.ClientConn, opts *Options) (Submitter, error) { opts = opts.withDefaults() - c, err := client.New(addr, opts.ClientOptions) + c, err := client.New(conn) if err != nil { return nil, err } diff --git a/submission/queue_test.go b/submission/queue_test.go index f0fc3061971868e5432aff6772fa78e2cede1f5a..aabf120c18c561f7c365df4cd9743fae27481cd7 100644 --- a/submission/queue_test.go +++ b/submission/queue_test.go @@ -16,8 +16,6 @@ type fakeClient struct { count int64 } -func (f *fakeClient) Close() {} - func (f *fakeClient) Submit(_ context.Context, evs []*ippb.Event, aggr *ippb.Aggregate) error { f.mx.Lock() defer f.mx.Unlock()