Commit 03f0909c authored by ale

Add instrumentation to Follow()

And improve robustness when Subscribe() disconnects.
parent 738f0731
package httptransport

import (
	"time"

	"github.com/cenkalti/backoff/v4"
)

var RetryPolicy = newPermanentRetryBackOff()

func newPermanentRetryBackOff() backoff.BackOff {
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = 200 * time.Millisecond
	exp.RandomizationFactor = 0.2
	exp.MaxInterval = 60 * time.Second
	exp.MaxElapsedTime = 0
	return exp
}
@@ -5,9 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"log"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
-func Follow(ctx context.Context, src LogSource, dst LogSink) error {
+func doFollow(ctx context.Context, src LogSource, dst LogSink) error {
+	replState.Set(0)
 	start := dst.LatestSequence()
 
 	// Start a subscription with the snapshot as a reference.
@@ -19,6 +22,7 @@ retry:
 	if errors.Is(err, ErrHorizon) && restartFromSnapshot {
 		// Can't recover from 'start', grab a snapshot.
 		log.Printf("index %s is past the remote horizon, grabbing snapshot", start)
+		snapshotCounter.Inc()
 		snap, serr := src.Snapshot(ctx)
 		if serr != nil {
 			return fmt.Errorf("Snapshot() error: %w", serr)
@@ -39,12 +43,14 @@ retry:
 	defer sub.Close()
 
 	ch := sub.Notify()
+	replState.Set(1)
 	for {
 		select {
 		case op := <-ch:
 			if op == nil {
 				return nil
 			}
+			latestSequence.Set(float64(op.Seq()))
 			if err := dst.Apply(op, true); err != nil {
 				return fmt.Errorf("sequence %s: %w", op.Seq(), err)
 			}
@@ -53,3 +59,42 @@ retry:
 		}
 	}
 }
+
+func Follow(ctx context.Context, src LogSource, dst LogSink) error {
+	// Outer loop around doFollow that catches transport errors on
+	// Subscribe() and restarts the process from where it left
+	// off. Transport errors result in doFollow() returning nil,
+	// any other error is considered permanent.
+	for {
+		err := doFollow(ctx, src, dst)
+		if err != nil {
+			return err
+		}
+	}
+}
+
+var (
+	replState = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "async_repl_up",
+			Help: "Status of the asynchronous replication process.",
+		})
+	snapshotCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "async_repl_snapshots_total",
+			Help: "Total number of Snapshot() calls.",
+		})
+	latestSequence = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "async_repl_sequence",
+			Help: "Last sequence number seen.",
+		})
+)
+
+func init() {
+	prometheus.MustRegister(
+		replState,
+		snapshotCounter,
+		latestSequence,
+	)
+}
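
The three collectors above are registered with the default Prometheus registry, so they become visible wherever the process already exports metrics over HTTP. For reference only, a minimal, hypothetical sketch of serving that registry with promhttp; the /metrics path and the :9090 listen address are assumptions, not taken from this repository:

// metrics_example.go - hypothetical illustration, not part of this commit.
// Serves the default Prometheus registry, which includes async_repl_up,
// async_repl_snapshots_total and async_repl_sequence once the package
// above has been imported.
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promhttp.Handler() exposes everything registered via
	// prometheus.MustRegister(), including the replication metrics.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}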
@@ -9,6 +9,7 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"time"
 
 	"git.autistici.org/ai3/attic/wig/datastore/crud/httpapi"
 	"git.autistici.org/ai3/attic/wig/datastore/crud/httptransport"
@@ -46,7 +47,7 @@ func (r *remotePubsubClient) Snapshot(ctx context.Context) (Snapshot, error) {
 			err = maybeTempError(err)
 			return
 		},
-		backoff.WithContext(httptransport.RetryPolicy, ctx),
+		backoff.WithContext(retryPolicy, ctx),
 	)
 	return snap, err
 }
@@ -65,7 +66,7 @@ func (r *remotePubsubClient) Subscribe(ctx context.Context, start Sequence) (Sub
 			err = maybeTempError(err)
 			return
 		},
-		backoff.WithContext(httptransport.RetryPolicy, ctx),
+		backoff.WithContext(retryPolicy, ctx),
 	)
 	return sub, err
 }
@@ -261,3 +262,17 @@ func maybeTempError(err error) error {
 	}
 	return err
 }
+
+// The log client uses a custom backoff policy that will back off
+// exponentially up to a relatively short interval, and will just keep
+// retrying forever (until the context is canceled).
+var retryPolicy = newPermanentRetryBackOff()
+
+func newPermanentRetryBackOff() backoff.BackOff {
+	exp := backoff.NewExponentialBackOff()
+	exp.InitialInterval = 200 * time.Millisecond
+	exp.RandomizationFactor = 0.2
+	exp.MaxInterval = 60 * time.Second
+	exp.MaxElapsedTime = 0
+	return exp
+}
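
This policy never gives up on its own (MaxElapsedTime = 0): retries stop only when the operation succeeds or the surrounding context is canceled. A small, self-contained sketch of that behaviour using the same backoff settings; the timeout and the always-failing operation are hypothetical stand-ins:

// retry_example.go - hypothetical illustration of the retry-forever policy.
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// Same shape as the policy defined in this commit.
func newPermanentRetryBackOff() backoff.BackOff {
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = 200 * time.Millisecond
	exp.RandomizationFactor = 0.2
	exp.MaxInterval = 60 * time.Second
	exp.MaxElapsedTime = 0 // never give up based on elapsed time
	return exp
}

func main() {
	// Cancel after ten seconds; without this, the retry loop would run forever.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		// Stand-in for the HTTP request made by Snapshot()/Subscribe().
		return errors.New("transient error")
	}, backoff.WithContext(newPermanentRetryBackOff(), ctx))

	log.Printf("gave up after %d attempts: %v", attempts, err)
}

With these settings the wait between attempts grows from roughly 200 milliseconds (with 20% jitter) up to the 60-second cap and stays there until the context ends the loop.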