Skip to content
Snippets Groups Projects
Commit fd088454 authored by ale's avatar ale
Browse files

Add instrumentation to the UNIX socket server

parent 4e75d592
No related branches found
No related tags found
No related merge requests found
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/coreos/go-systemd/activation" "github.com/coreos/go-systemd/activation"
"github.com/prometheus/client_golang/prometheus"
"github.com/theckman/go-flock" "github.com/theckman/go-flock"
) )
...@@ -51,7 +52,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) { ...@@ -51,7 +52,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) {
// successfully. We could remove it before starting, but that // successfully. We could remove it before starting, but that
// would be dangerous if another instance was listening on // would be dangerous if another instance was listening on
// that socket. So we wrap socket access with a file lock. // that socket. So we wrap socket access with a file lock.
lock := flock.NewFlock(socketPath + ".lock") lock := flock.New(socketPath + ".lock")
locked, err := lock.TryLock() locked, err := lock.TryLock()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -66,7 +67,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) { ...@@ -66,7 +67,7 @@ func NewUNIXSocketServer(socketPath string, h Handler) (*SocketServer, error) {
} }
// Always try to unlink the socket before creating it. // Always try to unlink the socket before creating it.
os.Remove(socketPath) os.Remove(socketPath) // nolint
l, err := net.ListenUnix("unix", addr) l, err := net.ListenUnix("unix", addr)
if err != nil { if err != nil {
...@@ -98,10 +99,10 @@ func NewSystemdSocketServer(h Handler) (*SocketServer, error) { ...@@ -98,10 +99,10 @@ func NewSystemdSocketServer(h Handler) (*SocketServer, error) {
// Waits for active connections to terminate before returning. // Waits for active connections to terminate before returning.
func (s *SocketServer) Close() { func (s *SocketServer) Close() {
s.closing.Store(true) s.closing.Store(true)
s.l.Close() s.l.Close() // nolint
s.wg.Wait() s.wg.Wait()
if s.lock != nil { if s.lock != nil {
s.lock.Unlock() s.lock.Unlock() // nolint
} }
} }
...@@ -122,7 +123,7 @@ func (s *SocketServer) Serve() error { ...@@ -122,7 +123,7 @@ func (s *SocketServer) Serve() error {
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
s.handler.ServeConnection(conn) s.handler.ServeConnection(conn)
conn.Close() conn.Close() // nolint
s.wg.Done() s.wg.Done()
}() }()
} }
...@@ -198,11 +199,15 @@ func (w *lrWriter) WriteLineCRLF(data []byte) error { ...@@ -198,11 +199,15 @@ func (w *lrWriter) WriteLineCRLF(data []byte) error {
return w.Writer.Flush() return w.Writer.Flush()
} }
// ServeConnection handles a new connection. It will accept multiple
// requests on the same connection (or not, depending on the client
// preference).
func (l *LineServer) ServeConnection(nc net.Conn) { func (l *LineServer) ServeConnection(nc net.Conn) {
totalConnections.Inc()
c := textproto.NewConn(nc) c := textproto.NewConn(nc)
rw := &lrWriter{bufio.NewWriter(nc)} rw := &lrWriter{bufio.NewWriter(nc)}
for { for {
nc.SetReadDeadline(time.Now().Add(l.IdleTimeout)) nc.SetReadDeadline(time.Now().Add(l.IdleTimeout)) // nolint
line, err := c.ReadLineBytes() line, err := c.ReadLineBytes()
if err == io.EOF { if err == io.EOF {
break break
...@@ -215,17 +220,59 @@ func (l *LineServer) ServeConnection(nc net.Conn) { ...@@ -215,17 +220,59 @@ func (l *LineServer) ServeConnection(nc net.Conn) {
// handler with it. Set a write deadline on the // handler with it. Set a write deadline on the
// connection to allow the full RequestTimeout time to // connection to allow the full RequestTimeout time to
// generate the response. // generate the response.
nc.SetWriteDeadline(time.Now().Add(l.RequestTimeout + l.WriteTimeout)) start := time.Now()
nc.SetWriteDeadline(start.Add(l.RequestTimeout + l.WriteTimeout)) // nolint
ctx, cancel := context.WithTimeout(context.Background(), l.RequestTimeout) ctx, cancel := context.WithTimeout(context.Background(), l.RequestTimeout)
err = l.handler.ServeLine(ctx, rw, line) err = l.handler.ServeLine(ctx, rw, line)
elapsedMs := time.Since(start).Nanoseconds() / 1000000
requestLatencyHist.Observe(float64(elapsedMs))
cancel() cancel()
// Close the connection on error, or on empty response. // Close the connection on error, or on empty response.
if err != nil { if err != nil {
totalRequests.With(prometheus.Labels{
"status": "error",
}).Inc()
if err != ErrCloseConnection { if err != ErrCloseConnection {
log.Printf("request error: %v", err) log.Printf("request error: %v", err)
} }
break break
} }
totalRequests.With(prometheus.Labels{
"status": "ok",
}).Inc()
} }
} }
// Instrumentation metrics.
var (
totalConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "unix_connections_total",
Help: "Total number of connections.",
},
)
totalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "unix_requests_total",
Help: "Total number of requests.",
},
[]string{"status"},
)
// Histogram buckets are tuned for the low-milliseconds range
// (the largest bucket sits at ~1s).
requestLatencyHist = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "unix_requests_latency_ms",
Help: "Latency of requests.",
Buckets: prometheus.ExponentialBuckets(5, 1.4142, 16),
},
)
)
func init() {
prometheus.MustRegister(totalConnections)
prometheus.MustRegister(totalRequests)
prometheus.MustRegister(requestLatencyHist)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment