Skip to content
Snippets Groups Projects
Commit eb63809a authored by ale's avatar ale
Browse files

Update ai3/go-common

parent 0cdc8fdc
No related branches found
No related tags found
No related merge requests found
ai3/go-common
===
Common code for ai3 services and tools.
A quick overview of the contents:
* [client](clientutil/) and [server](serverutil/) HTTP-based
"RPC" implementation, just JSON POST requests but with retries,
backoff, timeouts, tracing, etc.
* [server implementation of a generic line-based protocol over a UNIX
socket](unix/).
* a [LDAP connection pool](ldap/).
* utilities to [serialize composite data types](ldap/compositetypes/)
used in our LDAP database.
* a [password hashing library](pwhash/) that uses fancy advanced
crypto by default but is also backwards compatible with old
libc crypto.
* utilities to [manage encryption keys](userenckey/), themselves
encrypted with a password and a KDF.
......@@ -98,40 +98,69 @@ func newBalancedBackend(config *BackendConfig, resolver resolver) (*balancedBack
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func (b *balancedBackend) Call(ctx context.Context, shard, path string, req, resp interface{}) error {
// Serialize the request body.
data, err := json.Marshal(req)
if err != nil {
return err
}
var tg targetGenerator = b.backendTracker
if b.sharded && shard != "" {
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
// Create the target sequence for this call. If there are multiple
// targets, reduce the timeout on each individual call accordingly to
// accomodate eventual failover.
seq, err := b.makeSequence(shard)
if err != nil {
return err
}
innerTimeout := 1 * time.Hour
if deadline, ok := ctx.Deadline(); ok {
innerTimeout = time.Until(deadline) / time.Duration(seq.Len())
}
seq := newSequence(tg)
b.log.Printf("%016x: initialized", seq.ID())
var httpResp *http.Response
err = backoff.Retry(func() error {
// Call the backends in the sequence until one succeeds, with an
// exponential backoff policy controlled by the outer Context.
return backoff.Retry(func() error {
req, rerr := b.newJSONRequest(path, shard, data)
if rerr != nil {
return rerr
}
httpResp, rerr = b.do(ctx, seq, req)
return rerr
innerCtx, cancel := context.WithTimeout(ctx, innerTimeout)
defer cancel()
// When do() returns successfully, we already know that the
// response had an HTTP status of 200.
httpResp, rerr := b.do(innerCtx, seq, req)
if rerr != nil {
return rerr
}
defer httpResp.Body.Close() // nolint
// Decode the response, unless the 'resp' output is nil.
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}, backoff.WithContext(newExponentialBackOff(), ctx))
if err != nil {
return err
}
defer httpResp.Body.Close() // nolint
}
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
// Initialize a new target sequence.
func (b *balancedBackend) makeSequence(shard string) (*sequence, error) {
var tg targetGenerator = b.backendTracker
if b.sharded {
if shard == "" {
return nil, fmt.Errorf("call without shard to sharded service %s", b.baseURI.String())
}
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
}
if resp == nil {
return nil
seq := newSequence(tg)
if seq.Len() == 0 {
return nil, errNoTargets
}
return json.NewDecoder(httpResp.Body).Decode(resp)
b.log.Printf("%016x: initialized", seq.ID())
return seq, nil
}
// Return the URI to be used for the request. This is used both in the
......@@ -210,6 +239,8 @@ func newSequence(tg targetGenerator) *sequence {
func (s *sequence) ID() uint64 { return s.id }
func (s *sequence) Len() int { return len(s.targets) }
func (s *sequence) reloadTargets() {
targets := s.tg.getTargets()
if len(targets) > 0 {
......
......@@ -7,6 +7,8 @@ import (
"net/http"
"sync"
"time"
"git.autistici.org/ai3/go-common/tracing"
)
// The transportCache is just a cache of http transports, each
......@@ -29,12 +31,12 @@ func newTransportCache(tlsConfig *tls.Config) *transportCache {
}
func (m *transportCache) newTransport(addr string) http.RoundTripper {
return &http.Transport{
return tracing.WrapTransport(&http.Transport{
TLSClientConfig: m.tlsConfig,
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return netDialContext(ctx, network, addr)
},
}
})
}
func (m *transportCache) getTransport(addr string) http.RoundTripper {
......
......@@ -3,16 +3,18 @@ package serverutil
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/pprof"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"git.autistici.org/ai3/go-common/tracing"
"github.com/coreos/go-systemd/daemon"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
......@@ -77,6 +79,10 @@ func (config *ServerConfig) buildHTTPServer(h http.Handler) (*http.Server, error
// the listener, otherwise it will handle graceful termination on
// SIGINT or SIGTERM and return nil.
func Serve(h http.Handler, config *ServerConfig, addr string) error {
// Wrap with tracing handler (exclude metrics and other
// debugging endpoints).
h = tracing.WrapHandler(h, guessEndpointName(addr))
// Create the HTTP server.
srv, err := config.buildHTTPServer(h)
if err != nil {
......@@ -139,8 +145,10 @@ func defaultHandler(h http.Handler) http.Handler {
// Add an endpoint to serve Prometheus metrics.
root.Handle("/metrics", promhttp.Handler())
// Add the net/http/pprof debug handlers.
root.Handle("/debug/pprof/", pprof.Handler(""))
// Let the default net/http handler deal with /debug/
// URLs. Packages such as net/http/pprof register their
// handlers there in ways that aren't reproducible.
root.Handle("/debug/", http.DefaultServeMux)
// Forward everything else to the main handler, adding
// Prometheus instrumentation (requests to /metrics and
......@@ -151,6 +159,18 @@ func defaultHandler(h http.Handler) http.Handler {
return root
}
func guessEndpointName(addr string) string {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
host, err := os.Hostname()
if err != nil {
return addr
}
return fmt.Sprintf("%s:%s", host, port)
}
// HTTP-related metrics.
var (
// Since we instrument the root HTTP handler, we don't really
......
......@@ -3,22 +3,22 @@
"ignore": "test",
"package": [
{
"checksumSHA1": "pLvPnUablirQucyALgrso9hLG4E=",
"checksumSHA1": "oUOxU+Tw1/jOzWVP05HuGvVSC/A=",
"path": "git.autistici.org/ai3/go-common",
"revision": "6916834dec86e761a3091c9628cbff9b6c389867",
"revisionTime": "2018-10-29T11:03:54Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "Xd4ClmFykFMOg8b2ZFXimSS3Uj0=",
"checksumSHA1": "kJwm6y9JXhybelO2zUl7UbzIdP0=",
"path": "git.autistici.org/ai3/go-common/clientutil",
"revision": "6916834dec86e761a3091c9628cbff9b6c389867",
"revisionTime": "2018-10-29T11:03:54Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "RyFydcBJvLBevfsriijLqHtZ0hs=",
"checksumSHA1": "TKGUNmKxj7KH3qhwiCh/6quUnwc=",
"path": "git.autistici.org/ai3/go-common/serverutil",
"revision": "6916834dec86e761a3091c9628cbff9b6c389867",
"revisionTime": "2018-10-29T11:03:54Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "FRxoT4jwgKDffIm5RwpFWjVVilc=",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment