Skip to content
Snippets Groups Projects
Commit 9f1f2da0 authored by ale's avatar ale
Browse files

Update ai3/go-common

parent fc40991c
Branches
No related tags found
No related merge requests found
ai3/go-common
===
Common code for ai3 services and tools.
A quick overview of the contents:
* [client](clientutil/) and [server](serverutil/) HTTP-based
"RPC" implementation, just JSON POST requests but with retries,
backoff, timeouts, tracing, etc.
* [server implementation of a generic line-based protocol over a UNIX
socket](unix/).
* a [LDAP connection pool](ldap/).
* utilities to [serialize composite data types](ldap/compositetypes/)
used in our LDAP database.
* a [password hashing library](pwhash/) that uses fancy advanced
crypto by default but is also backwards compatible with old
libc crypto.
* utilities to [manage encryption keys](userenckey/), themselves
encrypted with a password and a KDF.
......@@ -98,43 +98,69 @@ func newBalancedBackend(config *BackendConfig, resolver resolver) (*balancedBack
// with a JSON-encoded request body. It will attempt to decode the
// response body as JSON.
func (b *balancedBackend) Call(ctx context.Context, shard, path string, req, resp interface{}) error {
// Serialize the request body.
data, err := json.Marshal(req)
if err != nil {
return err
}
var tg targetGenerator = b.backendTracker
if b.sharded {
if shard == "" {
return fmt.Errorf("call without shard to sharded service %s", b.baseURI.String())
}
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
// Create the target sequence for this call. If there are multiple
// targets, reduce the timeout on each individual call accordingly to
// accomodate eventual failover.
seq, err := b.makeSequence(shard)
if err != nil {
return err
}
innerTimeout := 1 * time.Hour
if deadline, ok := ctx.Deadline(); ok {
innerTimeout = time.Until(deadline) / time.Duration(seq.Len())
}
seq := newSequence(tg)
b.log.Printf("%016x: initialized", seq.ID())
var httpResp *http.Response
err = backoff.Retry(func() error {
// Call the backends in the sequence until one succeeds, with an
// exponential backoff policy controlled by the outer Context.
return backoff.Retry(func() error {
req, rerr := b.newJSONRequest(path, shard, data)
if rerr != nil {
return rerr
}
httpResp, rerr = b.do(ctx, seq, req)
return rerr
innerCtx, cancel := context.WithTimeout(ctx, innerTimeout)
defer cancel()
// When do() returns successfully, we already know that the
// response had an HTTP status of 200.
httpResp, rerr := b.do(innerCtx, seq, req)
if rerr != nil {
return rerr
}
defer httpResp.Body.Close() // nolint
// Decode the response, unless the 'resp' output is nil.
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
}
if resp == nil {
return nil
}
return json.NewDecoder(httpResp.Body).Decode(resp)
}, backoff.WithContext(newExponentialBackOff(), ctx))
if err != nil {
return err
}
defer httpResp.Body.Close() // nolint
}
if httpResp.Header.Get("Content-Type") != "application/json" {
return errors.New("not a JSON response")
// Initialize a new target sequence.
func (b *balancedBackend) makeSequence(shard string) (*sequence, error) {
var tg targetGenerator = b.backendTracker
if b.sharded {
if shard == "" {
return nil, fmt.Errorf("call without shard to sharded service %s", b.baseURI.String())
}
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
}
if resp == nil {
return nil
seq := newSequence(tg)
if seq.Len() == 0 {
return nil, errNoTargets
}
return json.NewDecoder(httpResp.Body).Decode(resp)
b.log.Printf("%016x: initialized", seq.ID())
return seq, nil
}
// Return the URI to be used for the request. This is used both in the
......@@ -213,6 +239,8 @@ func newSequence(tg targetGenerator) *sequence {
func (s *sequence) ID() uint64 { return s.id }
func (s *sequence) Len() int { return len(s.targets) }
func (s *sequence) reloadTargets() {
targets := s.tg.getTargets()
if len(targets) > 0 {
......
......@@ -103,7 +103,8 @@ func UnmarshalEncryptedKey(s string) (*EncryptedKey, error) {
//
// The serialized format follows part of the U2F standard and just
// stores 64 bytes of the public key immediately followed by the key
// handle data, with no encoding.
// handle data, with no encoding. Note that the public key itself is a
// serialization of the elliptic curve parameters.
//
// The data in U2FRegistration is still encoded, but it can be turned
// into a usable form (github.com/tstranex/u2f.Registration) later.
......@@ -122,13 +123,13 @@ func (r *U2FRegistration) Marshal() string {
// UnmarshalU2FRegistration parses a U2FRegistration from its serialized format.
func UnmarshalU2FRegistration(s string) (*U2FRegistration, error) {
if len(s) < 64 {
if len(s) < 65 {
return nil, errors.New("badly encoded u2f registration")
}
b := []byte(s)
return &U2FRegistration{
PublicKey: b[:64],
KeyHandle: b[64:],
PublicKey: b[:65],
KeyHandle: b[65:],
}, nil
}
......
......@@ -93,6 +93,9 @@ func init() {
}
func initTracing(endpointAddr string) {
if !Enabled {
return
}
initOnce.Do(func() {
localEndpoint, err := openzipkin.NewEndpoint(getServiceName(), endpointAddr)
if err != nil {
......@@ -126,6 +129,11 @@ func initTracing(endpointAddr string) {
})
}
// Init tracing support, if not using WrapHandler.
func Init() {
initTracing("")
}
// WrapTransport optionally wraps a http.RoundTripper with OpenCensus
// tracing functionality, if it is globally enabled.
func WrapTransport(t http.RoundTripper) http.RoundTripper {
......@@ -136,7 +144,7 @@ func WrapTransport(t http.RoundTripper) http.RoundTripper {
}
// WrapHandler wraps a http.Handler with OpenCensus tracing
// functionality, if globally enabled.
// functionality, if globally enabled. Automatically calls Init().
func WrapHandler(h http.Handler, endpointAddr string) http.Handler {
if Enabled {
initTracing(endpointAddr)
......
......@@ -2,6 +2,7 @@ package unix
import (
"bufio"
"container/list"
"context"
"errors"
"io"
......@@ -32,6 +33,11 @@ type SocketServer struct {
closing atomic.Value
wg sync.WaitGroup
handler Handler
// Keep track of active connections so we can shut them down
// on Close.
connMx sync.Mutex
conns list.List
}
func newServer(l net.Listener, lock *flock.Flock, h Handler) *SocketServer {
......@@ -99,7 +105,18 @@ func NewSystemdSocketServer(h Handler) (*SocketServer, error) {
// Waits for active connections to terminate before returning.
func (s *SocketServer) Close() {
s.closing.Store(true)
// Close the listener to stop incoming connections.
s.l.Close() // nolint
// Close all active connections (this will return an error to
// the client if the connection is not idle).
s.connMx.Lock()
for el := s.conns.Front(); el != nil; el = el.Next() {
el.Value.(net.Conn).Close() // nolint
}
s.connMx.Unlock()
s.wg.Wait()
if s.lock != nil {
s.lock.Unlock() // nolint
......@@ -120,10 +137,21 @@ func (s *SocketServer) Serve() error {
}
return err
}
s.wg.Add(1)
s.connMx.Lock()
connEl := s.conns.PushBack(conn)
s.connMx.Unlock()
go func() {
s.handler.ServeConnection(conn)
conn.Close() // nolint
if !s.isClosing() {
s.connMx.Lock()
s.conns.Remove(connEl)
s.connMx.Unlock()
}
s.wg.Done()
}()
}
......
......@@ -3,52 +3,52 @@
"ignore": "test",
"package": [
{
"checksumSHA1": "pLvPnUablirQucyALgrso9hLG4E=",
"checksumSHA1": "oUOxU+Tw1/jOzWVP05HuGvVSC/A=",
"path": "git.autistici.org/ai3/go-common",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "1ChQcW9Biu/AgiKjsbJFg/+WhjQ=",
"checksumSHA1": "kJwm6y9JXhybelO2zUl7UbzIdP0=",
"path": "git.autistici.org/ai3/go-common/clientutil",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "d8aQcSXveyjPfFJgfB8NnM+x8dg=",
"path": "git.autistici.org/ai3/go-common/ldap",
"revision": "2934fd63c275d37b0fe60afabb484a251662bd49",
"revisionTime": "2019-02-17T09:01:06Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "X14iCbFCOfaIai/TPi4VJ/OBZjc=",
"checksumSHA1": "ETt1H7ZXeT+mOGVuWDvgGBVx98k=",
"path": "git.autistici.org/ai3/go-common/ldap/compositetypes",
"revision": "301958e3493e263eb6ea269bf7b8644fbcd97394",
"revisionTime": "2019-03-21T10:42:03Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "TKGUNmKxj7KH3qhwiCh/6quUnwc=",
"path": "git.autistici.org/ai3/go-common/serverutil",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "tWzCSEieYFl5VnKhFTosgveO6Ys=",
"checksumSHA1": "y5pRYZ/NhfEOCFslPEuUZTYXcro=",
"path": "git.autistici.org/ai3/go-common/tracing",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "R18oCPkWjuPqDxPgKvG1KhiSJns=",
"checksumSHA1": "jRc0JfRUtCr3xxkgwRDVppsSnl0=",
"path": "git.autistici.org/ai3/go-common/unix",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "witSYnNsDhNaoA85UYilt17H+ng=",
"path": "git.autistici.org/ai3/go-common/userenckey",
"revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-22T23:09:18Z"
"revision": "54f0ac4c46184ae44486a31ca2705076abcc5321",
"revisionTime": "2019-06-30T08:30:15Z"
},
{
"checksumSHA1": "MszadHmYMr3JQMX2gRg7TfsQWVc=",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment