Commit 85c333c9 authored by ale's avatar ale

Update dependencies

parent c4a888c7
Pipeline #1979 failed with stages
in 36 seconds
ai3/go-common
===
Common code for ai3 services and tools.
A quick overview of the contents:
* [client](clientutil/) and [server](serverutil/) HTTP-based
"RPC" implementation, just JSON POST requests but with retries,
backoff, timeouts, tracing, etc.
* [server implementation of a line-based protocol over a UNIX socket](unix/)
* a [LDAP connection pool](ldap/)
* a [password hashing library](pwhash/) that uses fancy advanced
crypto by default but is also backwards compatible with old
libc crypto.
* utilities to [manage encryption keys](userenckey/), themselves
encrypted with a password and a KDF.
......@@ -104,7 +104,10 @@ func (b *balancedBackend) Call(ctx context.Context, shard, path string, req, res
}
var tg targetGenerator = b.backendTracker
if b.sharded && shard != "" {
if b.sharded {
if shard == "" {
return fmt.Errorf("call without shard to sharded service %s", b.baseURI.String())
}
tg = newShardedGenerator(shard, b.baseURI.Host, b.resolver)
}
seq := newSequence(tg)
......
......@@ -40,6 +40,8 @@ type cacheDatum struct {
deadline time.Time
}
var dnsCacheTTL = 1 * time.Minute
type dnsCache struct {
resolver resolver
sf singleflight.Group
......@@ -72,7 +74,7 @@ func (c *dnsCache) update(host string) []string {
c.mx.Lock()
c.cache[host] = cacheDatum{
addrs: addrs,
deadline: time.Now().Add(60 * time.Second),
deadline: time.Now().Add(dnsCacheTTL),
}
c.mx.Unlock()
return addrs, nil
......
......@@ -7,6 +7,8 @@ import (
"net/http"
"sync"
"time"
"git.autistici.org/ai3/go-common/tracing"
)
// The transportCache is just a cache of http transports, each
......@@ -29,12 +31,12 @@ func newTransportCache(tlsConfig *tls.Config) *transportCache {
}
func (m *transportCache) newTransport(addr string) http.RoundTripper {
return &http.Transport{
return tracing.WrapTransport(&http.Transport{
TLSClientConfig: m.tlsConfig,
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return netDialContext(ctx, network, addr)
},
}
})
}
func (m *transportCache) getTransport(addr string) http.RoundTripper {
......
package clientutil
import (
"fmt"
"sync"
"time"
)
var dnsWatcherInterval = 1 * time.Minute
// A DNSWatcher monitors a DNS name for changes, constantly attempting
// to resolve it every minute and notifying a channel when the list of
// returned IP addresses changes. All addresses must be in host:port
// format.
type DNSWatcher struct {
hostport string
resolver resolver
addrs []string
updateCh chan []string
stopCh chan struct{}
}
// NewDNSWatcher creates a new DNSWatcher.
func NewDNSWatcher(hostport string) (*DNSWatcher, error) {
return newDNSWatcherWithResolver(hostport, defaultResolver)
}
func newDNSWatcherWithResolver(hostport string, resolver resolver) (*DNSWatcher, error) {
// Resolve names once before returning. Return a fatal error
// when there are no results, as it may indicate a syntax
// error in hostport.
addrs := resolver.ResolveIP(hostport)
if len(addrs) == 0 {
return nil, fmt.Errorf("can't resolve %s", hostport)
}
w := &DNSWatcher{
hostport: hostport,
resolver: resolver,
addrs: addrs,
updateCh: make(chan []string, 10),
stopCh: make(chan struct{}),
}
w.updateCh <- addrs
go w.loop()
return w, nil
}
// Stop the watcher.
func (w *DNSWatcher) Stop() {
close(w.stopCh)
}
// Changes returns a channel where the resolved addresses are sent
// whenever they change.
func (w *DNSWatcher) Changes() <-chan []string {
return w.updateCh
}
func (w *DNSWatcher) check() {
addrs := w.resolver.ResolveIP(w.hostport)
if len(addrs) > 0 && !addrListEqual(addrs, w.addrs) {
w.addrs = addrs
w.updateCh <- addrs
}
}
func (w *DNSWatcher) loop() {
defer close(w.updateCh)
tick := time.NewTicker(dnsWatcherInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
w.check()
case <-w.stopCh:
return
}
}
}
type multiDNSUpdate struct {
hostport string
addrs []string
}
// A MultiDNSWatcher watches multiple addresses for DNS changes. The
// results are merged and returned as a list of addresses.
type MultiDNSWatcher struct {
watchers []*DNSWatcher
addrmap map[string][]string
faninCh chan multiDNSUpdate
updateCh chan []string
}
// NewMultiDNSWatcher creates a new MultiDNSWatcher.
func NewMultiDNSWatcher(hostports []string) (*MultiDNSWatcher, error) {
return newMultiDNSWatcherWithResolver(hostports, defaultResolver)
}
func newMultiDNSWatcherWithResolver(hostports []string, resolver resolver) (*MultiDNSWatcher, error) {
mw := &MultiDNSWatcher{
addrmap: make(map[string][]string),
faninCh: make(chan multiDNSUpdate, 10),
updateCh: make(chan []string, 10),
}
// All the MultiDNSWatcher does is multiplex updates from the
// individual DNSWatchers onto faninCh, then merging those
// updates with all the others and sending the result to
// updateCh.
go func() {
defer close(mw.updateCh)
for up := range mw.faninCh {
mw.addrmap[up.hostport] = up.addrs
mw.updateCh <- mw.allAddrs()
}
}()
var wg sync.WaitGroup
for _, hostport := range hostports {
w, err := newDNSWatcherWithResolver(hostport, resolver)
if err != nil {
return nil, err
}
mw.watchers = append(mw.watchers, w)
wg.Add(1)
go func(hostport string) {
for addrs := range w.Changes() {
mw.faninCh <- multiDNSUpdate{
hostport: hostport,
addrs: addrs,
}
}
wg.Done()
}(hostport)
}
go func() {
wg.Wait()
close(mw.faninCh)
}()
return mw, nil
}
func (mw *MultiDNSWatcher) allAddrs() []string {
var out []string
for _, addrs := range mw.addrmap {
out = append(out, addrs...)
}
return out
}
// Stop the watcher.
func (mw *MultiDNSWatcher) Stop() {
for _, w := range mw.watchers {
w.Stop()
}
}
// Changes returns a channel where the aggregate resolved addresses
// are sent whenever they change.
func (mw *MultiDNSWatcher) Changes() <-chan []string {
return mw.updateCh
}
func addrListEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
tmp := make(map[string]struct{})
for _, aa := range a {
tmp[aa] = struct{}{}
}
for _, bb := range b {
if _, ok := tmp[bb]; !ok {
return false
}
delete(tmp, bb)
}
return len(tmp) == 0
}
......@@ -3,16 +3,18 @@ package serverutil
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/pprof"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"git.autistici.org/ai3/go-common/tracing"
"github.com/coreos/go-systemd/daemon"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
......@@ -77,6 +79,10 @@ func (config *ServerConfig) buildHTTPServer(h http.Handler) (*http.Server, error
// the listener, otherwise it will handle graceful termination on
// SIGINT or SIGTERM and return nil.
func Serve(h http.Handler, config *ServerConfig, addr string) error {
// Wrap with tracing handler (exclude metrics and other
// debugging endpoints).
h = tracing.WrapHandler(h, guessEndpointName(addr))
// Create the HTTP server.
srv, err := config.buildHTTPServer(h)
if err != nil {
......@@ -115,8 +121,9 @@ func Serve(h http.Handler, config *ServerConfig, addr string) error {
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Notify systemd that we are ready to serve.
daemon.SdNotify(false, "READY=1")
// Notify systemd that we are ready to serve. This call is
// allowed to fail (in case there is no systemd).
daemon.SdNotify(false, "READY=1") // nolint
err = srv.Serve(l)
if err != http.ErrServerClosed {
......@@ -132,14 +139,16 @@ func defaultHandler(h http.Handler) http.Handler {
// Add an endpoint for HTTP health checking probes.
root.Handle("/health", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
io.WriteString(w, "OK")
io.WriteString(w, "OK") // nolint
}))
// Add an endpoint to serve Prometheus metrics.
root.Handle("/metrics", promhttp.Handler())
// Add the net/http/pprof debug handlers.
root.Handle("/debug/pprof/", pprof.Handler(""))
// Let the default net/http handler deal with /debug/
// URLs. Packages such as net/http/pprof register their
// handlers there in ways that aren't reproducible.
root.Handle("/debug/", http.DefaultServeMux)
// Forward everything else to the main handler, adding
// Prometheus instrumentation (requests to /metrics and
......@@ -150,6 +159,18 @@ func defaultHandler(h http.Handler) http.Handler {
return root
}
func guessEndpointName(addr string) string {
_, port, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
host, err := os.Hostname()
if err != nil {
return addr
}
return fmt.Sprintf("%s:%s", host, port)
}
// HTTP-related metrics.
var (
// Since we instrument the root HTTP handler, we don't really
......
......@@ -2,6 +2,8 @@ package serverutil
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"regexp"
......@@ -119,6 +121,13 @@ func (c *TLSServerConfig) TLSAuthWrapper(h http.Handler) (http.Handler, error) {
h.ServeHTTP(w, r)
return
}
http.Error(w, "Unauthorized", http.StatusUnauthorized)
// Log the failed access, useful for debugging.
var tlsmsg string
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
tlsmsg = fmt.Sprintf(" TLS client '%s' at", r.TLS.PeerCertificates[0].Subject.CommonName)
}
log.Printf("unauthorized access to %s from %s%s", r.URL.Path, tlsmsg, r.RemoteAddr)
http.Error(w, "Forbidden", http.StatusForbidden)
}), nil
}
......@@ -77,15 +77,20 @@ func NewHighBiased(epsilon float64) *Stream {
// is guaranteed to be within (Quantile±Epsilon).
//
// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
func NewTargeted(targets map[float64]float64) *Stream {
func NewTargeted(targetMap map[float64]float64) *Stream {
// Convert map to slice to avoid slow iterations on a map.
// ƒ is called on the hot path, so converting the map to a slice
// beforehand results in significant CPU savings.
targets := targetMapToSlice(targetMap)
ƒ := func(s *stream, r float64) float64 {
var m = math.MaxFloat64
var f float64
for quantile, epsilon := range targets {
if quantile*s.n <= r {
f = (2 * epsilon * r) / quantile
for _, t := range targets {
if t.quantile*s.n <= r {
f = (2 * t.epsilon * r) / t.quantile
} else {
f = (2 * epsilon * (s.n - r)) / (1 - quantile)
f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
}
if f < m {
m = f
......@@ -96,6 +101,25 @@ func NewTargeted(targets map[float64]float64) *Stream {
return newStream(ƒ)
}
type target struct {
quantile float64
epsilon float64
}
func targetMapToSlice(targetMap map[float64]float64) []target {
targets := make([]target, 0, len(targetMap))
for quantile, epsilon := range targetMap {
t := target{
quantile: quantile,
epsilon: epsilon,
}
targets = append(targets, t)
}
return targets
}
// Stream computes quantiles for a stream of float64s. It is not thread-safe by
// design. Take care when using across multiple goroutines.
type Stream struct {
......
......@@ -51,9 +51,13 @@ func (b *backOffContext) Context() context.Context {
func (b *backOffContext) NextBackOff() time.Duration {
select {
case <-b.Context().Done():
case <-b.ctx.Done():
return Stop
default:
return b.BackOff.NextBackOff()
}
next := b.BackOff.NextBackOff()
if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next {
return Stop
}
return next
}
......@@ -63,7 +63,6 @@ type ExponentialBackOff struct {
currentInterval time.Duration
startTime time.Time
random *rand.Rand
}
// Clock is an interface that returns current time for BackOff.
......@@ -89,7 +88,6 @@ func NewExponentialBackOff() *ExponentialBackOff {
MaxInterval: DefaultMaxInterval,
MaxElapsedTime: DefaultMaxElapsedTime,
Clock: SystemClock,
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
b.Reset()
return b
......@@ -118,10 +116,7 @@ func (b *ExponentialBackOff) NextBackOff() time.Duration {
return Stop
}
defer b.incrementCurrentInterval()
if b.random == nil {
b.random = rand.New(rand.NewSource(time.Now().UnixNano()))
}
return getRandomValueFromInterval(b.RandomizationFactor, b.random.Float64(), b.currentInterval)
return getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval)
}
// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance
......
......@@ -41,7 +41,7 @@ func RetryNotify(operation Operation, b BackOff, notify Notify) error {
return permanent.Err
}
if next = b.NextBackOff(); next == Stop {
if next = cb.NextBackOff(); next == Stop {
return err
}
......
CoreOS Project
Copyright 2018 CoreOS, Inc
This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).
// Copyright 2014 Docker, Inc.
// Copyright 2015-2018 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
......@@ -13,7 +14,11 @@
// limitations under the License.
//
// Code forked from Docker project
// Package daemon provides a Go implementation of the sd_notify protocol.
// It can be used to inform systemd of service start-up completion, watchdog
// events, and other status changes.
//
// https://www.freedesktop.org/software/systemd/man/sd_notify.html#Description
package daemon
import (
......@@ -21,6 +26,25 @@ import (
"os"
)
const (
// SdNotifyReady tells the service manager that service startup is finished
// or the service finished loading its configuration.
SdNotifyReady = "READY=1"
// SdNotifyStopping tells the service manager that the service is beginning
// its shutdown.
SdNotifyStopping = "STOPPING=1"
// SdNotifyReloading tells the service manager that this service is
// reloading its configuration. Note that you must call SdNotifyReady when
// it completed reloading.
SdNotifyReloading = "RELOADING=1"
// SdNotifyWatchdog tells the service manager to update the watchdog
// timestamp for the service.
SdNotifyWatchdog = "WATCHDOG=1"
)
// SdNotify sends a message to the init daemon. It is common to ignore the error.
// If `unsetEnvironment` is true, the environment variable `NOTIFY_SOCKET`
// will be unconditionally unset.
......@@ -29,7 +53,7 @@ import (
// (false, nil) - notification not supported (i.e. NOTIFY_SOCKET is unset)
// (false, err) - notification supported, but failure happened (e.g. error connecting to NOTIFY_SOCKET or while sending data)
// (true, nil) - notification supported, data has been sent
func SdNotify(unsetEnvironment bool, state string) (sent bool, err error) {
func SdNotify(unsetEnvironment bool, state string) (bool, error) {
socketAddr := &net.UnixAddr{
Name: os.Getenv("NOTIFY_SOCKET"),
Net: "unixgram",
......@@ -41,10 +65,9 @@ func SdNotify(unsetEnvironment bool, state string) (sent bool, err error) {
}
if unsetEnvironment {
err = os.Unsetenv("NOTIFY_SOCKET")
}
if err != nil {
return false, err
if err := os.Unsetenv("NOTIFY_SOCKET"); err != nil {
return false, err
}
}
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
......@@ -54,9 +77,7 @@ func SdNotify(unsetEnvironment bool, state string) (sent bool, err error) {
}
defer conn.Close()
_, err = conn.Write([]byte(state))
// Error sending the message
if err != nil {
if _, err = conn.Write([]byte(state)); err != nil {
return false, err
}
return true, nil
......
......@@ -21,10 +21,11 @@ import (
"time"
)
// SdWatchdogEnabled return watchdog information for a service.
// Process should send daemon.SdNotify("WATCHDOG=1") every time / 2.
// If `unsetEnvironment` is true, the environment variables `WATCHDOG_USEC`
// and `WATCHDOG_PID` will be unconditionally unset.
// SdWatchdogEnabled returns watchdog information for a service.
// Processes should call daemon.SdNotify(false, daemon.SdNotifyWatchdog) every
// time / 2.
// If `unsetEnvironment` is true, the environment variables `WATCHDOG_USEC` and
// `WATCHDOG_PID` will be unconditionally unset.
//
// It returns one of the following:
// (0, nil) - watchdog isn't enabled or we aren't the watched PID.
......
Go support for Protocol Buffers - Google's data interchange format
Copyright 2010 The Go Authors. All rights reserved.
https://github.com/golang/protobuf
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
......
# Go support for Protocol Buffers - Google's data interchange format
#
# Copyright 2010 The Go Authors. All rights reserved.
# https://github.com/golang/protobuf
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
install:
go install
test: install generate-test-pbs
go test
generate-test-pbs:
make install
make -C testdata
protoc --go_out=Mtestdata/test.proto=github.com/golang/protobuf/proto/testdata,Mgoogle/protobuf/any.proto=github.com/golang/protobuf/ptypes/any:. proto3_proto/proto3.proto
make
......@@ -35,22 +35,39 @@
package proto
import (
"fmt"
"log"
"reflect"
"strings"
)
// Clone returns a deep copy of a protocol buffer.
func Clone(pb Message) Message {
in := reflect.ValueOf(pb)
func Clone(src Message) Message {
in := reflect.ValueOf(src)
if in.IsNil() {
return pb
return src
}
out := reflect.New(in.Type().Elem())
// out is empty so a merge is a deep copy.
mergeStruct(out.Elem(), in.Elem())
return out.Interface().(Message)
dst := out.Interface().(Message)
Merge(dst, src)
return dst
}
// Merger is the interface representing objects that can merge messages of the same type.
type Merger interface {
// Merge merges src into this message.
// Required and optional fields that are set in src will be set to that value in dst.
// Elements of repeated fields will be appended.
//
// Merge may panic if called with a different argument type than the receiver.
Merge(src Message)