Skip to content
Snippets Groups Projects
Commit d4396660 authored by ale's avatar ale
Browse files

Propagate context deadlines across RPC boundaries

parent 8a072922
No related branches found
No related tags found
No related merge requests found
......@@ -221,7 +221,7 @@ func (b *balancedBackend) do(ctx context.Context, seq *sequence, req *http.Reque
client := &http.Client{
Transport: b.transportCache.getTransport(target),
}
resp, err = client.Do(req.WithContext(ctx))
resp, err = client.Do(propagateDeadline(ctx, req))
if err == nil && resp.StatusCode != 200 {
err = remoteErrorFromResponse(resp)
if !isStatusTemporary(resp.StatusCode) {
......@@ -235,6 +235,19 @@ func (b *balancedBackend) do(ctx context.Context, seq *sequence, req *http.Reque
return
}
const deadlineHeader = "X-RPC-Deadline"
// Propagate context deadline to the server using a HTTP header.
func propagateDeadline(ctx context.Context, req *http.Request) *http.Request {
req = req.WithContext(ctx)
if deadline, ok := ctx.Deadline(); ok {
req.Header.Set(deadlineHeader, strconv.FormatInt(deadline.UTC().UnixNano(), 10))
} else {
req.Header.Del(deadlineHeader)
}
return req
}
var errNoTargets = errors.New("no available backends")
type targetGenerator interface {
......
......@@ -11,6 +11,7 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
"syscall"
"time"
......@@ -162,11 +163,30 @@ func addDefaultHandlers(h http.Handler) http.Handler {
// Prometheus instrumentation (requests to /metrics and
// /health are not included).
root.Handle("/", promhttp.InstrumentHandlerInFlight(inFlightRequests,
promhttp.InstrumentHandlerCounter(totalRequests, h)))
promhttp.InstrumentHandlerCounter(totalRequests,
propagateDeadline(h))))
return root
}
const deadlineHeader = "X-RPC-Deadline"
// Read an eventual deadline from the HTTP request, and set it as the
// deadline of the request context.
func propagateDeadline(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if hdr := req.Header.Get(deadlineHeader); hdr != "" {
if deadlineNano, err := strconv.ParseInt(hdr, 10, 64); err == nil {
deadline := time.Unix(0, deadlineNano)
ctx, cancel := context.WithDeadline(req.Context(), deadline)
defer cancel()
req = req.WithContext(ctx)
}
}
h.ServeHTTP(w, req)
})
}
func guessEndpointName(addr string) string {
_, port, err := net.SplitHostPort(addr)
if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment