From 03f0909ca79626e6caa7cdb1a5bf0044d2a5a27c Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Fri, 10 Feb 2023 17:56:49 +0000 Subject: [PATCH] Add instrumentation to Follow() And improve robustness when Subscribe() disconnects. --- datastore/crud/httptransport/retry.go | 18 ---------- datastore/crudlog/follow.go | 47 ++++++++++++++++++++++++++- datastore/crudlog/remote.go | 19 +++++++++-- 3 files changed, 63 insertions(+), 21 deletions(-) delete mode 100644 datastore/crud/httptransport/retry.go diff --git a/datastore/crud/httptransport/retry.go b/datastore/crud/httptransport/retry.go deleted file mode 100644 index 19cc5e2..0000000 --- a/datastore/crud/httptransport/retry.go +++ /dev/null @@ -1,18 +0,0 @@ -package httptransport - -import ( - "time" - - "github.com/cenkalti/backoff/v4" -) - -var RetryPolicy = newPermanentRetryBackOff() - -func newPermanentRetryBackOff() backoff.BackOff { - exp := backoff.NewExponentialBackOff() - exp.InitialInterval = 200 * time.Millisecond - exp.RandomizationFactor = 0.2 - exp.MaxInterval = 60 * time.Second - exp.MaxElapsedTime = 0 - return exp -} diff --git a/datastore/crudlog/follow.go b/datastore/crudlog/follow.go index f4d86b5..9874cd5 100644 --- a/datastore/crudlog/follow.go +++ b/datastore/crudlog/follow.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "log" + + "github.com/prometheus/client_golang/prometheus" ) -func Follow(ctx context.Context, src LogSource, dst LogSink) error { +func doFollow(ctx context.Context, src LogSource, dst LogSink) error { + replState.Set(0) start := dst.LatestSequence() // Start a subscription with the snapshot as a reference. @@ -19,6 +22,7 @@ retry: if errors.Is(err, ErrHorizon) && restartFromSnapshot { // Can't recover from 'start', grab a snapshot. log.Printf("index %s is past the remote horizon, grabbing snapshot", start) + snapshotCounter.Inc() snap, serr := src.Snapshot(ctx) if serr != nil { return fmt.Errorf("Snapshot() error: %w", serr) @@ -39,12 +43,14 @@ retry: defer sub.Close() ch := sub.Notify() + replState.Set(1) for { select { case op := <-ch: if op == nil { return nil } + latestSequence.Set(float64(op.Seq())) if err := dst.Apply(op, true); err != nil { return fmt.Errorf("sequence %s: %w", op.Seq(), err) } @@ -53,3 +59,42 @@ retry: } } } + +func Follow(ctx context.Context, src LogSource, dst LogSink) error { + // Outer loop around doFollow that catches transport errors on + // Subscribe() and restarts the process from where it left + // off. Transport errors result in doFollow() returning nil, + // any other error is considered permanent. + for { + err := doFollow(ctx, src, dst) + if err != nil { + return err + } + } +} + +var ( + replState = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "async_repl_up", + Help: "Status of the asynchronous replication process.", + }) + snapshotCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "async_repl_snapshots_total", + Help: "Total number of Snapshot() calls.", + }) + latestSequence = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "async_repl_sequence", + Help: "Last sequence number seen.", + }) +) + +func init() { + prometheus.MustRegister( + replState, + snapshotCounter, + latestSequence, + ) +} diff --git a/datastore/crudlog/remote.go b/datastore/crudlog/remote.go index 47096ba..f88a541 100644 --- a/datastore/crudlog/remote.go +++ b/datastore/crudlog/remote.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "net/url" + "time" "git.autistici.org/ai3/attic/wig/datastore/crud/httpapi" "git.autistici.org/ai3/attic/wig/datastore/crud/httptransport" @@ -46,7 +47,7 @@ func (r *remotePubsubClient) Snapshot(ctx context.Context) (Snapshot, error) { err = maybeTempError(err) return }, - backoff.WithContext(httptransport.RetryPolicy, ctx), + backoff.WithContext(retryPolicy, ctx), ) return snap, err } @@ -65,7 +66,7 @@ func (r *remotePubsubClient) Subscribe(ctx context.Context, start Sequence) (Sub err = maybeTempError(err) return }, - backoff.WithContext(httptransport.RetryPolicy, ctx), + backoff.WithContext(retryPolicy, ctx), ) return sub, err } @@ -261,3 +262,17 @@ func maybeTempError(err error) error { } return err } + +// The log client uses a custom backoff policy that will back off +// exponentially up to a relatively short interval, and will just keep +// retrying forever (until the context is canceled). +var retryPolicy = newPermanentRetryBackOff() + +func newPermanentRetryBackOff() backoff.BackOff { + exp := backoff.NewExponentialBackOff() + exp.InitialInterval = 200 * time.Millisecond + exp.RandomizationFactor = 0.2 + exp.MaxInterval = 60 * time.Second + exp.MaxElapsedTime = 0 + return exp +} -- GitLab