Commit 2746cdca authored by ale's avatar ale
Browse files

Add vendor deps

parent 8635e168
package clientutil
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
)
// DoJSONHTTPRequest makes an HTTP POST request to the specified uri,
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func DoJSONHTTPRequest(ctx context.Context, client *http.Client, uri string, req, resp interface{}) error {
data, err := json.Marshal(req)
if err != nil {
return err
}
httpReq, err := http.NewRequest("POST", uri, bytes.NewReader(data))
if err != nil {
return err
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq = httpReq.WithContext(ctx)
httpResp, err := RetryHTTPDo(client, httpReq, NewExponentialBackOff())
if err != nil {
return err
}
defer httpResp.Body.Close()
if httpResp.StatusCode != 200 {
return fmt.Errorf("HTTP status %d", httpResp.StatusCode)
}
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}
package clientutil
import (
"errors"
"net/http"
"time"
"github.com/cenkalti/backoff"
)
// NewExponentialBackOff creates a backoff.ExponentialBackOff object
// with our own default values.
func NewExponentialBackOff() *backoff.ExponentialBackOff {
b := backoff.NewExponentialBackOff()
b.InitialInterval = 100 * time.Millisecond
//b.Multiplier = 1.4142
return b
}
// A temporary (retriable) error is something that has a Temporary method.
type tempError interface {
Temporary() bool
}
type tempErrorWrapper struct {
error
}
func (t tempErrorWrapper) Temporary() bool { return true }
// TempError makes a temporary (retriable) error out of a normal error.
func TempError(err error) error {
return tempErrorWrapper{err}
}
// Retry operation op until it succeeds according to the backoff
// policy b.
//
// Note that this function reverses the error semantics of
// backoff.Operation: all errors are permanent unless explicitly
// marked as temporary (i.e. they have a Temporary() method that
// returns true). This is to better align with the errors returned by
// the net package.
func Retry(op backoff.Operation, b backoff.BackOff) error {
innerOp := func() error {
err := op()
if err == nil {
return err
}
if tmpErr, ok := err.(tempError); ok && tmpErr.Temporary() {
return err
}
return backoff.Permanent(err)
}
return backoff.Retry(innerOp, b)
}
var errHTTPBackOff = TempError(errors.New("temporary http error"))
func isStatusTemporary(code int) bool {
switch code {
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
// RetryHTTPDo retries an HTTP request until it succeeds, according to
// the backoff policy b. It will retry on temporary network errors and
// upon receiving specific temporary HTTP errors. It will use the
// context associated with the HTTP request object.
func RetryHTTPDo(client *http.Client, req *http.Request, b backoff.BackOff) (*http.Response, error) {
var resp *http.Response
op := func() error {
// Clear up previous response if set.
if resp != nil {
resp.Body.Close()
}
var err error
resp, err = client.Do(req)
if err == nil && isStatusTemporary(resp.StatusCode) {
resp.Body.Close()
return errHTTPBackOff
}
return err
}
err := Retry(op, backoff.WithContext(b, req.Context()))
return resp, err
}
package clientutil
import (
"crypto/tls"
common "git.autistici.org/ai3/go-common"
)
// TLSClientConfig defines the TLS parameters for a client connection
// that should use a client X509 certificate for authentication.
type TLSClientConfig struct {
Cert string `yaml:"cert"`
Key string `yaml:"key"`
CA string `yaml:"ca"`
}
// TLSConfig returns a tls.Config object with the current configuration.
func (c *TLSClientConfig) TLSConfig() (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(c.Cert, c.Key)
if err != nil {
return nil, err
}
tlsConf := &tls.Config{
Certificates: []tls.Certificate{cert},
}
if c.CA != "" {
cas, err := common.LoadCA(c.CA)
if err != nil {
return nil, err
}
tlsConf.RootCAs = cas
}
tlsConf.BuildNameToCertificate()
return tlsConf, nil
}
package clientutil
import (
"context"
"crypto/tls"
"errors"
"log"
"net"
"net/http"
"sync"
"time"
)
var errAllBackendsFailed = errors.New("all backends failed")
type dnsResolver struct{}
func (r *dnsResolver) ResolveIPs(hosts []string) []string {
var resolved []string
for _, hostport := range hosts {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
log.Printf("error parsing %s: %v", hostport, err)
continue
}
hostIPs, err := net.LookupIP(host)
if err != nil {
log.Printf("error resolving %s: %v", host, err)
continue
}
for _, ip := range hostIPs {
resolved = append(resolved, net.JoinHostPort(ip.String(), port))
}
}
return resolved
}
var defaultResolver = &dnsResolver{}
type resolver interface {
ResolveIPs([]string) []string
}
// Balancer for HTTP connections. It will round-robin across available
// backends, trying to avoid ones that are erroring out, until one
// succeeds or they all fail.
//
// This object should not be used for load balancing of individual
// HTTP requests: once a new connection is established, requests will
// be sent over it until it errors out. It's meant to provide a
// *reliable* connection to a set of equivalent backends for HA
// purposes.
type balancer struct {
hosts []string
resolver resolver
stop chan bool
// List of currently valid (or untested) backends, and ones
// that errored out at least once.
mx sync.Mutex
addrs []string
ok map[string]bool
}
var backendUpdateInterval = 60 * time.Second
// Periodically update the list of available backends.
func (b *balancer) updateProc() {
tick := time.NewTicker(backendUpdateInterval)
for {
select {
case <-b.stop:
return
case <-tick.C:
resolved := b.resolver.ResolveIPs(b.hosts)
if len(resolved) > 0 {
b.mx.Lock()
b.addrs = resolved
b.mx.Unlock()
}
}
}
}
// Returns a list of all available backends, split into "good ones"
// (no errors seen since last successful connection) and "bad ones".
func (b *balancer) getBackends() ([]string, []string) {
b.mx.Lock()
defer b.mx.Unlock()
var good, bad []string
for _, addr := range b.addrs {
if ok := b.ok[addr]; ok {
good = append(good, addr)
} else {
bad = append(bad, addr)
}
}
return good, bad
}
func (b *balancer) notify(addr string, ok bool) {
b.mx.Lock()
b.ok[addr] = ok
b.mx.Unlock()
}
func netDialContext(ctx context.Context, network, addr string) (net.Conn, error) {
timeout := 30 * time.Second
// Go < 1.9 does not have net.DialContext, reimplement it in
// terms of net.DialTimeout.
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}
return net.DialTimeout(network, addr, timeout)
}
func (b *balancer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
// Start by attempting a connection on 'good' targets.
good, bad := b.getBackends()
for _, addr := range good {
// Go < 1.9 does not have DialContext, deal with it
conn, err := netDialContext(ctx, network, addr)
if err == nil {
return conn, nil
} else if err == context.Canceled {
// A timeout might be bad, set the error bit
// on the connection.
b.notify(addr, false)
return nil, err
}
b.notify(addr, false)
}
for _, addr := range bad {
conn, err := netDialContext(ctx, network, addr)
if err == nil {
b.notify(addr, true)
return conn, nil
} else if err == context.Canceled {
return nil, err
}
}
return nil, errAllBackendsFailed
}
// NewTransport returns a suitably configured http.RoundTripper that
// talks to a specific backend service. It performs discovery of
// available backends via DNS (using A or AAAA record lookups), tries
// to route traffic away from faulty backends.
//
// It will periodically attempt to rediscover new backends.
func NewTransport(backends []string, tlsConf *tls.Config, resolver resolver) http.RoundTripper {
if resolver == nil {
resolver = defaultResolver
}
addrs := resolver.ResolveIPs(backends)
b := &balancer{
hosts: backends,
resolver: resolver,
addrs: addrs,
ok: make(map[string]bool),
}
go b.updateProc()
return &http.Transport{
DialContext: b.dial,
TLSClientConfig: tlsConf,
}
}
package common
import (
"crypto/x509"
"io/ioutil"
)
// LoadCA loads a file containing CA certificates into a x509.CertPool.
func LoadCA(path string) (*x509.CertPool, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
cas := x509.NewCertPool()
cas.AppendCertsFromPEM(data)
return cas, nil
}
package serverutil
import (
"context"
"crypto/tls"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var gracefulShutdownTimeout = 3 * time.Second
// ServerConfig stores common HTTP/HTTPS server configuration parameters.
type ServerConfig struct {
TLS *TLSServerConfig `yaml:"tls"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
}
// Serve HTTP(S) content on the specified address. If serverConfig is
// not nil, enable HTTPS and TLS authentication.
//
// This function will return an error if there are problems creating
// the listener, otherwise it will handle graceful termination on
// SIGINT or SIGTERM and return nil.
func Serve(h http.Handler, serverConfig *ServerConfig, addr string) (err error) {
var tlsConfig *tls.Config
if serverConfig != nil {
if serverConfig.TLS != nil {
tlsConfig, err = serverConfig.TLS.TLSConfig()
if err != nil {
return err
}
h, err = serverConfig.TLS.TLSAuthWrapper(h)
if err != nil {
return err
}
}
if serverConfig.MaxInflightRequests > 0 {
h = newLoadSheddingWrapper(serverConfig.MaxInflightRequests, h)
}
}
// These are not meant to be external-facing servers, so we
// can be generous with the timeouts to keep the number of
// reconnections low.
srv := &http.Server{
Addr: addr,
Handler: instrumentHandler(h),
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 600 * time.Second,
TLSConfig: tlsConfig,
}
// Install a signal handler for gentle process termination.
done := make(chan struct{})
sigCh := make(chan os.Signal, 1)
go func() {
<-sigCh
log.Printf("exiting")
// Gracefully terminate for 3 seconds max, then shut
// down remaining clients.
ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
defer cancel()
if err := srv.Shutdown(ctx); err == context.Canceled {
if err := srv.Close(); err != nil {
log.Printf("error terminating server: %v", err)
}
}
close(done)
}()
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
return err
}
<-done
return nil
}
func instrumentHandler(h http.Handler) http.Handler {
root := http.NewServeMux()
root.Handle("/metrics", promhttp.Handler())
root.Handle("/", h)
return promhttp.InstrumentHandlerInFlight(inFlightRequests,
promhttp.InstrumentHandlerCounter(totalRequests, root))
}
// HTTP-related metrics.
var (
totalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "total_requests",
Help: "Total number of requests.",
},
[]string{"code"},
)
inFlightRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "inflight_requests",
Help: "Number of in-flight requests.",
},
)
)
func init() {
prometheus.MustRegister(totalRequests, inFlightRequests)
}
package serverutil
import (
"encoding/json"
"net/http"
)
// DecodeJSONRequest decodes a JSON object from an incoming HTTP POST
// request and return true when successful. In case of errors, it will
// write an error response to w and return false.
func DecodeJSONRequest(w http.ResponseWriter, r *http.Request, obj interface{}) bool {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return false
}
if r.Header.Get("Content-Type") != "application/json" {
http.Error(w, "Need JSON request", http.StatusBadRequest)
return false
}
if err := json.NewDecoder(r.Body).Decode(obj); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return false
}
return true
}
// EncodeJSONResponse writes an application/json response to w.
func EncodeJSONResponse(w http.ResponseWriter, obj interface{}) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Pragma", "no-cache")
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Expires", "-1")
w.Header().Set("X-Content-Type-Options", "nosniff")
_ = json.NewEncoder(w).Encode(obj)
}
package serverutil
import (
"net/http"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
)
type loadSheddingWrapper struct {
limit, inflight int32
h http.Handler
}
func newLoadSheddingWrapper(limit int, h http.Handler) *loadSheddingWrapper {
return &loadSheddingWrapper{limit: int32(limit), h: h}
}
func (l *loadSheddingWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
inflight := atomic.AddInt32(&l.inflight, 1)
defer atomic.AddInt32(&l.inflight, -1)
if inflight > l.limit {
throttledRequests.Inc()
w.Header().Set("Connection", "close")
http.Error(w, "Throttled", http.StatusTooManyRequests)
return
}
allowedRequests.Inc()
l.h.ServeHTTP(w, r)
}
var (
throttledRequests = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "ls_throttled_requests",
Help: "Requests throttled by the load shedding wrapper.",
},
)
allowedRequests = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "ls_allowed_requests",
Help: "Requests allowed by the load shedding wrapper.",
},
)
)
func init() {
prometheus.MustRegister(throttledRequests, allowedRequests)
}
package serverutil
import (
"crypto/tls"
"net/http"
"regexp"
common "git.autistici.org/ai3/go-common"
)
// TLSAuthACL describes a single access control entry. Path and
// CommonName are anchored regular expressions (they must match the