Skip to content
Snippets Groups Projects
Commit 157d78d3 authored by ale's avatar ale
Browse files

Refactor the client to take a grpc.ClientConn

No need to duplicate functionality and have weird subset of
DialOptions available as part of the iprep client API.
parent b7834478
No related branches found
No related tags found
No related merge requests found
......@@ -7,58 +7,28 @@ import (
ippb "git.autistici.org/ai3/tools/iprep/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type Options struct {
CAFile string `yaml:"cert"`
ServerName string `yaml:"server_name"`
Insecure bool `yaml:"insecure"`
}
type Client interface {
Submit(context.Context, []*ippb.Event, *ippb.Aggregate) error
GetScore(context.Context, string) (*ippb.GetScoreResponse, error)
GetAllScores(context.Context, float32) (<-chan *ippb.GetScoreResponse, error)
Close()
}
func New(addr string, opts *Options) (Client, error) {
return newClient(addr, opts)
func New(conn *grpc.ClientConn) (Client, error) {
return newClient(conn)
}
type rpcClient struct {
conn *grpc.ClientConn
stub ippb.IpRepClient
}
func newClient(addr string, opts *Options) (*rpcClient, error) {
var grpcOpts []grpc.DialOption
if opts.CAFile != "" {
creds, err := credentials.NewClientTLSFromFile(opts.CAFile, opts.ServerName)
if err != nil {
return nil, err
}
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(creds))
}
if opts.Insecure {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
conn, err := grpc.Dial(addr, grpcOpts...)
if err != nil {
return nil, err
}
func newClient(conn *grpc.ClientConn) (*rpcClient, error) {
return &rpcClient{
conn: conn,
stub: ippb.NewIpRepClient(conn),
}, nil
}
func (c *rpcClient) Close() {
c.conn.Close()
}
func (c *rpcClient) Submit(ctx context.Context, events []*ippb.Event, aggr *ippb.Aggregate) error {
var req ippb.SubmitRequest
req.Events = events
......
package main
import (
"flag"
"fmt"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type clientBase struct {
serverAddr string
sslCA string
}
func (c *clientBase) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (host:port) of the iprep server")
f.StringVar(&c.sslCA, "ssl-ca", "", "`path` to a SSL CA certificate, enables SSL")
}
func (c *clientBase) ClientOpts() ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if c.sslCA != "" {
servername, _, err := net.SplitHostPort(c.serverAddr)
if err != nil {
return nil, fmt.Errorf("could not parse server addr: %w", err)
}
creds, err := credentials.NewClientTLSFromFile(c.sslCA, servername)
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
return opts, nil
}
......@@ -8,10 +8,11 @@ import (
ipclient "git.autistici.org/ai3/tools/iprep/client"
"github.com/google/subcommands"
"google.golang.org/grpc"
)
type dumpCommand struct {
serverAddr string
clientBase
}
func (c *dumpCommand) Name() string { return "dump" }
......@@ -24,7 +25,7 @@ func (c *dumpCommand) Usage() string {
}
func (c *dumpCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
c.clientBase.SetFlags(f)
}
func (c *dumpCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
......@@ -46,7 +47,17 @@ func (c *dumpCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...inte
}
func (c *dumpCommand) run(ctx context.Context) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
opts, err := c.clientBase.ClientOpts()
if err != nil {
return err
}
conn, err := grpc.Dial(c.serverAddr, opts...)
if err != nil {
return err
}
defer conn.Close()
client, err := ipclient.New(conn)
if err != nil {
return err
}
......
......@@ -8,10 +8,11 @@ import (
ipclient "git.autistici.org/ai3/tools/iprep/client"
"github.com/google/subcommands"
"google.golang.org/grpc"
)
type queryCommand struct {
serverAddr string
clientBase
}
func (c *queryCommand) Name() string { return "query" }
......@@ -25,7 +26,7 @@ func (c *queryCommand) Usage() string {
}
func (c *queryCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
c.clientBase.SetFlags(f)
}
func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
......@@ -47,7 +48,17 @@ func (c *queryCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...int
}
func (c *queryCommand) run(ctx context.Context, ip string) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
opts, err := c.clientBase.ClientOpts()
if err != nil {
return err
}
conn, err := grpc.Dial(c.serverAddr, opts...)
if err != nil {
return err
}
defer conn.Close()
client, err := ipclient.New(conn)
if err != nil {
return err
}
......
......@@ -12,10 +12,11 @@ import (
ipclient "git.autistici.org/ai3/tools/iprep/client"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"github.com/google/subcommands"
"google.golang.org/grpc"
)
type submitCommand struct {
serverAddr string
clientBase
}
func (c *submitCommand) Name() string { return "submit" }
......@@ -32,7 +33,7 @@ func (c *submitCommand) Usage() string {
}
func (c *submitCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
c.clientBase.SetFlags(f)
}
func (c *submitCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
......@@ -54,7 +55,17 @@ func (c *submitCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...in
}
func (c *submitCommand) run(ctx context.Context) error {
client, err := ipclient.New(c.serverAddr, &ipclient.Options{Insecure: true})
opts, err := c.clientBase.ClientOpts()
if err != nil {
return err
}
conn, err := grpc.Dial(c.serverAddr, opts...)
if err != nil {
return err
}
defer conn.Close()
client, err := ipclient.New(conn)
if err != nil {
return err
}
......
......@@ -10,17 +10,21 @@ import (
"os"
"regexp"
"strings"
"time"
"git.autistici.org/ai3/tools/iprep/client"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"git.autistici.org/ai3/tools/iprep/submission"
"github.com/google/subcommands"
"google.golang.org/grpc"
)
type tailCommand struct {
clientBase
testOnly bool
serverAddr string
patternFile string
maxDelay time.Duration
maxStored int
}
type pattern struct {
......@@ -88,9 +92,12 @@ func (c *tailCommand) Usage() string {
}
func (c *tailCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&c.serverAddr, "server", "", "`address` (https://host:port) of the iprep server")
c.clientBase.SetFlags(f)
f.StringVar(&c.patternFile, "patterns", "", "`file` with patterns to load")
f.BoolVar(&c.testOnly, "test", false, "only print matches, do not submit them")
f.DurationVar(&c.maxDelay, "max-delay", 30*time.Second, "max queue delay")
f.IntVar(&c.maxStored, "max-size", 1000, "max queue size")
}
func (c *tailCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
......@@ -126,12 +133,21 @@ func (c *tailCommand) run(ctx context.Context) error {
return err
}
opts, err := c.clientBase.ClientOpts()
if err != nil {
return err
}
conn, err := grpc.Dial(c.serverAddr, opts...)
if err != nil {
return err
}
defer conn.Close()
var sub submission.Submitter
if !c.testOnly {
sub, err = submission.New(c.serverAddr, &submission.Options{
ClientOptions: &client.Options{
Insecure: true,
},
sub, err = submission.New(conn, &submission.Options{
MaxDelay: c.maxDelay,
MaxStored: c.maxStored,
})
if err != nil {
return err
......
......@@ -7,6 +7,7 @@ import (
"git.autistici.org/ai3/tools/iprep/client"
ippb "git.autistici.org/ai3/tools/iprep/proto"
"google.golang.org/grpc"
)
var (
......@@ -21,8 +22,6 @@ type Options struct {
MaxDelay time.Duration
MaxStored int
ChanBufferSize int
ClientOptions *client.Options
}
func (o *Options) withDefaults() *Options {
......@@ -75,9 +74,9 @@ func newSubmitter(client client.Client, opts *Options) Submitter {
}
// New creates a new Submitter pointing at the specified collector addr.
func New(addr string, opts *Options) (Submitter, error) {
func New(conn *grpc.ClientConn, opts *Options) (Submitter, error) {
opts = opts.withDefaults()
c, err := client.New(addr, opts.ClientOptions)
c, err := client.New(conn)
if err != nil {
return nil, err
}
......
......@@ -16,8 +16,6 @@ type fakeClient struct {
count int64
}
func (f *fakeClient) Close() {}
func (f *fakeClient) Submit(_ context.Context, evs []*ippb.Event, aggr *ippb.Aggregate) error {
f.mx.Lock()
defer f.mx.Unlock()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment