Commit e80f9bda authored by ale

Use the replds client instead of going through clientutil

parent 69d65dfa
Showing 629 additions and 391 deletions
@@ -17,7 +17,7 @@ import (
 	"sync"
 	"time"

-	"git.autistici.org/ai3/go-common/clientutil"
+	"git.autistici.org/ai3/replds"

 	"github.com/prometheus/client_golang/prometheus"
 )
@@ -109,13 +109,13 @@ func NewManager(config *Config, certGen CertGenerator) (*Manager, error) {
 	if config.ReplDS == nil {
 		m.storage = ds
 	} else {
-		be, err := clientutil.NewBackend(config.ReplDS)
+		r, err := replds.NewPublicClient(config.ReplDS)
 		if err != nil {
 			return nil, err
 		}
 		m.storage = &replStorage{
 			dirStorage: ds,
-			replClient: be,
+			replClient: r,
 		}
 	}
...
@@ -16,7 +16,6 @@ import (
 	"strings"
 	"time"

-	"git.autistici.org/ai3/go-common/clientutil"
 	"git.autistici.org/ai3/replds"
 )
@@ -93,7 +92,7 @@ func dumpCertsAndKey(cn string, der [][]byte, key crypto.Signer) (map[string][]b
 // certificates to replds instead.
 type replStorage struct {
 	*dirStorage
-	replClient clientutil.Backend
+	replClient replds.PublicClient
 }

 func (d *replStorage) PutCert(cn string, der [][]byte, key crypto.Signer) error {
@@ -115,8 +114,8 @@ func (d *replStorage) PutCert(cn string, der [][]byte, key crypto.Signer) error
 	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer cancel()
-	var resp replds.SetNodesResponse
-	if err := clientutil.DoJSONHTTPRequest(ctx, d.replClient.Client(""), d.replClient.URL("")+"/api/set_nodes", req, &resp); err != nil {
+	resp, err := d.replClient.SetNodes(ctx, &req)
+	if err != nil {
 		return err
 	}
 	if resp.HostsOk < 1 {
...
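For context, the change above swaps a hand-rolled JSON POST to /api/set_nodes for the typed replds client. Below is a minimal sketch of the new call sequence, assuming replds.NewPublicClient accepts the same *clientutil.BackendConfig that clientutil.NewBackend did; the pushNodes helper itself is hypothetical and only mirrors the PutCert flow shown in the diff.

package main

import (
    "context"
    "errors"
    "time"

    "git.autistici.org/ai3/go-common/clientutil"
    "git.autistici.org/ai3/replds"
)

// pushNodes mirrors the PutCert flow: one SetNodes call, bounded by a
// timeout, failing unless at least one host acknowledged the write.
func pushNodes(cfg *clientutil.BackendConfig, req *replds.SetNodesRequest) error {
    client, err := replds.NewPublicClient(cfg)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    resp, err := client.SetNodes(ctx, req)
    if err != nil {
        return err
    }
    if resp.HostsOk < 1 {
        return errors.New("no hosts acknowledged the write")
    }
    return nil
}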
 package clientutil

 import (
-	"crypto/tls"
-	"fmt"
-	"net/http"
-	"net/url"
-	"sync"
-	"time"
+	"context"
 )

-// BackendConfig specifies the configuration to access a service.
+// BackendConfig specifies the configuration of a service backend.
 //
 // Services with multiple backends can be replicated or partitioned,
 // depending on a configuration switch, making it a deployment-time
@@ -18,102 +13,30 @@ import (
 // 'shard' parameter on their APIs.
 type BackendConfig struct {
 	URL       string           `yaml:"url"`
-	Sharded   bool             `yaml:"sharded"`
 	TLSConfig *TLSClientConfig `yaml:"tls_config"`
+	Sharded   bool             `yaml:"sharded"`
+	Debug     bool             `yaml:"debug"`
 }

 // Backend is a runtime class that provides http Clients for use with
 // a specific service backend. If the service can't be partitioned,
-// pass an empty string to the Client method.
+// pass an empty string to the Call method.
 type Backend interface {
-	// URL for the service for a specific shard.
-	URL(string) string
+	// Call a remote method. The sharding behavior is the following:
+	//
+	// Services that support sharding (partitioning) should always
+	// include the shard ID in their Call() requests. Users can
+	// then configure backends to be sharded or not in their
+	// Config. When invoking Call with a shard ID on a non-sharded
+	// service, the shard ID is simply ignored. Invoking Call
+	// *without* a shard ID on a sharded service is an error.
+	Call(context.Context, string, string, interface{}, interface{}) error

-	// Client that can be used to make a request to the service.
-	Client(string) *http.Client
+	// Close all resources associated with the backend.
+	Close()
 }

 // NewBackend returns a new Backend with the given config.
 func NewBackend(config *BackendConfig) (Backend, error) {
-	u, err := url.Parse(config.URL)
-	if err != nil {
-		return nil, err
-	}
-	var tlsConfig *tls.Config
-	if config.TLSConfig != nil {
-		tlsConfig, err = config.TLSConfig.TLSConfig()
-		if err != nil {
-			return nil, err
-		}
-	}
-	if config.Sharded {
-		return &replicatedClient{
-			u: u,
-			c: newHTTPClient(u, tlsConfig),
-		}, nil
-	}
-	return &shardedClient{
-		baseURL:   u,
-		tlsConfig: tlsConfig,
-		urls:      make(map[string]*url.URL),
-		shards:    make(map[string]*http.Client),
-	}, nil
-}
-
-type replicatedClient struct {
-	c *http.Client
-	u *url.URL
-}
-
-func (r *replicatedClient) Client(_ string) *http.Client { return r.c }
-func (r *replicatedClient) URL(_ string) string          { return r.u.String() }
-
-type shardedClient struct {
-	baseURL   *url.URL
-	tlsConfig *tls.Config
-	mx        sync.Mutex
-	urls      map[string]*url.URL
-	shards    map[string]*http.Client
-}
-
-func (s *shardedClient) getShardURL(shard string) *url.URL {
-	if shard == "" {
-		return s.baseURL
-	}
-	u, ok := s.urls[shard]
-	if !ok {
-		var tmp = *s.baseURL
-		tmp.Host = fmt.Sprintf("%s.%s", shard, tmp.Host)
-		u = &tmp
-		s.urls[shard] = u
-	}
-	return u
-}
-
-func (s *shardedClient) URL(shard string) string {
-	s.mx.Lock()
-	defer s.mx.Unlock()
-	return s.getShardURL(shard).String()
-}
-
-func (s *shardedClient) Client(shard string) *http.Client {
-	s.mx.Lock()
-	defer s.mx.Unlock()
-	client, ok := s.shards[shard]
-	if !ok {
-		u := s.getShardURL(shard)
-		client = newHTTPClient(u, s.tlsConfig)
-		s.shards[shard] = client
-	}
-	return client
-}
-
-func newHTTPClient(u *url.URL, tlsConfig *tls.Config) *http.Client {
-	return &http.Client{
-		Transport: NewTransport([]string{u.Host}, tlsConfig, nil),
-		Timeout:   30 * time.Second,
-	}
+	return newBalancedBackend(config, defaultResolver)
 }
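The new Backend interface replaces the URL()/Client() pair with a single Call method. A hedged usage sketch for a non-sharded service follows; the endpoint path and the request/response types are invented for illustration, while the interface and NewBackend are as defined above.

package main

import (
    "context"
    "log"
    "time"

    "git.autistici.org/ai3/go-common/clientutil"
)

type pingRequest struct{}

type pingResponse struct {
    Status string `json:"status"`
}

func main() {
    be, err := clientutil.NewBackend(&clientutil.BackendConfig{
        URL: "https://service.example.com:4040",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer be.Close()

    // Always set a deadline: Call keeps retrying temporary errors
    // until the context expires.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    var resp pingResponse
    // Empty shard ID: this backend is not partitioned.
    if err := be.Call(ctx, "", "/api/ping", &pingRequest{}, &resp); err != nil {
        log.Fatal(err)
    }
    log.Printf("status: %s", resp.Status)
}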
package clientutil
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff"
)
// Our own narrow logger interface.
type logger interface {
Printf(string, ...interface{})
}
// A nilLogger is used when Config.Debug is false.
type nilLogger struct{}
func (l nilLogger) Printf(_ string, _ ...interface{}) {}
// Parameters that define the exponential backoff algorithm used.
var (
ExponentialBackOffInitialInterval = 100 * time.Millisecond
ExponentialBackOffMultiplier = 1.4142
)
// newExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func newExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = ExponentialBackOffInitialInterval
b.Multiplier = ExponentialBackOffMultiplier
// Set MaxElapsedTime to 0 because we expect the overall
// timeout to be dictated by the request Context.
b.MaxElapsedTime = 0
return b
}
// Balancer for HTTP connections. It will round-robin across available
// backends, trying to avoid ones that are erroring out, until one
// succeeds or returns a permanent error.
//
// This object should not be used for load balancing of individual
// HTTP requests: it doesn't do anything smart beyond trying to avoid
// broken targets. It's meant to provide a *reliable* connection to a
// set of equivalent services for HA purposes.
type balancedBackend struct {
*backendTracker
*transportCache
baseURI *url.URL
sharded bool
resolver resolver
log logger
}
func newBalancedBackend(config *BackendConfig, resolver resolver) (*balancedBackend, error) {
u, err := url.Parse(config.URL)
if err != nil {
return nil, err
}
var tlsConfig *tls.Config
if config.TLSConfig != nil {
tlsConfig, err = config.TLSConfig.TLSConfig()
if err != nil {
return nil, err
}
}
var logger logger = &nilLogger{}
if config.Debug {
logger = log.New(os.Stderr, fmt.Sprintf("backend %s: ", u.Host), 0)
}
return &balancedBackend{
backendTracker: newBackendTracker(u.Host, resolver, logger),
transportCache: newTransportCache(tlsConfig),
sharded: config.Sharded,
baseURI: u,
resolver: resolver,
log: logger,
}, nil
}
// Call the backend. Makes an HTTP POST request to the specified uri,
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func (b *balancedBackend) Call(ctx context.Context, shard, path string, req, resp interface{}) error {
data, err := json.Marshal(req)
if err != nil {
return err
}
var tg targetGenerator = b.backendTracker
if b.sharded && shard != "" {
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
}
seq := newSequence(tg)
b.log.Printf("%016x: initialized", seq.ID())
var httpResp *http.Response
err = backoff.Retry(func() error {
req, rerr := b.newJSONRequest(path, shard, data)
if rerr != nil {
return rerr
}
httpResp, rerr = b.do(ctx, seq, req)
return rerr
}, backoff.WithContext(newExponentialBackOff(), ctx))
if err != nil {
return err
}
defer httpResp.Body.Close() // nolint
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}
// Return the URI to be used for the request. This is used both in the
// Host HTTP header and as the TLS server name used to pick a server
// certificate (if using TLS).
func (b *balancedBackend) getURIForRequest(shard, path string) string {
u := *b.baseURI
if b.sharded && shard != "" {
u.Host = fmt.Sprintf("%s.%s", shard, u.Host)
}
u.Path = appendPath(u.Path, path)
return u.String()
}
// Build a http.Request object.
func (b *balancedBackend) newJSONRequest(path, shard string, data []byte) (*http.Request, error) {
req, err := http.NewRequest("POST", b.getURIForRequest(shard, path), bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Length", strconv.FormatInt(int64(len(data)), 10))
return req, nil
}
// Select a new target from the given sequence and send the request to
// it. Wrap HTTP errors in a RemoteError object.
func (b *balancedBackend) do(ctx context.Context, seq *sequence, req *http.Request) (resp *http.Response, err error) {
target, terr := seq.Next()
if terr != nil {
return
}
b.log.Printf("sequence %016x: connecting to %s", seq.ID(), target)
client := &http.Client{
Transport: b.transportCache.getTransport(target),
}
resp, err = client.Do(req.WithContext(ctx))
if err == nil && resp.StatusCode != 200 {
err = remoteErrorFromResponse(resp)
if !isStatusTemporary(resp.StatusCode) {
err = backoff.Permanent(err)
}
resp.Body.Close() // nolint
resp = nil
}
seq.Done(target, err)
return
}
var errNoTargets = errors.New("no available backends")
type targetGenerator interface {
getTargets() []string
setStatus(string, bool)
}
// A sequence repeatedly iterates over available backends in order of
// preference. Once in a while it refreshes its list of available
// targets.
type sequence struct {
id uint64
tg targetGenerator
targets []string
pos int
}
func newSequence(tg targetGenerator) *sequence {
return &sequence{
id: rand.Uint64(),
tg: tg,
targets: tg.getTargets(),
}
}
func (s *sequence) ID() uint64 { return s.id }
func (s *sequence) reloadTargets() {
targets := s.tg.getTargets()
if len(targets) > 0 {
s.targets = targets
s.pos = 0
}
}
// Next returns the next target.
func (s *sequence) Next() (t string, err error) {
if s.pos >= len(s.targets) {
s.reloadTargets()
if len(s.targets) == 0 {
err = errNoTargets
return
}
}
t = s.targets[s.pos]
s.pos++
return
}
func (s *sequence) Done(t string, err error) {
s.tg.setStatus(t, err == nil)
}
// A shardedGenerator returns a single sharded target to a sequence.
type shardedGenerator struct {
id uint64
addrs []string
}
func newShardedGenerator(shard, base string, resolver resolver) *shardedGenerator {
return &shardedGenerator{
id: rand.Uint64(),
addrs: resolver.ResolveIP(fmt.Sprintf("%s.%s", shard, base)),
}
}
func (g *shardedGenerator) getTargets() []string { return g.addrs }
func (g *shardedGenerator) setStatus(_ string, _ bool) {}
// Concatenate two URI paths.
func appendPath(a, b string) string {
if strings.HasSuffix(a, "/") && strings.HasPrefix(b, "/") {
return a + b[1:]
}
return a + b
}
// Some HTTP status codes are treated as temporary errors.
func isStatusTemporary(code int) bool {
switch code {
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
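The sequence walks the targets handed out by a targetGenerator and reports each outcome back through Done; when it runs out, it reloads the target list. Here is a small in-package, test-style sketch with a fixed generator; staticTargets and exampleSequence are illustrative only and not part of the commit.

package clientutil

import "fmt"

// staticTargets is a toy targetGenerator with a fixed address list and
// no failure tracking, used only to illustrate sequence semantics.
type staticTargets struct{ addrs []string }

func (s *staticTargets) getTargets() []string   { return s.addrs }
func (s *staticTargets) setStatus(string, bool) {}

func exampleSequence() {
    seq := newSequence(&staticTargets{addrs: []string{"10.0.0.1:80", "10.0.0.2:80"}})
    for i := 0; i < 3; i++ {
        t, err := seq.Next()
        if err != nil {
            // errNoTargets is only returned when the generator has
            // nothing to offer at all.
            fmt.Println("no targets:", err)
            return
        }
        fmt.Println("trying", t)
        // A nil error marks the target healthy; a real caller passes
        // the request error so failing addresses get demoted.
        seq.Done(t, nil)
    }
}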
File added
package clientutil
import (
"log"
"net"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
type resolver interface {
ResolveIP(string) []string
}
type dnsResolver struct{}
func (r *dnsResolver) ResolveIP(hostport string) []string {
var resolved []string
host, port, err := net.SplitHostPort(hostport)
if err != nil {
log.Printf("error parsing %s: %v", hostport, err)
return nil
}
hostIPs, err := net.LookupIP(host)
if err != nil {
log.Printf("error resolving %s: %v", host, err)
return nil
}
for _, ip := range hostIPs {
resolved = append(resolved, net.JoinHostPort(ip.String(), port))
}
return resolved
}
var defaultResolver = newDNSCache(&dnsResolver{})
type cacheDatum struct {
addrs []string
deadline time.Time
}
type dnsCache struct {
resolver resolver
sf singleflight.Group
mx sync.RWMutex
cache map[string]cacheDatum
}
func newDNSCache(resolver resolver) *dnsCache {
return &dnsCache{
resolver: resolver,
cache: make(map[string]cacheDatum),
}
}
func (c *dnsCache) get(host string) ([]string, bool) {
d, ok := c.cache[host]
if !ok {
return nil, false
}
return d.addrs, d.deadline.After(time.Now())
}
func (c *dnsCache) update(host string) []string {
v, _, _ := c.sf.Do(host, func() (interface{}, error) {
addrs := c.resolver.ResolveIP(host)
// By uncommenting this, we stop caching negative results.
// if len(addrs) == 0 {
// return nil, nil
// }
c.mx.Lock()
c.cache[host] = cacheDatum{
addrs: addrs,
deadline: time.Now().Add(60 * time.Second),
}
c.mx.Unlock()
return addrs, nil
})
return v.([]string)
}
func (c *dnsCache) ResolveIP(host string) []string {
c.mx.RLock()
addrs, ok := c.get(host)
c.mx.RUnlock()
if ok {
return addrs
}
if len(addrs) > 0 {
go c.update(host)
return addrs
}
return c.update(host)
}
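The dnsCache caches lookups for 60 seconds and serves stale entries while refreshing them in the background. An in-package sketch with a fake resolver shows the intended behavior; countingResolver and exampleDNSCache are illustrative only.

package clientutil

import "fmt"

// countingResolver is a fake resolver that records how many times it
// is asked to resolve a host.
type countingResolver struct {
    calls int
}

func (r *countingResolver) ResolveIP(host string) []string {
    r.calls++
    return []string{"192.0.2.1:443"}
}

func exampleDNSCache() {
    fake := &countingResolver{}
    cache := newDNSCache(fake)

    // First lookup misses the cache and hits the fake resolver.
    fmt.Println(cache.ResolveIP("svc.example.com:443"), fake.calls) // ... 1

    // Subsequent lookups within the 60s TTL are served from the cache.
    fmt.Println(cache.ResolveIP("svc.example.com:443"), fake.calls) // ... 1
}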
// Package clientutil implements a very simple style of JSON RPC.
//
// Requests and responses are both encoded in JSON, and they should
// have the "application/json" Content-Type.
//
// HTTP response statuses other than 200 indicate an error: in this
// case, the response body may contain (in plain text) further details
// about the error. Some HTTP status codes are considered temporary
// errors (incl. 429 for throttling). The client will retry requests,
// if targets are available, until the context expires - so it's quite
// important to remember to set a timeout on the context given to the
// Call() function!
//
// The client handles both replicated services and sharded
// (partitioned) services. Users of this package that want to support
// sharded deployments are supposed to pass a shard ID to every
// Call(). At the deployment stage, sharding can be enabled via the
// configuration.
//
// For replicated services, the client will expect the provided
// hostname to resolve to one or more IP addresses, in which case it
// will pick a random IP address on every new request, while
// remembering which addresses have had errors and trying to avoid
// them. It will however send an occasional request to the failed
// targets, to see if they've come back.
//
// For sharded services, the client makes simple HTTP requests to the
// specific target identified by the shard. It does this by prepending
// the shard ID to the backend hostname (so a request to "example.com"
// with shard ID "1" becomes a request to "1.example.com").
//
// The difference with other JSON-RPC implementations is that we use a
// different URI for every method, and we force the usage of
// request/response types. This makes it easy for projects to
// eventually migrate to GRPC.
//
package clientutil
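To make the sharding convention above concrete: with Sharded set in the config, the shard ID passed to Call is prepended to the hostname, so shard "1" of "db.example.com" is reached at "1.db.example.com". The service URL and request/response types below are hypothetical.

package main

import (
    "context"
    "log"
    "time"

    "git.autistici.org/ai3/go-common/clientutil"
)

type lookupRequest struct {
    Key string `json:"key"`
}

type lookupResponse struct {
    Value string `json:"value"`
}

func main() {
    be, err := clientutil.NewBackend(&clientutil.BackendConfig{
        URL:     "https://db.example.com:5544",
        Sharded: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer be.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Targets 1.db.example.com; per the Backend docs, omitting the
    // shard ID on a sharded service is an error.
    var resp lookupResponse
    if err := be.Call(ctx, "1", "/api/lookup", &lookupRequest{Key: "foo"}, &resp); err != nil {
        log.Fatal(err)
    }
    log.Println(resp.Value)
}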
package clientutil
import (
"fmt"
"io/ioutil"
"net/http"
)
// RemoteError represents an HTTP error from the server. The status
// code and response body can be retrieved with the StatusCode() and
// Body() methods.
type RemoteError struct {
statusCode int
body string
}
func remoteErrorFromResponse(resp *http.Response) *RemoteError {
// Optimistically read the response body, ignoring errors.
var body string
if data, err := ioutil.ReadAll(resp.Body); err == nil {
body = string(data)
}
return &RemoteError{statusCode: resp.StatusCode, body: body}
}
// Error implements the error interface.
func (e *RemoteError) Error() string {
return fmt.Sprintf("%d - %s", e.statusCode, e.body)
}
// StatusCode returns the HTTP status code.
func (e *RemoteError) StatusCode() int { return e.statusCode }
// Body returns the response body.
func (e *RemoteError) Body() string { return e.body }
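Callers that need the HTTP status or body can type-assert the error returned by Call: HTTP-level failures typically surface as *RemoteError, while transport and decoding failures do not. A hedged sketch, with a hypothetical endpoint:

package main

import (
    "context"
    "log"

    "git.autistici.org/ai3/go-common/clientutil"
)

func callWithErrorHandling(ctx context.Context, be clientutil.Backend) {
    var resp struct{}
    err := be.Call(ctx, "", "/api/op", &struct{}{}, &resp)
    if err == nil {
        return
    }
    // HTTP errors carry the status code and response body; anything
    // else is a transport or encoding error.
    if rerr, ok := err.(*clientutil.RemoteError); ok {
        log.Printf("server said %d: %s", rerr.StatusCode(), rerr.Body())
        return
    }
    log.Printf("request failed: %v", err)
}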
package clientutil
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
)
// DoJSONHTTPRequest makes an HTTP POST request to the specified uri,
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func DoJSONHTTPRequest(ctx context.Context, client *http.Client, uri string, req, resp interface{}) error {
data, err := json.Marshal(req)
if err != nil {
return err
}
httpReq, err := http.NewRequest("POST", uri, bytes.NewReader(data))
if err != nil {
return err
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq = httpReq.WithContext(ctx)
httpResp, err := RetryHTTPDo(client, httpReq, NewExponentialBackOff())
if err != nil {
return err
}
defer httpResp.Body.Close()
if httpResp.StatusCode != 200 {
return fmt.Errorf("HTTP status %d", httpResp.StatusCode)
}
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}
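A minimal usage sketch of DoJSONHTTPRequest, the helper that the call sites in the diffs above are migrating away from; the endpoint and the echo types are hypothetical.

package main

import (
    "context"
    "log"
    "net/http"
    "time"

    "git.autistici.org/ai3/go-common/clientutil"
)

type echoRequest struct {
    Msg string `json:"msg"`
}

type echoResponse struct {
    Msg string `json:"msg"`
}

func main() {
    // The request context bounds all retries.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    var resp echoResponse
    err := clientutil.DoJSONHTTPRequest(ctx, http.DefaultClient,
        "https://svc.example.com/api/echo", &echoRequest{Msg: "hi"}, &resp)
    if err != nil {
        log.Fatal(err)
    }
    log.Println(resp.Msg)
}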
package clientutil
import (
"errors"
"net/http"
"time"
"github.com/cenkalti/backoff"
)
// NewExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func NewExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 100 * time.Millisecond
//b.Multiplier = 1.4142
return b
}
// A temporary (retriable) error is something that has a Temporary method.
type tempError interface {
Temporary() bool
}
type tempErrorWrapper struct {
error
}
func (t tempErrorWrapper) Temporary() bool { return true }
// TempError makes a temporary (retriable) error out of a normal error.
func TempError(err error) error {
return tempErrorWrapper{err}
}
// Retry operation op until it succeeds according to the backoff
// policy b.
//
// Note that this function reverses the error semantics of
// backoff.Operation: all errors are permanent unless explicitly
// marked as temporary (i.e. they have a Temporary() method that
// returns true). This is to better align with the errors returned by
// the net package.
func Retry(op backoff.Operation, b backoff.BackOff) error {
innerOp := func() error {
err := op()
if err == nil {
return err
}
if tmpErr, ok := err.(tempError); ok && tmpErr.Temporary() {
return err
}
return backoff.Permanent(err)
}
return backoff.Retry(innerOp, b)
}
var errHTTPBackOff = TempError(errors.New("temporary http error"))
func isStatusTemporary(code int) bool {
switch code {
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
// RetryHTTPDo retries an HTTP request until it succeeds, according to
// the backoff policy b. It will retry on temporary network errors and
// upon receiving specific temporary HTTP errors. It will use the
// context associated with the HTTP request object.
func RetryHTTPDo(client *http.Client, req *http.Request, b backoff.BackOff) (*http.Response, error) {
var resp *http.Response
op := func() error {
// Clear up previous response if set.
if resp != nil {
resp.Body.Close()
}
var err error
resp, err = client.Do(req)
if err == nil && isStatusTemporary(resp.StatusCode) {
resp.Body.Close()
return errHTTPBackOff
}
return err
}
err := Retry(op, backoff.WithContext(b, req.Context()))
return resp, err
}
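A short sketch of the inverted retry semantics above: only errors wrapped with TempError are retried, everything else is treated as permanent. The failing-then-succeeding operation is contrived for illustration.

package main

import (
    "errors"
    "log"

    "git.autistici.org/ai3/go-common/clientutil"
)

func main() {
    attempts := 0
    op := func() error {
        attempts++
        if attempts < 3 {
            // Marked temporary, so Retry backs off and tries again.
            return clientutil.TempError(errors.New("transient failure"))
        }
        return nil // success on the third attempt
    }

    if err := clientutil.Retry(op, clientutil.NewExponentialBackOff()); err != nil {
        log.Fatal(err)
    }
    log.Printf("succeeded after %d attempts", attempts)
}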
package clientutil
import (
"math/rand"
"sync"
"time"
)
// The backendTracker tracks the state of the targets associated with
// a backend, and periodically checks DNS for updates.
type backendTracker struct {
log logger
addr string
resolver resolver
stopCh chan struct{}
mx sync.Mutex
resolved []string
failed map[string]time.Time
}
func newBackendTracker(addr string, resolver resolver, logger logger) *backendTracker {
// Resolve the targets once before returning.
b := &backendTracker{
addr: addr,
resolver: resolver,
resolved: resolver.ResolveIP(addr),
failed: make(map[string]time.Time),
stopCh: make(chan struct{}),
log: logger,
}
go b.updateProc()
return b
}
func (b *backendTracker) Close() {
close(b.stopCh)
}
// Return the full list of targets, preferred (healthy) ones first.
func (b *backendTracker) getTargets() []string {
b.mx.Lock()
defer b.mx.Unlock()
var good, bad []string
for _, t := range b.resolved {
if _, ok := b.failed[t]; ok {
bad = append(bad, t)
} else {
good = append(good, t)
}
}
good = shuffle(good)
bad = shuffle(bad)
return append(good, bad...)
}
func (b *backendTracker) setStatus(addr string, ok bool) {
b.mx.Lock()
_, isFailed := b.failed[addr]
if isFailed && ok {
b.log.Printf("target %s now ok", addr)
delete(b.failed, addr)
} else if !isFailed && !ok {
b.log.Printf("target %s failed", addr)
b.failed[addr] = time.Now()
}
b.mx.Unlock()
}
var (
backendUpdateInterval = 60 * time.Second
backendFailureRetryInterval = 60 * time.Second
)
func (b *backendTracker) expireFailedTargets() {
b.mx.Lock()
now := time.Now()
for k, v := range b.failed {
if now.Sub(v) > backendFailureRetryInterval {
delete(b.failed, k)
}
}
b.mx.Unlock()
}
func (b *backendTracker) updateProc() {
tick := time.NewTicker(backendUpdateInterval)
defer tick.Stop()
for {
select {
case <-b.stopCh:
return
case <-tick.C:
b.expireFailedTargets()
resolved := b.resolver.ResolveIP(b.addr)
if len(resolved) > 0 {
b.mx.Lock()
b.resolved = resolved
b.mx.Unlock()
}
}
}
}
var shuffleSrc = rand.NewSource(time.Now().UnixNano())
// Re-order elements of a slice randomly.
func shuffle(values []string) []string {
if len(values) < 2 {
return values
}
rnd := rand.New(shuffleSrc)
for i := len(values) - 1; i > 0; i-- {
j := rnd.Intn(i + 1)
values[i], values[j] = values[j], values[i]
}
return values
}
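An in-package, test-style sketch of the tracker's ordering behavior: failed addresses are kept at the tail of getTargets rather than dropped, until they succeed again or backendFailureRetryInterval expires. fixedResolver and exampleBackendTracker are illustrative only.

package clientutil

import "fmt"

// fixedResolver always returns the same two addresses.
type fixedResolver struct{}

func (fixedResolver) ResolveIP(string) []string {
    return []string{"192.0.2.10:443", "192.0.2.11:443"}
}

func exampleBackendTracker() {
    bt := newBackendTracker("svc.example.com:443", fixedResolver{}, nilLogger{})
    defer bt.Close()

    // Mark one address as failed: it moves to the back of the list
    // until it succeeds again or the retry interval expires.
    bt.setStatus("192.0.2.10:443", false)
    fmt.Println(bt.getTargets()) // healthy address first, failed one last

    bt.setStatus("192.0.2.10:443", true)
    fmt.Println(bt.getTargets()) // both healthy again (random order)
}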
@@ -3,170 +3,63 @@ package clientutil
 import (
 	"context"
 	"crypto/tls"
-	"errors"
-	"log"
 	"net"
 	"net/http"
 	"sync"
 	"time"
 )

-var errAllBackendsFailed = errors.New("all backends failed")
-
-type dnsResolver struct{}
-
-func (r *dnsResolver) ResolveIPs(hosts []string) []string {
-	var resolved []string
-	for _, hostport := range hosts {
-		host, port, err := net.SplitHostPort(hostport)
-		if err != nil {
-			log.Printf("error parsing %s: %v", hostport, err)
-			continue
-		}
-		hostIPs, err := net.LookupIP(host)
-		if err != nil {
-			log.Printf("error resolving %s: %v", host, err)
-			continue
-		}
-		for _, ip := range hostIPs {
-			resolved = append(resolved, net.JoinHostPort(ip.String(), port))
-		}
-	}
-	return resolved
-}
-
-var defaultResolver = &dnsResolver{}
-
-type resolver interface {
-	ResolveIPs([]string) []string
-}
-
-// Balancer for HTTP connections. It will round-robin across available
-// backends, trying to avoid ones that are erroring out, until one
-// succeeds or they all fail.
-//
-// This object should not be used for load balancing of individual
-// HTTP requests: once a new connection is established, requests will
-// be sent over it until it errors out. It's meant to provide a
-// *reliable* connection to a set of equivalent backends for HA
-// purposes.
-type balancer struct {
-	hosts    []string
-	resolver resolver
-	stop     chan bool
-	// List of currently valid (or untested) backends, and ones
-	// that errored out at least once.
-	mx    sync.Mutex
-	addrs []string
-	ok    map[string]bool
-}
-
-var backendUpdateInterval = 60 * time.Second
-
-// Periodically update the list of available backends.
-func (b *balancer) updateProc() {
-	tick := time.NewTicker(backendUpdateInterval)
-	for {
-		select {
-		case <-b.stop:
-			return
-		case <-tick.C:
-			resolved := b.resolver.ResolveIPs(b.hosts)
-			if len(resolved) > 0 {
-				b.mx.Lock()
-				b.addrs = resolved
-				b.mx.Unlock()
-			}
-		}
-	}
-}
-
-// Returns a list of all available backends, split into "good ones"
-// (no errors seen since last successful connection) and "bad ones".
-func (b *balancer) getBackends() ([]string, []string) {
-	b.mx.Lock()
-	defer b.mx.Unlock()
-	var good, bad []string
-	for _, addr := range b.addrs {
-		if ok := b.ok[addr]; ok {
-			good = append(good, addr)
-		} else {
-			bad = append(bad, addr)
-		}
-	}
-	return good, bad
-}
-
-func (b *balancer) notify(addr string, ok bool) {
-	b.mx.Lock()
-	b.ok[addr] = ok
-	b.mx.Unlock()
-}
+// The transportCache is just a cache of http transports, each
+// connecting to a specific address.
+//
+// We use this to control the HTTP Host header and the TLS ServerName
+// independently of the target address.
+type transportCache struct {
+	tlsConfig  *tls.Config
+	mx         sync.RWMutex
+	transports map[string]http.RoundTripper
+}
+
+func newTransportCache(tlsConfig *tls.Config) *transportCache {
+	return &transportCache{
+		tlsConfig:  tlsConfig,
+		transports: make(map[string]http.RoundTripper),
+	}
+}
+
+func (m *transportCache) newTransport(addr string) http.RoundTripper {
+	return &http.Transport{
+		TLSClientConfig: m.tlsConfig,
+		DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
+			return netDialContext(ctx, network, addr)
+		},
+	}
+}
+
+func (m *transportCache) getTransport(addr string) http.RoundTripper {
+	m.mx.RLock()
+	t, ok := m.transports[addr]
+	m.mx.RUnlock()
+	if !ok {
+		m.mx.Lock()
+		if t, ok = m.transports[addr]; !ok {
+			t = m.newTransport(addr)
+			m.transports[addr] = t
+		}
+		m.mx.Unlock()
+	}
+	return t
+}

-// Go < 1.9 does not have net.DialContext, reimplement it in terms of
-// net.DialTimeout.
 func netDialContext(ctx context.Context, network, addr string) (net.Conn, error) {
-	timeout := 30 * time.Second
+	timeout := 60 * time.Second // some arbitrary max timeout
+	// Go < 1.9 does not have net.DialContext, reimplement it in
+	// terms of net.DialTimeout.
 	if deadline, ok := ctx.Deadline(); ok {
 		timeout = time.Until(deadline)
 	}
 	return net.DialTimeout(network, addr, timeout)
 }
-
-func (b *balancer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
-	// Start by attempting a connection on 'good' targets.
-	good, bad := b.getBackends()
-	for _, addr := range good {
-		// Go < 1.9 does not have DialContext, deal with it
-		conn, err := netDialContext(ctx, network, addr)
-		if err == nil {
-			return conn, nil
-		} else if err == context.Canceled {
-			// A timeout might be bad, set the error bit
-			// on the connection.
-			b.notify(addr, false)
-			return nil, err
-		}
-		b.notify(addr, false)
-	}
-	for _, addr := range bad {
-		conn, err := netDialContext(ctx, network, addr)
-		if err == nil {
-			b.notify(addr, true)
-			return conn, nil
-		} else if err == context.Canceled {
-			return nil, err
-		}
-	}
-	return nil, errAllBackendsFailed
-}
-
-// NewTransport returns a suitably configured http.RoundTripper that
-// talks to a specific backend service. It performs discovery of
-// available backends via DNS (using A or AAAA record lookups), tries
-// to route traffic away from faulty backends.
-//
-// It will periodically attempt to rediscover new backends.
-func NewTransport(backends []string, tlsConf *tls.Config, resolver resolver) http.RoundTripper {
-	if resolver == nil {
-		resolver = defaultResolver
-	}
-	addrs := resolver.ResolveIPs(backends)
-	b := &balancer{
-		hosts:    backends,
-		resolver: resolver,
-		addrs:    addrs,
-		ok:       make(map[string]bool),
-	}
-	go b.updateProc()
-	return &http.Transport{
-		DialContext:     b.dial,
-		TLSClientConfig: tlsConf,
-	}
-}
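The transportCache introduced above pins each transport to one resolved address while the request URL keeps the logical hostname, which is how the Host header and TLS ServerName stay independent of the dial target. An in-package sketch; exampleTransportCache is illustrative only.

package clientutil

import "net/http"

func exampleTransportCache() (*http.Response, error) {
    // One cache per backend; transports are created lazily per address.
    tc := newTransportCache(nil) // nil *tls.Config: plain HTTP

    // The client dials 192.0.2.20:80 regardless of the URL's hostname,
    // but the Host header still says svc.example.com.
    client := &http.Client{Transport: tc.getTransport("192.0.2.20:80")}
    return client.Get("http://svc.example.com/api/ping")
}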
File deleted
@@ -31,17 +31,17 @@ func newClient(config *clientutil.BackendConfig) (*client, error) {
 func (c *client) internalGetNodes(ctx context.Context, req *internalGetNodesRequest) (*internalGetNodesResponse, error) {
 	var resp internalGetNodesResponse
-	err := clientutil.DoJSONHTTPRequest(ctx, c.be.Client(""), c.be.URL("")+"/api/internal/get_nodes", req, &resp)
+	err := c.be.Call(ctx, "", "/api/internal/get_nodes", req, &resp)
 	return &resp, err
 }

 func (c *client) internalUpdateNodes(ctx context.Context, req *internalUpdateNodesRequest) error {
-	return clientutil.DoJSONHTTPRequest(ctx, c.be.Client(""), c.be.URL("")+"/api/internal/update_nodes", req, nil)
+	return c.be.Call(ctx, "", "/api/internal/update_nodes", req, nil)
 }

 func (c *client) SetNodes(ctx context.Context, req *SetNodesRequest) (*SetNodesResponse, error) {
 	var resp SetNodesResponse
-	err := clientutil.DoJSONHTTPRequest(ctx, c.be.Client(""), c.be.URL("")+"/api/set_nodes", req, &resp)
+	err := c.be.Call(ctx, "", "/api/set_nodes", req, &resp)
 	return &resp, err
 }
...
@@ -21,7 +21,7 @@ var (
 	pollPeriod = 120 * time.Second

 	// Timeout for InternalGetNodes requests.
-	getNodesTimeout = 60 * time.Second
+	getNodesTimeout = 20 * time.Second
 )

 // Node is an annotated path/value entry.
...
@@ -24,7 +24,7 @@ See https://godoc.org/github.com/cenkalti/backoff#pkg-examples
 [coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
 [coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
-[google-http-java-client]: https://github.com/google/google-http-java-client
+[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java
 [exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
 [advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_
 package backoff

 import (
+	"context"
 	"time"
-	"golang.org/x/net/context"
 )

 // BackOffContext is a backoff policy that stops retrying after the context
...
@@ -15,7 +15,6 @@ type Notify func(error, time.Duration)
 // Retry the operation o until it does not return error or BackOff stops.
 // o is guaranteed to be run at least once.
-// It is the caller's responsibility to reset b after Retry returns.
 //
 // If o returns a *PermanentError, the operation is not retried, and the
 // wrapped error is returned.
...
 package backoff

 import (
-	"runtime"
 	"sync"
 	"time"
 )
@@ -34,7 +33,6 @@ func NewTicker(b BackOff) *Ticker {
 	}
 	t.b.Reset()
 	go t.run()
-	runtime.SetFinalizer(t, (*Ticker).Stop)
 	return t
 }
...
@@ -3,13 +3,13 @@ package backoff
 import "time"

 /*
-WithMaxTries creates a wrapper around another BackOff, which will
+WithMaxRetries creates a wrapper around another BackOff, which will
 return Stop if NextBackOff() has been called too many times since
 the last time Reset() was called

 Note: Implementation is not thread-safe.
 */
-func WithMaxTries(b BackOff, max uint64) BackOff {
+func WithMaxRetries(b BackOff, max uint64) BackOff {
 	return &backOffTries{delegate: b, maxTries: max}
 }
...