Commit 81eb87b7 authored by ale's avatar ale

Update vendored deps

parent 00fb053f
package clientutil
import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"sync"
"time"
)
// BackendConfig specifies the configuration to access a service.
//
// Services with multiple backends can be replicated or partitioned,
// depending on a configuration switch, making it a deployment-time
// decision. Clients are expected to compute their own sharding
// function (either by database lookup or other methods), and expose a
// 'shard' parameter on their APIs.
type BackendConfig struct {
URL string `yaml:"url"`
Sharded bool `yaml:"sharded"`
TLSConfig *TLSClientConfig `yaml:"tls_config"`
}
// Backend is a runtime class that provides http Clients for use with
// a specific service backend. If the service can't be partitioned,
// pass an empty string to the Client method.
type Backend interface {
// URL for the service for a specific shard.
URL(string) string
// Client that can be used to make a request to the service.
Client(string) *http.Client
}
// NewBackend returns a new Backend with the given config.
func NewBackend(config *BackendConfig) (Backend, error) {
u, err := url.Parse(config.URL)
if err != nil {
return nil, err
}
var tlsConfig *tls.Config
if config.TLSConfig != nil {
tlsConfig, err = config.TLSConfig.TLSConfig()
if err != nil {
return nil, err
}
}
if config.Sharded {
return &replicatedClient{
u: u,
c: newHTTPClient(u, tlsConfig),
}, nil
}
return &shardedClient{
baseURL: u,
tlsConfig: tlsConfig,
urls: make(map[string]*url.URL),
shards: make(map[string]*http.Client),
}, nil
}
type replicatedClient struct {
c *http.Client
u *url.URL
}
func (r *replicatedClient) Client(_ string) *http.Client { return r.c }
func (r *replicatedClient) URL(_ string) string { return r.u.String() }
type shardedClient struct {
baseURL *url.URL
tlsConfig *tls.Config
mx sync.Mutex
urls map[string]*url.URL
shards map[string]*http.Client
}
func (s *shardedClient) getShardURL(shard string) *url.URL {
if shard == "" {
return s.baseURL
}
u, ok := s.urls[shard]
if !ok {
var tmp = *s.baseURL
tmp.Host = fmt.Sprintf("%s.%s", shard, tmp.Host)
u = &tmp
s.urls[shard] = u
}
return u
}
func (s *shardedClient) URL(shard string) string {
s.mx.Lock()
defer s.mx.Unlock()
return s.getShardURL(shard).String()
}
func (s *shardedClient) Client(shard string) *http.Client {
s.mx.Lock()
defer s.mx.Unlock()
client, ok := s.shards[shard]
if !ok {
u := s.getShardURL(shard)
client = newHTTPClient(u, s.tlsConfig)
s.shards[shard] = client
}
return client
}
func newHTTPClient(u *url.URL, tlsConfig *tls.Config) *http.Client {
return &http.Client{
Transport: NewTransport([]string{u.Host}, tlsConfig, nil),
Timeout: 30 * time.Second,
}
}
package clientutil
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
)
// DoJSONHTTPRequest makes an HTTP POST request to the specified uri,
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func DoJSONHTTPRequest(ctx context.Context, client *http.Client, uri string, req, resp interface{}) error {
data, err := json.Marshal(req)
if err != nil {
return err
}
httpReq, err := http.NewRequest("POST", uri, bytes.NewReader(data))
if err != nil {
return err
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq = httpReq.WithContext(ctx)
httpResp, err := RetryHTTPDo(client, httpReq, NewExponentialBackOff())
if err != nil {
return err
}
defer httpResp.Body.Close()
if httpResp.StatusCode != 200 {
return fmt.Errorf("HTTP status %d", httpResp.StatusCode)
}
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}
package clientutil
import (
"errors"
"net/http"
"time"
"github.com/cenkalti/backoff"
)
// NewExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func NewExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 100 * time.Millisecond
//b.Multiplier = 1.4142
return b
}
// A temporary (retriable) error is something that has a Temporary method.
type tempError interface {
Temporary() bool
}
type tempErrorWrapper struct {
error
}
func (t tempErrorWrapper) Temporary() bool { return true }
// TempError makes a temporary (retriable) error out of a normal error.
func TempError(err error) error {
return tempErrorWrapper{err}
}
// Retry operation op until it succeeds according to the backoff
// policy b.
//
// Note that this function reverses the error semantics of
// backoff.Operation: all errors are permanent unless explicitly
// marked as temporary (i.e. they have a Temporary() method that
// returns true). This is to better align with the errors returned by
// the net package.
func Retry(op backoff.Operation, b backoff.BackOff) error {
innerOp := func() error {
err := op()
if err == nil {
return err
}
if tmpErr, ok := err.(tempError); ok && tmpErr.Temporary() {
return err
}
return backoff.Permanent(err)
}
return backoff.Retry(innerOp, b)
}
var errHTTPBackOff = TempError(errors.New("temporary http error"))
func isStatusTemporary(code int) bool {
switch code {
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
// RetryHTTPDo retries an HTTP request until it succeeds, according to
// the backoff policy b. It will retry on temporary network errors and
// upon receiving specific temporary HTTP errors. It will use the
// context associated with the HTTP request object.
func RetryHTTPDo(client *http.Client, req *http.Request, b backoff.BackOff) (*http.Response, error) {
var resp *http.Response
op := func() error {
// Clear up previous response if set.
if resp != nil {
resp.Body.Close()
}
var err error
resp, err = client.Do(req)
if err == nil && isStatusTemporary(resp.StatusCode) {
resp.Body.Close()
return errHTTPBackOff
}
return err
}
err := Retry(op, backoff.WithContext(b, req.Context()))
return resp, err
}
package clientutil
import (
"crypto/tls"
common "git.autistici.org/ai3/go-common"
)
// TLSClientConfig defines the TLS parameters for a client connection
// that should use a client X509 certificate for authentication.
type TLSClientConfig struct {
Cert string `yaml:"cert"`
Key string `yaml:"key"`
CA string `yaml:"ca"`
}
// TLSConfig returns a tls.Config object with the current configuration.
func (c *TLSClientConfig) TLSConfig() (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(c.Cert, c.Key)
if err != nil {
return nil, err
}
tlsConf := &tls.Config{
Certificates: []tls.Certificate{cert},
}
if c.CA != "" {
cas, err := common.LoadCA(c.CA)
if err != nil {
return nil, err
}
tlsConf.RootCAs = cas
}
tlsConf.BuildNameToCertificate()
return tlsConf, nil
}
package clientutil
import (
"context"
"crypto/tls"
"errors"
"log"
"net"
"net/http"
"sync"
"time"
)
var errAllBackendsFailed = errors.New("all backends failed")
type dnsResolver struct{}
func (r *dnsResolver) ResolveIPs(hosts []string) []string {
var resolved []string
for _, hostport := range hosts {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
log.Printf("error parsing %s: %v", hostport, err)
continue
}
hostIPs, err := net.LookupIP(host)
if err != nil {
log.Printf("error resolving %s: %v", host, err)
continue
}
for _, ip := range hostIPs {
resolved = append(resolved, net.JoinHostPort(ip.String(), port))
}
}
return resolved
}
var defaultResolver = &dnsResolver{}
type resolver interface {
ResolveIPs([]string) []string
}
// Balancer for HTTP connections. It will round-robin across available
// backends, trying to avoid ones that are erroring out, until one
// succeeds or they all fail.
//
// This object should not be used for load balancing of individual
// HTTP requests: once a new connection is established, requests will
// be sent over it until it errors out. It's meant to provide a
// *reliable* connection to a set of equivalent backends for HA
// purposes.
type balancer struct {
hosts []string
resolver resolver
stop chan bool
// List of currently valid (or untested) backends, and ones
// that errored out at least once.
mx sync.Mutex
addrs []string
ok map[string]bool
}
var backendUpdateInterval = 60 * time.Second
// Periodically update the list of available backends.
func (b *balancer) updateProc() {
tick := time.NewTicker(backendUpdateInterval)
for {
select {
case <-b.stop:
return
case <-tick.C:
resolved := b.resolver.ResolveIPs(b.hosts)
if len(resolved) > 0 {
b.mx.Lock()
b.addrs = resolved
b.mx.Unlock()
}
}
}
}
// Returns a list of all available backends, split into "good ones"
// (no errors seen since last successful connection) and "bad ones".
func (b *balancer) getBackends() ([]string, []string) {
b.mx.Lock()
defer b.mx.Unlock()
var good, bad []string
for _, addr := range b.addrs {
if ok := b.ok[addr]; ok {
good = append(good, addr)
} else {
bad = append(bad, addr)
}
}
return good, bad
}
func (b *balancer) notify(addr string, ok bool) {
b.mx.Lock()
b.ok[addr] = ok
b.mx.Unlock()
}
func netDialContext(ctx context.Context, network, addr string) (net.Conn, error) {
timeout := 30 * time.Second
// Go < 1.9 does not have net.DialContext, reimplement it in
// terms of net.DialTimeout.
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}
return net.DialTimeout(network, addr, timeout)
}
func (b *balancer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
// Start by attempting a connection on 'good' targets.
good, bad := b.getBackends()
for _, addr := range good {
// Go < 1.9 does not have DialContext, deal with it
conn, err := netDialContext(ctx, network, addr)
if err == nil {
return conn, nil
} else if err == context.Canceled {
// A timeout might be bad, set the error bit
// on the connection.
b.notify(addr, false)
return nil, err
}
b.notify(addr, false)
}
for _, addr := range bad {
conn, err := netDialContext(ctx, network, addr)
if err == nil {
b.notify(addr, true)
return conn, nil
} else if err == context.Canceled {
return nil, err
}
}
return nil, errAllBackendsFailed
}
// NewTransport returns a suitably configured http.RoundTripper that
// talks to a specific backend service. It performs discovery of
// available backends via DNS (using A or AAAA record lookups), tries
// to route traffic away from faulty backends.
//
// It will periodically attempt to rediscover new backends.
func NewTransport(backends []string, tlsConf *tls.Config, resolver resolver) http.RoundTripper {
if resolver == nil {
resolver = defaultResolver
}
addrs := resolver.ResolveIPs(backends)
b := &balancer{
hosts: backends,
resolver: resolver,
addrs: addrs,
ok: make(map[string]bool),
}
go b.updateProc()
return &http.Transport{
DialContext: b.dial,
TLSClientConfig: tlsConf,
}
}
......@@ -7,11 +7,30 @@ import (
"net/url"
"time"
"git.autistici.org/ai3/go-common/clientutil"
"github.com/cenkalti/backoff"
"gopkg.in/ldap.v2"
)
// Parameters that define the exponential backoff algorithm used.
var (
ExponentialBackOffInitialInterval = 100 * time.Millisecond
ExponentialBackOffMultiplier = 1.4142
)
// newExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func newExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = ExponentialBackOffInitialInterval
b.Multiplier = ExponentialBackOffMultiplier
// Set MaxElapsedTime to 0 because we expect the overall
// timeout to be dictated by the request Context.
b.MaxElapsedTime = 0
return b
}
// ConnectionPool provides a goroutine-safe pool of long-lived LDAP
// connections that will reconnect on errors.
type ConnectionPool struct {
......@@ -129,14 +148,14 @@ func NewConnectionPool(uri, bindDN, bindPw string, cacheSize int) (*ConnectionPo
}
func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) error) error {
return clientutil.Retry(func() error {
return backoff.Retry(func() error {
conn, err := p.Get(ctx)
if err != nil {
// Here conn is nil, so we don't need to Release it.
if isTemporaryLDAPError(err) {
return clientutil.TempError(err)
return err
}
return err
return backoff.Permanent(err)
}
if deadline, ok := ctx.Deadline(); ok {
......@@ -144,13 +163,12 @@ func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) erro
}
err = fn(conn)
if err != nil && isTemporaryLDAPError(err) {
p.Release(conn, err)
return clientutil.TempError(err)
}
p.Release(conn, err)
if err != nil && !isTemporaryLDAPError(err) {
err = backoff.Permanent(err)
}
return err
}, backoff.WithContext(clientutil.NewExponentialBackOff(), ctx))
}, backoff.WithContext(newExponentialBackOff(), ctx))
}
// Search performs the given search request. It will retry the request
......
......@@ -24,7 +24,7 @@ See https://godoc.org/github.com/cenkalti/backoff#pkg-examples
[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
[google-http-java-client]: https://github.com/google/google-http-java-client
[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java
[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_
package backoff
import (
"context"
"time"
"golang.org/x/net/context"
)
// BackOffContext is a backoff policy that stops retrying after the context
......
......@@ -15,7 +15,6 @@ type Notify func(error, time.Duration)
// Retry the operation o until it does not return error or BackOff stops.
// o is guaranteed to be run at least once.
// It is the caller's responsibility to reset b after Retry returns.
//
// If o returns a *PermanentError, the operation is not retried, and the
// wrapped error is returned.
......
package backoff
import (
"runtime"
"sync"
"time"
)
......@@ -34,7 +33,6 @@ func NewTicker(b BackOff) *Ticker {
}
t.b.Reset()
go t.run()
runtime.SetFinalizer(t, (*Ticker).Stop)
return t
}
......
......@@ -3,13 +3,13 @@ package backoff
import "time"
/*
WithMaxTries creates a wrapper around another BackOff, which will
WithMaxRetries creates a wrapper around another BackOff, which will
return Stop if NextBackOff() has been called too many times since
the last time Reset() was called
Note: Implementation is not thread-safe.
*/
func WithMaxTries(b BackOff, max uint64) BackOff {
func WithMaxRetries(b BackOff, max uint64) BackOff {
return &backOffTries{delegate: b, maxTries: max}
}
......
......@@ -6,7 +6,10 @@
// https://ed25519.cr.yp.to/.
//
// These functions are also compatible with the “Ed25519” function defined in
// https://tools.ietf.org/html/draft-irtf-cfrg-eddsa-05.
// RFC 8032. However, unlike RFC 8032's formulation, this package's private key
// representation includes a public key suffix to make multiple signing
// operations with the same key more efficient. This package refers to the RFC
// 8032 private key as the “seed”.
package ed25519
// This code is a port of the public domain, “ref10” implementation of ed25519
......@@ -31,6 +34,8 @@ const (
PrivateKeySize = 64
// SignatureSize is the size, in bytes, of signatures generated and verified by this package.
SignatureSize = 64
// SeedSize is the size, in bytes, of private key seeds. These are the private key representations used by RFC 8032.
SeedSize = 32
)
// PublicKey is the type of Ed25519 public keys.
......@@ -46,6 +51,15 @@ func (priv PrivateKey) Public() crypto.PublicKey {