Skip to content
Snippets Groups Projects
Commit a1a8e639 authored by ale's avatar ale
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
package clientutil
import (
"errors"
"net"
"net/http"
"time"
"github.com/cenkalti/backoff"
)
func NewExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 100 * time.Millisecond
//b.Multiplier = 1.4142
return b
}
func Retry(op backoff.Operation, b backoff.BackOff) error {
innerOp := func() error {
err := op()
if err == nil {
return err
}
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
return err
}
return backoff.Permanent(err)
}
return backoff.Retry(innerOp, b)
}
var errHTTPBackOff = errors.New("http status 503")
func RetryHTTPDo(client *http.Client, req *http.Request, b backoff.BackOff) (*http.Response, error) {
var resp *http.Response
op := func() error {
// Clear up previous response if set.
if resp != nil {
resp.Body.Close()
}
var err error
resp, err = client.Do(req)
if err == nil && resp.StatusCode == 503 {
resp.Body.Close()
return errHTTPBackOff
}
return err
}
err := Retry(op, b)
return resp, err
}
package clientutil
import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
)
func testHTTPServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
io.WriteString(w, "OK")
}))
}
type testBackends struct {
servers []*httptest.Server
addrs []string
}
func newTestBackends(n int) *testBackends {
b := new(testBackends)
for i := 0; i < n; i++ {
s := testHTTPServer()
u, _ := url.Parse(s.URL)
b.servers = append(b.servers, s)
b.addrs = append(b.addrs, u.Host)
}
return b
}
func (b *testBackends) ResolveIPs(_ []string) []string {
return b.addrs
}
func (b *testBackends) stop(i int) {
b.servers[i].Close()
}
func (b *testBackends) close() {
for _, s := range b.servers {
s.Close()
}
}
func doRequests(backends *testBackends, u string, n int) (int, int) {
c := &http.Client{
Transport: NewTransport([]string{"backend"}, nil, backends),
}
b := NewExponentialBackOff()
var errs, oks int
for i := 0; i < n; i++ {
req, _ := http.NewRequest("GET", u, nil)
resp, err := RetryHTTPDo(c, req, b)
if err != nil {
errs++
continue
}
ioutil.ReadAll(resp.Body)
resp.Body.Close()
oks++
}
return oks, errs
}
func TestRetryAndTransport(t *testing.T) {
b := newTestBackends(3)
defer b.close()
oks, errs := doRequests(b, "http://backend/", 100)
if errs > 0 {
t.Fatalf("errs=%d", errs)
}
if oks == 0 {
t.Fatal("oks=0")
}
b.stop(0)
oks, errs = doRequests(b, "http://backend/", 100)
if errs > 0 {
t.Fatalf("errs=%d", errs)
}
if oks == 0 {
t.Fatal("oks=0")
}
}
package clientutil
import (
"context"
"crypto/tls"
"errors"
"log"
"net"
"net/http"
"sync"
"time"
)
var errAllBackendsFailed = errors.New("all backends failed")
type dnsResolver struct{}
func (r *dnsResolver) ResolveIPs(hosts []string) []string {
var resolved []string
for _, hostport := range hosts {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
log.Printf("error parsing %s: %v", hostport, err)
continue
}
hostIPs, err := net.LookupIP(host)
if err != nil {
log.Printf("error resolving %s: %v", host, err)
continue
}
for _, ip := range hostIPs {
resolved = append(resolved, net.JoinHostPort(ip.String(), port))
}
}
return resolved
}
var defaultResolver = &dnsResolver{}
type resolver interface {
ResolveIPs([]string) []string
}
// Balancer for HTTP connections. It will round-robin across available
// backends, trying to avoid ones that are erroring out, until one
// succeeds or they all fail.
//
// This object should not be used for load balancing of individual
// HTTP requests: once a new connection is established, requests will
// be sent over it until it errors out. It's meant to provide a
// *reliable* connection to a set of equivalent backends for HA
// purposes.
type balancer struct {
hosts []string
resolver resolver
stop chan bool
// List of currently valid (or untested) backends, and ones
// that errored out at least once.
mx sync.Mutex
addrs []string
ok map[string]bool
}
var backendUpdateInterval = 60 * time.Second
// Periodically update the list of available backends.
func (b *balancer) updateProc() {
tick := time.NewTicker(backendUpdateInterval)
for {
select {
case <-b.stop:
return
case <-tick.C:
resolved := b.resolver.ResolveIPs(b.hosts)
if len(resolved) > 0 {
b.mx.Lock()
b.addrs = resolved
b.mx.Unlock()
}
}
}
}
// Returns a list of all available backends, split into "good ones"
// (no errors seen since last successful connection) and "bad ones".
func (b *balancer) getBackends() ([]string, []string) {
b.mx.Lock()
defer b.mx.Unlock()
var good, bad []string
for _, addr := range b.addrs {
if ok := b.ok[addr]; ok {
good = append(good, addr)
} else {
bad = append(bad, addr)
}
}
return good, bad
}
func (b *balancer) notify(addr string, ok bool) {
b.mx.Lock()
b.ok[addr] = ok
b.mx.Unlock()
}
func netDialContext(ctx context.Context, network, addr string) (net.Conn, error) {
timeout := 30 * time.Second
// Go < 1.9 does not have net.DialContext, reimplement it in
// terms of net.DialTimeout.
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}
return net.DialTimeout(network, addr, timeout)
}
func (b *balancer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
// Start by attempting a connection on 'good' targets.
good, bad := b.getBackends()
for _, addr := range good {
// Go < 1.9 does not have DialContext, deal with it
conn, err := netDialContext(ctx, network, addr)
if err == nil {
return conn, nil
} else if err == context.Canceled {
return nil, err
}
b.notify(addr, false)
}
for _, addr := range bad {
conn, err := netDialContext(ctx, network, addr)
if err == nil {
b.notify(addr, true)
return conn, nil
} else if err == context.Canceled {
return nil, err
}
}
return nil, errAllBackendsFailed
}
// NewTransport returns a suitably configured http.RoundTripper that
// talks to a specific backend service. It performs discovery of
// available backends via DNS (using A or AAAA record lookups), tries
// to route traffic away from faulty backends.
//
// It will periodically attempt to rediscover new backends.
func NewTransport(backends []string, tlsConf *tls.Config, resolver resolver) http.RoundTripper {
if resolver == nil {
resolver = defaultResolver
}
addrs := resolver.ResolveIPs(backends)
b := &balancer{
hosts: backends,
resolver: resolver,
addrs: addrs,
ok: make(map[string]bool),
}
go b.updateProc()
return &http.Transport{
DialContext: b.dial,
TLSClientConfig: tlsConf,
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment