Skip to content
Snippets Groups Projects
Commit bc2d1de6 authored by ale's avatar ale
Browse files

Upgrade ai3/go-common

parent 485a2b6c
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"go.opencensus.io/trace"
"gopkg.in/ldap.v2" "gopkg.in/ldap.v2"
) )
...@@ -147,9 +148,18 @@ func NewConnectionPool(uri, bindDN, bindPw string, cacheSize int) (*ConnectionPo ...@@ -147,9 +148,18 @@ func NewConnectionPool(uri, bindDN, bindPw string, cacheSize int) (*ConnectionPo
}, nil }, nil
} }
func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) error) error { func (p *ConnectionPool) doRequest(ctx context.Context, name string, attrs []trace.Attribute, fn func(*ldap.Conn) error) error {
return backoff.Retry(func() error { // Tracing: initialize a new client span.
conn, err := p.Get(ctx) sctx, span := trace.StartSpan(ctx, name,
trace.WithSpanKind(trace.SpanKindClient))
defer span.End()
if len(attrs) > 0 {
span.AddAttributes(attrs...)
}
rerr := backoff.Retry(func() error {
conn, err := p.Get(sctx)
if err != nil { if err != nil {
// Here conn is nil, so we don't need to Release it. // Here conn is nil, so we don't need to Release it.
if isTemporaryLDAPError(err) { if isTemporaryLDAPError(err) {
...@@ -158,7 +168,7 @@ func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) erro ...@@ -158,7 +168,7 @@ func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) erro
return backoff.Permanent(err) return backoff.Permanent(err)
} }
if deadline, ok := ctx.Deadline(); ok { if deadline, ok := sctx.Deadline(); ok {
conn.SetTimeout(time.Until(deadline)) conn.SetTimeout(time.Until(deadline))
} }
...@@ -169,30 +179,42 @@ func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) erro ...@@ -169,30 +179,42 @@ func (p *ConnectionPool) doRequest(ctx context.Context, fn func(*ldap.Conn) erro
} }
return err return err
}, backoff.WithContext(newExponentialBackOff(), ctx)) }, backoff.WithContext(newExponentialBackOff(), ctx))
// Tracing: set the final status.
span.SetStatus(errorToTraceStatus(rerr))
return rerr
} }
// Search performs the given search request. It will retry the request // Search performs the given search request. It will retry the request
// on temporary errors. // on temporary errors.
func (p *ConnectionPool) Search(ctx context.Context, searchRequest *ldap.SearchRequest) (*ldap.SearchResult, error) { func (p *ConnectionPool) Search(ctx context.Context, searchRequest *ldap.SearchRequest) (*ldap.SearchResult, error) {
var result *ldap.SearchResult var result *ldap.SearchResult
err := p.doRequest(ctx, func(conn *ldap.Conn) error { err := p.doRequest(ctx, "ldap.Search", []trace.Attribute{
var err error trace.StringAttribute("ldap.base", searchRequest.BaseDN),
result, err = conn.Search(searchRequest) trace.StringAttribute("ldap.filter", searchRequest.Filter),
return err trace.Int64Attribute("ldap.scope", int64(searchRequest.Scope)),
}, func(conn *ldap.Conn) (cerr error) {
result, cerr = conn.Search(searchRequest)
return
}) })
return result, err return result, err
} }
// Modify issues a ModifyRequest to the LDAP server. // Modify issues a ModifyRequest to the LDAP server.
func (p *ConnectionPool) Modify(ctx context.Context, modifyRequest *ldap.ModifyRequest) error { func (p *ConnectionPool) Modify(ctx context.Context, modifyRequest *ldap.ModifyRequest) error {
return p.doRequest(ctx, func(conn *ldap.Conn) error { return p.doRequest(ctx, "ldap.Modify", []trace.Attribute{
trace.StringAttribute("ldap.dn", modifyRequest.DN),
}, func(conn *ldap.Conn) error {
return conn.Modify(modifyRequest) return conn.Modify(modifyRequest)
}) })
} }
// Add issues an AddRequest to the LDAP server. // Add issues an AddRequest to the LDAP server.
func (p *ConnectionPool) Add(ctx context.Context, addRequest *ldap.AddRequest) error { func (p *ConnectionPool) Add(ctx context.Context, addRequest *ldap.AddRequest) error {
return p.doRequest(ctx, func(conn *ldap.Conn) error { return p.doRequest(ctx, "ldap.Add", []trace.Attribute{
trace.StringAttribute("ldap.dn", addRequest.DN),
}, func(conn *ldap.Conn) error {
return conn.Add(addRequest) return conn.Add(addRequest)
}) })
} }
...@@ -219,3 +241,16 @@ func isTemporaryLDAPError(err error) bool { ...@@ -219,3 +241,16 @@ func isTemporaryLDAPError(err error) bool {
return false return false
} }
} }
func errorToTraceStatus(err error) trace.Status {
switch err {
case nil:
return trace.Status{Code: trace.StatusCodeOK, Message: "OK"}
case context.Canceled:
return trace.Status{Code: trace.StatusCodeCancelled, Message: "CANCELED"}
case context.DeadlineExceeded:
return trace.Status{Code: trace.StatusCodeDeadlineExceeded, Message: "DEADLINE_EXCEEDED"}
default:
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
}
}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"sync" "sync"
openzipkin "github.com/openzipkin/zipkin-go" openzipkin "github.com/openzipkin/zipkin-go"
...@@ -31,6 +32,7 @@ const globalTracingConfigPath = "/etc/tracing/client.conf" ...@@ -31,6 +32,7 @@ const globalTracingConfigPath = "/etc/tracing/client.conf"
type tracingConfig struct { type tracingConfig struct {
ReportURL string `json:"report_url"` ReportURL string `json:"report_url"`
Sample string `json:"sample"`
} }
// Read the global tracing configuration file. Its location is // Read the global tracing configuration file. Its location is
...@@ -100,9 +102,23 @@ func initTracing(endpointAddr string) { ...@@ -100,9 +102,23 @@ func initTracing(endpointAddr string) {
reporter := zipkinHTTP.NewReporter(config.ReportURL) reporter := zipkinHTTP.NewReporter(config.ReportURL)
ze := zipkin.NewExporter(reporter, localEndpoint) ze := zipkin.NewExporter(reporter, localEndpoint)
trace.RegisterExporter(ze) trace.RegisterExporter(ze)
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
var tc trace.Config
switch config.Sample {
case "", "always":
tc.DefaultSampler = trace.AlwaysSample()
case "never":
tc.DefaultSampler = trace.NeverSample()
default:
frac, err := strconv.ParseFloat(config.Sample, 64)
if err != nil {
log.Printf("warning: error in tracing configuration: sample: %v, tracing disabled", err)
return
}
tc.DefaultSampler = trace.ProbabilitySampler(frac)
}
trace.ApplyConfig(tc)
log.Printf("tracing enabled (report_url %s)", config.ReportURL) log.Printf("tracing enabled (report_url %s)", config.ReportURL)
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/coreos/go-systemd/activation" "github.com/coreos/go-systemd/activation"
"github.com/prometheus/client_golang/prometheus"
"github.com/theckman/go-flock" "github.com/theckman/go-flock"
) )
...@@ -51,7 +52,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) { ...@@ -51,7 +52,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) {
// successfully. We could remove it before starting, but that // successfully. We could remove it before starting, but that
// would be dangerous if another instance was listening on // would be dangerous if another instance was listening on
// that socket. So we wrap socket access with a file lock. // that socket. So we wrap socket access with a file lock.
lock := flock.NewFlock(socketPath + ".lock") lock := flock.New(socketPath + ".lock")
locked, err := lock.TryLock() locked, err := lock.TryLock()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -66,7 +67,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) { ...@@ -66,7 +67,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) {
} }
// Always try to unlink the socket before creating it. // Always try to unlink the socket before creating it.
os.Remove(socketPath) os.Remove(socketPath) // nolint
l, err := net.ListenUnix("unix", addr) l, err := net.ListenUnix("unix", addr)
if err != nil { if err != nil {
...@@ -98,10 +99,10 @@ func NewSystemdSocketServer(h Handler) (*SocketServer, error) { ...@@ -98,10 +99,10 @@ func NewSystemdSocketServer(h Handler) (*SocketServer, error) {
// Waits for active connections to terminate before returning. // Waits for active connections to terminate before returning.
func (s *SocketServer) Close() { func (s *SocketServer) Close() {
s.closing.Store(true) s.closing.Store(true)
s.l.Close() s.l.Close() // nolint
s.wg.Wait() s.wg.Wait()
if s.lock != nil { if s.lock != nil {
s.lock.Unlock() s.lock.Unlock() // nolint
} }
} }
...@@ -122,7 +123,7 @@ func (s *SocketServer) Serve() error { ...@@ -122,7 +123,7 @@ func (s *SocketServer) Serve() error {
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
s.handler.ServeConnection(conn) s.handler.ServeConnection(conn)
conn.Close() conn.Close() // nolint
s.wg.Done() s.wg.Done()
}() }()
} }
...@@ -198,11 +199,15 @@ func (w *lrWriter) WriteLineCRLF(data []byte) error { ...@@ -198,11 +199,15 @@ func (w *lrWriter) WriteLineCRLF(data []byte) error {
return w.Writer.Flush() return w.Writer.Flush()
} }
// ServeConnection handles a new connection. It will accept multiple
// requests on the same connection (or not, depending on the client
// preference).
func (l *LineServer) ServeConnection(nc net.Conn) { func (l *LineServer) ServeConnection(nc net.Conn) {
totalConnections.Inc()
c := textproto.NewConn(nc) c := textproto.NewConn(nc)
rw := &lrWriter{bufio.NewWriter(nc)} rw := &lrWriter{bufio.NewWriter(nc)}
for { for {
nc.SetReadDeadline(time.Now().Add(l.IdleTimeout)) nc.SetReadDeadline(time.Now().Add(l.IdleTimeout)) // nolint
line, err := c.ReadLineBytes() line, err := c.ReadLineBytes()
if err == io.EOF { if err == io.EOF {
break break
...@@ -215,17 +220,59 @@ func (l *LineServer) ServeConnection(nc net.Conn) { ...@@ -215,17 +220,59 @@ func (l *LineServer) ServeConnection(nc net.Conn) {
// handler with it. Set a write deadline on the // handler with it. Set a write deadline on the
// connection to allow the full RequestTimeout time to // connection to allow the full RequestTimeout time to
// generate the response. // generate the response.
nc.SetWriteDeadline(time.Now().Add(l.RequestTimeout + l.WriteTimeout)) start := time.Now()
nc.SetWriteDeadline(start.Add(l.RequestTimeout + l.WriteTimeout)) // nolint
ctx, cancel := context.WithTimeout(context.Background(), l.RequestTimeout) ctx, cancel := context.WithTimeout(context.Background(), l.RequestTimeout)
err = l.handler.ServeLine(ctx, rw, line) err = l.handler.ServeLine(ctx, rw, line)
elapsedMs := time.Since(start).Nanoseconds() / 1000000
requestLatencyHist.Observe(float64(elapsedMs))
cancel() cancel()
// Close the connection on error, or on empty response. // Close the connection on error, or on empty response.
if err != nil { if err != nil {
totalRequests.With(prometheus.Labels{
"status": "error",
}).Inc()
if err != ErrCloseConnection { if err != ErrCloseConnection {
log.Printf("request error: %v", err) log.Printf("request error: %v", err)
} }
break break
} }
totalRequests.With(prometheus.Labels{
"status": "ok",
}).Inc()
} }
} }
// Instrumentation metrics.
var (
totalConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "unix_connections_total",
Help: "Total number of connections.",
},
)
totalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "unix_requests_total",
Help: "Total number of requests.",
},
[]string{"status"},
)
// Histogram buckets are tuned for the low-milliseconds range
// (the largest bucket sits at ~1s).
requestLatencyHist = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "unix_requests_latency_ms",
Help: "Latency of requests.",
Buckets: prometheus.ExponentialBuckets(5, 1.4142, 16),
},
)
)
func init() {
prometheus.MustRegister(totalConnections)
prometheus.MustRegister(totalRequests)
prometheus.MustRegister(requestLatencyHist)
}
...@@ -5,44 +5,44 @@ ...@@ -5,44 +5,44 @@
{ {
"checksumSHA1": "pLvPnUablirQucyALgrso9hLG4E=", "checksumSHA1": "pLvPnUablirQucyALgrso9hLG4E=",
"path": "git.autistici.org/ai3/go-common", "path": "git.autistici.org/ai3/go-common",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "1ChQcW9Biu/AgiKjsbJFg/+WhjQ=", "checksumSHA1": "1ChQcW9Biu/AgiKjsbJFg/+WhjQ=",
"path": "git.autistici.org/ai3/go-common/clientutil", "path": "git.autistici.org/ai3/go-common/clientutil",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "kQbBWZqrXc95wodlrOKEshQVaBo=", "checksumSHA1": "npDdYYmInhdVxcCpOQ844cVlfzQ=",
"path": "git.autistici.org/ai3/go-common/ldap", "path": "git.autistici.org/ai3/go-common/ldap",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "TKGUNmKxj7KH3qhwiCh/6quUnwc=", "checksumSHA1": "TKGUNmKxj7KH3qhwiCh/6quUnwc=",
"path": "git.autistici.org/ai3/go-common/serverutil", "path": "git.autistici.org/ai3/go-common/serverutil",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "WvuSF0pz3rk7bu+5g9lqTqq97Ow=", "checksumSHA1": "tWzCSEieYFl5VnKhFTosgveO6Ys=",
"path": "git.autistici.org/ai3/go-common/tracing", "path": "git.autistici.org/ai3/go-common/tracing",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "Okvoje2tgehkMo1N9Q601JPgGoE=", "checksumSHA1": "R18oCPkWjuPqDxPgKvG1KhiSJns=",
"path": "git.autistici.org/ai3/go-common/unix", "path": "git.autistici.org/ai3/go-common/unix",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "witSYnNsDhNaoA85UYilt17H+ng=", "checksumSHA1": "witSYnNsDhNaoA85UYilt17H+ng=",
"path": "git.autistici.org/ai3/go-common/userenckey", "path": "git.autistici.org/ai3/go-common/userenckey",
"revision": "1f95fcdd58ebf63d338f05ceae29d2de811a2d2f", "revision": "438dda6c699e73e612d0b16143076998d77c03ac",
"revisionTime": "2018-11-18T16:11:30Z" "revisionTime": "2018-11-22T23:09:18Z"
}, },
{ {
"checksumSHA1": "MszadHmYMr3JQMX2gRg7TfsQWVc=", "checksumSHA1": "MszadHmYMr3JQMX2gRg7TfsQWVc=",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment