diff --git a/cmd/replds/pull.go b/cmd/replds/pull.go index 65294855e78c573483baa1358c9712dac3d85b1e..1fcf3f41d373ac1bed581264bd1664b6d87ea6fb 100644 --- a/cmd/replds/pull.go +++ b/cmd/replds/pull.go @@ -14,6 +14,7 @@ import ( "github.com/google/subcommands" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" ) type pullCommand struct { @@ -48,17 +49,19 @@ func (c *pullCommand) SetFlags(f *flag.FlagSet) { } func (c *pullCommand) grpcDialOptions() ([]grpc.DialOption, error) { - var opts []grpc.DialOption + var creds credentials.TransportCredentials if c.sslCert != "" && c.sslKey != "" && c.sslCA != "" { tlsconf, err := clientTLSConfig(c.sslCert, c.sslKey, c.sslCA) if err != nil { return nil, err } - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsconf))) + creds = credentials.NewTLS(tlsconf) } else { - opts = append(opts, grpc.WithInsecure()) + creds = insecure.NewCredentials() } - return opts, nil + return []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + }, nil } func (c *pullCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { diff --git a/cmd/replds/server.go b/cmd/replds/server.go index 7b0a5a813c5d50be2b10ad2d751724273f1c9fa0..50dd21fdf6bae442739545fdb29cc011e619e10f 100644 --- a/cmd/replds/server.go +++ b/cmd/replds/server.go @@ -25,6 +25,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" _ "net/http/pprof" ) @@ -71,17 +72,19 @@ func (c *serverCommand) SetFlags(f *flag.FlagSet) { } func (c *serverCommand) grpcDialOptions() ([]grpc.DialOption, error) { - var opts []grpc.DialOption + var creds credentials.TransportCredentials if c.clientSSLCert != "" && c.clientSSLKey != "" && c.sslCA != "" { tlsconf, err := clientTLSConfig(c.clientSSLCert, c.clientSSLKey, c.sslCA) if err != nil { return nil, err } - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsconf))) + creds = credentials.NewTLS(tlsconf) } else { - opts = append(opts, grpc.WithInsecure()) + creds = insecure.NewCredentials() } - return opts, nil + return []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + }, nil } func (c *serverCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { @@ -175,12 +178,13 @@ func (c *serverCommand) runServer(ctx context.Context, server *replds.Server) er http.Handle("/metrics", promhttp.Handler()) httpSrv := &http.Server{ - Addr: c.httpAddr, - TLSConfig: tlsconf, - Handler: nil, - ReadTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, - WriteTimeout: 10 * time.Second, + Addr: c.httpAddr, + TLSConfig: tlsconf, + Handler: nil, + ReadTimeout: 10 * time.Second, + ReadHeaderTimeout: 30 * time.Second, + IdleTimeout: 30 * time.Second, + WriteTimeout: 10 * time.Second, } return runHTTPServerWithContext(ictx, httpSrv) diff --git a/cmd/replds/store.go b/cmd/replds/store.go index ad4253da8a3d9fd1d10150c4535f2223cac731b2..9d8ac8035099c905414e1ef950dffe752b0b03ae 100644 --- a/cmd/replds/store.go +++ b/cmd/replds/store.go @@ -14,6 +14,7 @@ import ( "github.com/google/subcommands" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -47,17 +48,19 @@ func (c *storeCommand) SetFlags(f *flag.FlagSet) { } func (c *storeCommand) grpcDialOptions() ([]grpc.DialOption, error) { - var opts []grpc.DialOption + var creds credentials.TransportCredentials if c.sslCert != "" && c.sslKey != "" && c.sslCA != "" { tlsconf, err := clientTLSConfig(c.sslCert, c.sslKey, c.sslCA) if err != nil { return nil, err } - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsconf))) + creds = credentials.NewTLS(tlsconf) } else { - opts = append(opts, grpc.WithInsecure()) + creds = insecure.NewCredentials() } - return opts, nil + return []grpc.DialOption{ + grpc.WithTransportCredentials(creds), + }, nil } func (c *storeCommand) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus { diff --git a/store/memlog/log.go b/store/memlog/log.go index c40bd5e793624b8ef34f206382184bf6208681ec..1741ac08c8161f02c802ddb32f4d3de0a87c0ae1 100644 --- a/store/memlog/log.go +++ b/store/memlog/log.go @@ -2,6 +2,7 @@ package memlog import ( "encoding/binary" + "errors" "fmt" "io" "log" @@ -120,7 +121,7 @@ func processLog(path string, fn func(*pb.Node) error) (bool, error) { node, nodeDirty, newBuf, err := logRead(ff, buf) buf = newBuf - if err == io.EOF { + if errors.Is(err, io.EOF) { return dirty, nil } if err != nil { diff --git a/watcher/triggers.go b/watcher/triggers.go index a856d1e599e8666ae995bdf1083cd2a2cbcbc762..ae54875989e3fe6700b738dd24f0288b712c1aac 100644 --- a/watcher/triggers.go +++ b/watcher/triggers.go @@ -30,7 +30,9 @@ func (m scriptTriggerManager) Has(path string) bool { func (m scriptTriggerManager) Notify(b *common.NotifyBatch) { b.Apply(func(path string, nodes []*pb.Node) { trigger := m[path] - trigger.Run(nodes) + if err := trigger.Run(nodes); err != nil { + log.Printf("trigger error: %v", err) + } }) } diff --git a/watcher/watcher.go b/watcher/watcher.go index 2d7fd33ee37a6ad8b50629a3769202a3a09a2ad9..9cf5cba1fa4f88b85629eef680e216dda5c0a37e 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -2,6 +2,7 @@ package watcher import ( "context" + "errors" "io" "log" "time" @@ -71,7 +72,7 @@ func (w *Watcher) Run(ctx context.Context) { Summary: w.store.Summary(), } stream, err := stub.Watch(ctx, req) - if err == context.Canceled || status.Code(err) == codes.Canceled { + if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled { return } if err != nil { @@ -81,7 +82,7 @@ func (w *Watcher) Run(ctx context.Context) { for { resp, err := stream.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { break } if err != nil { @@ -92,8 +93,9 @@ func (w *Watcher) Run(ctx context.Context) { // Run triggers for each batch. tb := common.NewNotifyBatch(w.triggers) for _, node := range resp.Nodes { - w.store.AddNode(node) - tb.Add(node) + if ok, err := w.store.AddNode(node); err == nil && ok { + tb.Add(node) + } } w.triggers.Notify(tb) }