Commit 882128fa authored by ale's avatar ale

Re-use connections in the client

Maintain a pool of cached connections.
parent 288cf94e
Pipeline #8386 passed with stages
in 1 minute and 38 seconds
......@@ -3,15 +3,19 @@ package client
import (
"context"
"net"
"net/textproto"
"strings"
"github.com/cenkalti/backoff"
"go.opencensus.io/trace"
"git.autistici.org/id/auth"
"git.autistici.org/id/auth/lineproto"
)
var DefaultSocketPath = "/run/auth/socket"
var (
DefaultSocketPath = "/run/auth/socket"
DefaultPoolSize = 3
)
type Client interface {
Authenticate(context.Context, *auth.Request) (*auth.Response, error)
......@@ -20,12 +24,20 @@ type Client interface {
type socketClient struct {
socketPath string
codec auth.Codec
pool *pool
}
func New(socketPath string) Client {
return &socketClient{
socketPath: socketPath,
codec: auth.DefaultCodec,
pool: newPool(func() (*lineproto.Conn, error) {
c, err := net.Dial("unix", socketPath)
if err != nil {
return nil, err
}
return lineproto.NewConn(c, ""), nil
}, DefaultPoolSize),
}
}
......@@ -47,6 +59,8 @@ func (c *socketClient) Authenticate(ctx context.Context, req *auth.Request) (*au
resp, err = c.doAuthenticate(sctx, req)
if err == nil {
return nil
} else if strings.Contains(err.Error(), "use of closed network connection") {
return err
} else if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
return netErr
}
......@@ -59,52 +73,45 @@ func (c *socketClient) Authenticate(ctx context.Context, req *auth.Request) (*au
}
func (c *socketClient) doAuthenticate(ctx context.Context, req *auth.Request) (*auth.Response, error) {
// Create the connection outside of the timed goroutine, so
// that we can call Close() on exit regardless of the reason:
// this way, when a timeout occurs or the context is canceled,
// the pending request terminates immediately.
conn, err := textproto.Dial("unix", c.socketPath)
if err != nil {
return nil, err
}
defer conn.Close()
// Make space in the channel for at least one element, or we
// will leak a goroutine whenever the authentication request
// times out.
done := make(chan error, 1)
var resp auth.Response
go func() {
defer close(done)
// Write the auth command to the connection.
if err := conn.PrintfLine("auth %s", string(c.codec.Encode(req))); err != nil {
done <- err
return
err := c.pool.withConn(func(conn *lineproto.Conn) error {
// Make space in the channel for at least one element, or we
// will leak a goroutine whenever the authentication request
// times out.
done := make(chan error, 1)
go func() {
defer close(done)
// Write the auth command to the connection.
if err := conn.WriteLine([]byte("auth "), c.codec.Encode(req)); err != nil {
done <- err
return
}
// Read the response.
line, err := conn.ReadLine()
if err != nil {
done <- err
return
}
if err := c.codec.Decode(line, &resp); err != nil {
done <- err
return
}
done <- nil
}()
// Wait for the call to terminate, or the context to time out,
// whichever happens first.
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
// Read the response.
line, err := conn.ReadLineBytes()
if err != nil {
done <- err
return
}
if err := c.codec.Decode(line, &resp); err != nil {
done <- err
return
}
done <- nil
}()
// Wait for the call to terminate, or the context to time out,
// whichever happens first.
select {
case err := <-done:
return &resp, err
case <-ctx.Done():
return nil, ctx.Err()
}
})
return &resp, err
}
func responseToTraceStatus(resp *auth.Response, err error) trace.Status {
......
package client
import (
"git.autistici.org/id/auth/lineproto"
)
type poolDialer func() (*lineproto.Conn, error)
type pool struct {
ch chan *lineproto.Conn
dialer poolDialer
}
func newPool(dialer poolDialer, size int) *pool {
return &pool{
ch: make(chan *lineproto.Conn, size),
dialer: dialer,
}
}
func (p *pool) withConn(f func(*lineproto.Conn) error) error {
// Acquire a connection.
var conn *lineproto.Conn
select {
case conn = <-p.ch:
default:
var err error
conn, err = p.dialer()
if err != nil {
return err
}
}
// Run the function and inspect its return value.
err := f(conn)
if err != nil {
conn.Close()
} else {
select {
case p.ch <- conn:
default:
conn.Close()
}
}
return err
}
......@@ -64,3 +64,7 @@ func NewConn(c net.Conn, name string) *Conn {
ServerName: name,
}
}
func (c *Conn) Close() error {
return c.Conn.Close()
}
......@@ -5,6 +5,7 @@ import (
"errors"
"io"
"log"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
......@@ -65,7 +66,9 @@ func (l *LineServer) ServeConnection(c *Conn) {
if err == io.EOF {
break
} else if err != nil {
log.Printf("client error: %v", err)
if !strings.Contains(err.Error(), "use of closed network connection") {
log.Printf("client error: %v", err)
}
break
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment