diff --git a/datastore/crud/httptransport/retry.go b/datastore/crud/httptransport/retry.go
deleted file mode 100644
index 19cc5e272a0cf6a28bf160068ccb2630bbfe5108..0000000000000000000000000000000000000000
--- a/datastore/crud/httptransport/retry.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package httptransport
-
-import (
-	"time"
-
-	"github.com/cenkalti/backoff/v4"
-)
-
-var RetryPolicy = newPermanentRetryBackOff()
-
-func newPermanentRetryBackOff() backoff.BackOff {
-	exp := backoff.NewExponentialBackOff()
-	exp.InitialInterval = 200 * time.Millisecond
-	exp.RandomizationFactor = 0.2
-	exp.MaxInterval = 60 * time.Second
-	exp.MaxElapsedTime = 0
-	return exp
-}
diff --git a/datastore/crudlog/follow.go b/datastore/crudlog/follow.go
index f4d86b540865fa9f723ee9f5f9d7c50500b09ee4..9874cd5505d8d67cb1b90f1432066ebd2739b8b2 100644
--- a/datastore/crudlog/follow.go
+++ b/datastore/crudlog/follow.go
@@ -5,9 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"log"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
-func Follow(ctx context.Context, src LogSource, dst LogSink) error {
+func doFollow(ctx context.Context, src LogSource, dst LogSink) error {
+	replState.Set(0)
 	start := dst.LatestSequence()
 
 	// Start a subscription with the snapshot as a reference.
@@ -19,6 +22,7 @@ retry:
 	if errors.Is(err, ErrHorizon) && restartFromSnapshot {
 		// Can't recover from 'start', grab a snapshot.
 		log.Printf("index %s is past the remote horizon, grabbing snapshot", start)
+		snapshotCounter.Inc()
 		snap, serr := src.Snapshot(ctx)
 		if serr != nil {
 			return fmt.Errorf("Snapshot() error: %w", serr)
@@ -39,12 +43,14 @@ retry:
 	defer sub.Close()
 	ch := sub.Notify()
 
+	replState.Set(1)
 	for {
 		select {
 		case op := <-ch:
 			if op == nil {
 				return nil
 			}
+			latestSequence.Set(float64(op.Seq()))
 			if err := dst.Apply(op, true); err != nil {
 				return fmt.Errorf("sequence %s: %w", op.Seq(), err)
 			}
@@ -53,3 +59,42 @@ retry:
 		}
 	}
 }
+
+func Follow(ctx context.Context, src LogSource, dst LogSink) error {
+	// Outer loop around doFollow that catches transport errors on
+	// Subscribe() and restarts the process from where it left
+	// off. Transport errors result in doFollow() returning nil,
+	// any other error is considered permanent.
+	for {
+		err := doFollow(ctx, src, dst)
+		if err != nil {
+			return err
+		}
+	}
+}
+
+var (
+	replState = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "async_repl_up",
+			Help: "Status of the asynchronous replication process.",
+		})
+	snapshotCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "async_repl_snapshots_total",
+			Help: "Total number of Snapshot() calls.",
+		})
+	latestSequence = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "async_repl_sequence",
+			Help: "Last sequence number seen.",
+		})
+)
+
+func init() {
+	prometheus.MustRegister(
+		replState,
+		snapshotCounter,
+		latestSequence,
+	)
+}
diff --git a/datastore/crudlog/remote.go b/datastore/crudlog/remote.go
index 47096baf67a07e5448f04c4233d058ae315346c2..f88a54114350d24ae24b5ede06d95cab10170ca3 100644
--- a/datastore/crudlog/remote.go
+++ b/datastore/crudlog/remote.go
@@ -9,6 +9,7 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"time"
 
 	"git.autistici.org/ai3/attic/wig/datastore/crud/httpapi"
 	"git.autistici.org/ai3/attic/wig/datastore/crud/httptransport"
@@ -46,7 +47,7 @@ func (r *remotePubsubClient) Snapshot(ctx context.Context) (Snapshot, error) {
 			err = maybeTempError(err)
 			return
 		},
-		backoff.WithContext(httptransport.RetryPolicy, ctx),
+		backoff.WithContext(retryPolicy, ctx),
 	)
 	return snap, err
 }
@@ -65,7 +66,7 @@ func (r *remotePubsubClient) Subscribe(ctx context.Context, start Sequence) (Sub
 			err = maybeTempError(err)
 			return
 		},
-		backoff.WithContext(httptransport.RetryPolicy, ctx),
+		backoff.WithContext(retryPolicy, ctx),
 	)
 	return sub, err
 }
@@ -261,3 +262,17 @@ func maybeTempError(err error) error {
 	}
 	return err
 }
+
+// The log client uses a custom backoff policy that will back off
+// exponentially up to a relatively short interval, and will just keep
+// retrying forever (until the context is canceled).
+var retryPolicy = newPermanentRetryBackOff()
+
+func newPermanentRetryBackOff() backoff.BackOff {
+	exp := backoff.NewExponentialBackOff()
+	exp.InitialInterval = 200 * time.Millisecond
+	exp.RandomizationFactor = 0.2
+	exp.MaxInterval = 60 * time.Second
+	exp.MaxElapsedTime = 0
+	return exp
+}
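
A note on the new Follow()/doFollow() split, with a sketch of the intended call site (the wiring around it is assumed here, not shown in the patch): Follow() only returns on a permanent error or context cancellation, because transport-level failures make doFollow() return nil and the outer loop resubscribes from dst.LatestSequence().

package replication

import (
	"context"
	"log"

	"git.autistici.org/ai3/attic/wig/datastore/crudlog"
)

// runReplication is a hypothetical caller: src would be the remote
// log client, dst the local database sink, both obtained elsewhere.
func runReplication(ctx context.Context, src crudlog.LogSource, dst crudlog.LogSink) {
	// Blocks until a permanent error or ctx cancellation; transient
	// transport errors are absorbed by the restart loop in Follow.
	if err := crudlog.Follow(ctx, src, dst); err != nil {
		log.Printf("replication terminated: %v", err)
	}
}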
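
The three collectors are registered with the default Prometheus registry in init(), so a binary that imports crudlog only needs the stock promhttp handler to expose async_repl_up, async_repl_snapshots_total and async_repl_sequence. A minimal sketch (the listen address is an arbitrary choice for the example):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Blank import runs crudlog's init(), registering the
	// async_repl_* collectors with the default registry.
	_ "git.autistici.org/ai3/attic/wig/datastore/crudlog"
)

func main() {
	// promhttp.Handler() serves the default registry, which already
	// contains the crudlog collectors after package init.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}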
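
The package-private retryPolicy keeps the exact settings of the deleted httptransport.RetryPolicy. Because MaxElapsedTime is 0, the exponential backoff never gives up on its own; the context passed through backoff.WithContext() is the only stop condition for transient errors. A self-contained sketch of that behavior (the 3-second timeout is arbitrary, chosen just to end the demo):

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Same settings as newPermanentRetryBackOff() in the patch:
	// intervals grow from ~200ms to a 60s ceiling, with ±20% jitter.
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = 200 * time.Millisecond
	exp.RandomizationFactor = 0.2
	exp.MaxInterval = 60 * time.Second
	exp.MaxElapsedTime = 0 // never stop on elapsed time alone

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		return errors.New("transient failure") // always fails in this demo
	}, backoff.WithContext(exp, ctx))

	// Retry gives up only once the context is done, and then
	// returns the context's error.
	log.Printf("stopped after %d attempts: %v", attempts, err)
}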
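
Which errors count as transient is decided by maybeTempError(), whose body is mostly outside this hunk. The convention it feeds into backoff.Retry() is sketched below with classifyError, a hypothetical stand-in rather than the actual implementation:

package replication

import (
	"errors"
	"net"

	"github.com/cenkalti/backoff/v4"
)

// classifyError is a hypothetical stand-in for maybeTempError: errors
// returned unchanged are retried by backoff.Retry, while errors
// wrapped with backoff.Permanent() make Retry stop immediately and
// return the underlying error.
func classifyError(err error) error {
	var netErr net.Error
	if errors.As(err, &netErr) {
		return err // network-level trouble: keep retrying
	}
	return backoff.Permanent(err) // anything else is treated as fatal
}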