diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child.go b/third_party/github.com/coreos/go-etcd/etcd/add_child.go index 53af6d285dbea4447d97d01d484e29ea4806913e..7122be049e253d927e190657ac50bf0998cc660c 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/add_child.go +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child.go @@ -8,7 +8,7 @@ func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // Add a new file with a random etcd-generated key under the given path. @@ -19,5 +19,5 @@ func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, erro return nil, err } - return raw.toResponse() + return raw.Unmarshal() } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 48ba9b42377ec81771a5575ac5769c8f05883048..f6ae54861735800424011e1722a7380e4c3818e8 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -32,8 +32,8 @@ type Config struct { CertFile string `json:"certFile"` KeyFile string `json:"keyFile"` CaCertFile []string `json:"caCertFiles"` - Timeout time.Duration `json:"timeout"` - Consistency string `json: "consistency"` + DialTimeout time.Duration `json:"timeout"` + Consistency string `json:"consistency"` } type Client struct { @@ -42,7 +42,20 @@ type Client struct { httpClient *http.Client persistence io.Writer cURLch chan string - keyPrefix string + // CheckRetry can be used to control the policy for failed requests + // and modify the cluster if needed. + // The client calls it before sending requests again, and + // stops retrying if CheckRetry returns some error. The cases that + // this function needs to handle include no response and unexpected + // http status code of response. + // If CheckRetry is nil, client will call the default one + // `DefaultCheckRetry`. + // Argument cluster is the etcd.Cluster object that these requests have been made on. + // Argument numReqs is the number of http.Requests that have been made so far. + // Argument lastResp is the http.Responses from the last request. + // Argument err is the reason of the failure. + CheckRetry func(cluster *Cluster, numReqs int, + lastResp http.Response, err error) error } // NewClient create a basic client that is configured to be used @@ -50,15 +63,14 @@ type Client struct { func NewClient(machines []string) *Client { config := Config{ // default timeout is one second - Timeout: time.Second, + DialTimeout: time.Second, // default consistency level is STRONG Consistency: STRONG_CONSISTENCY, } client := &Client{ - cluster: NewCluster(machines), - config: config, - keyPrefix: path.Join(version, "keys"), + cluster: NewCluster(machines), + config: config, } client.initHTTPClient() @@ -76,7 +88,7 @@ func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) config := Config{ // default timeout is one second - Timeout: time.Second, + DialTimeout: time.Second, // default consistency level is STRONG Consistency: STRONG_CONSISTENCY, CertFile: cert, @@ -85,9 +97,8 @@ func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) } client := &Client{ - cluster: NewCluster(machines), - config: config, - keyPrefix: path.Join(version, "keys"), + cluster: NewCluster(machines), + config: config, } err := client.initHTTPSClient(cert, key) @@ -157,16 +168,10 @@ func (c *Client) SetTransport(tr *http.Transport) { c.httpClient.Transport = tr } -// SetKeyPrefix changes the key prefix from the default `/v2/keys` to whatever -// is set. -func (c *Client) SetKeyPrefix(prefix string) { - c.keyPrefix = prefix -} - // initHTTPClient initializes a HTTP client for etcd client func (c *Client) initHTTPClient() { tr := &http.Transport{ - Dial: dialTimeout, + Dial: c.dial, TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, @@ -192,7 +197,7 @@ func (c *Client) initHTTPSClient(cert, key string) error { tr := &http.Transport{ TLSClientConfig: tlsConfig, - Dial: dialTimeout, + Dial: c.dial, } c.httpClient = &http.Client{Transport: tr} @@ -226,6 +231,11 @@ func (c *Client) SetConsistency(consistency string) error { return nil } +// Sets the DialTimeout value +func (c *Client) SetDialTimeout(d time.Duration) { + c.config.DialTimeout = d +} + // AddRootCA adds a root CA cert for the etcd client func (c *Client) AddRootCA(caCert string) error { if c.httpClient == nil { @@ -326,9 +336,29 @@ func (c *Client) createHttpPath(serverName string, _path string) string { return u.String() } -// Dial with timeout. -func dialTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, time.Second) +// dial attempts to open a TCP connection to the provided address, explicitly +// enabling keep-alives with a one-second interval. +func (c *Client) dial(network, addr string) (net.Conn, error) { + conn, err := net.DialTimeout(network, addr, c.config.DialTimeout) + if err != nil { + return nil, err + } + + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return nil, errors.New("Failed type-assertion of net.Conn as *net.TCPConn") + } + + // Keep TCP alive to check whether or not the remote machine is down + if err = tcpConn.SetKeepAlive(true); err != nil { + return nil, err + } + + if err = tcpConn.SetKeepAlivePeriod(time.Second); err != nil { + return nil, err + } + + return tcpConn, nil } func (c *Client) OpenCURL() { @@ -391,8 +421,8 @@ func (c *Client) MarshalJSON() ([]byte, error) { // as defined by the standard JSON package. func (c *Client) UnmarshalJSON(b []byte) error { temp := struct { - Config Config `json: "config"` - Cluster *Cluster `json: "cluster"` + Config Config `json:"config"` + Cluster *Cluster `json:"cluster"` }{} err := json.Unmarshal(b, &temp) if err != nil { diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go index 924778ddb1e59d1f704c6a97ffe45ec716ee55c1..11131bb76025d78e09cca1c3362f6d4cc6c54c81 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go @@ -8,7 +8,7 @@ func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64 return nil, err } - return raw.toResponse() + return raw.Unmarshal() } func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) { @@ -16,7 +16,7 @@ func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uin return nil, fmt.Errorf("You must give either prevValue or prevIndex.") } - options := options{} + options := Options{} if prevValue != "" { options["prevValue"] = prevValue } diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go index 0beaee57f13e293025c15dbdda2af4e2fff6b081..bb4f90643aceaa7a7dd15488d0a22bf1e1a6c13a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -9,7 +9,7 @@ func (c *Client) CompareAndSwap(key string, value string, ttl uint64, return nil, err } - return raw.toResponse() + return raw.Unmarshal() } func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64, @@ -18,7 +18,7 @@ func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64, return nil, fmt.Errorf("You must give either prevValue or prevIndex.") } - options := options{} + options := Options{} if prevValue != "" { options["prevValue"] = prevValue } diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go index fce23f07f68caf4e2366c04a03cb163374451b70..0f777886bae20fbb10b2394a2e221283a0fe3329 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/debug.go +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -1,53 +1,55 @@ package etcd import ( + "fmt" "io/ioutil" "log" "strings" ) -type Logger interface { - Debug(args ...interface{}) - Debugf(fmt string, args ...interface{}) - Warning(args ...interface{}) - Warningf(fmt string, args ...interface{}) -} - -var logger Logger +var logger *etcdLogger -func SetLogger(log Logger) { - logger = log +func SetLogger(l *log.Logger) { + logger = &etcdLogger{l} } -func GetLogger() Logger { - return logger +func GetLogger() *log.Logger { + return logger.log } -type defaultLogger struct { +type etcdLogger struct { log *log.Logger } -func (p *defaultLogger) Debug(args ...interface{}) { - p.log.Println(args) +func (p *etcdLogger) Debug(args ...interface{}) { + msg := "DEBUG: " + fmt.Sprint(args...) + p.log.Println(msg) } -func (p *defaultLogger) Debugf(fmt string, args ...interface{}) { +func (p *etcdLogger) Debugf(f string, args ...interface{}) { + msg := "DEBUG: " + fmt.Sprintf(f, args...) // Append newline if necessary - if !strings.HasSuffix(fmt, "\n") { - fmt = fmt + "\n" + if !strings.HasSuffix(msg, "\n") { + msg = msg + "\n" } - p.log.Printf(fmt, args) + p.log.Print(msg) } -func (p *defaultLogger) Warning(args ...interface{}) { - p.Debug(args) +func (p *etcdLogger) Warning(args ...interface{}) { + msg := "WARNING: " + fmt.Sprint(args...) + p.log.Println(msg) } -func (p *defaultLogger) Warningf(fmt string, args ...interface{}) { - p.Debugf(fmt, args) +func (p *etcdLogger) Warningf(f string, args ...interface{}) { + msg := "WARNING: " + fmt.Sprintf(f, args...) + // Append newline if necessary + if !strings.HasSuffix(msg, "\n") { + msg = msg + "\n" + } + p.log.Print(msg) } func init() { // Default logger uses the go default log. - SetLogger(&defaultLogger{log.New(ioutil.Discard, "go-etcd", log.LstdFlags)}) + SetLogger(log.New(ioutil.Discard, "go-etcd", log.LstdFlags)) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug_test.go b/third_party/github.com/coreos/go-etcd/etcd/debug_test.go new file mode 100644 index 0000000000000000000000000000000000000000..97f6d1110bcbb8d4f30f4f958fe5233af58d8390 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/debug_test.go @@ -0,0 +1,28 @@ +package etcd + +import ( + "testing" +) + +type Foo struct{} +type Bar struct { + one string + two int +} + +// Tests that logs don't panic with arbitrary interfaces +func TestDebug(t *testing.T) { + f := &Foo{} + b := &Bar{"asfd", 3} + for _, test := range []interface{}{ + 1234, + "asdf", + f, + b, + } { + logger.Debug(test) + logger.Debugf("something, %s", test) + logger.Warning(test) + logger.Warningf("something, %s", test) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index 6c60e4df39ff4b20a0e00938dcebf9a2212739f9..b37accd7db391e604adcaf75ba8bf0efd80089fd 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -16,7 +16,7 @@ func (c *Client) Delete(key string, recursive bool) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // DeleteDir deletes an empty directory or a key value pair @@ -27,11 +27,11 @@ func (c *Client) DeleteDir(key string) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) { - ops := options{ + ops := Options{ "recursive": recursive, "dir": dir, } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go index 7988f1a80c658a947c0e52f5c90c73be54978966..976bf07fd746f7fca39f2dbf9deb03eb285e8a6e 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -14,11 +14,11 @@ func (c *Client) Get(key string, sort, recursive bool) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) { - ops := options{ + ops := Options{ "recursive": recursive, "sorted": sort, } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index eccae1894e4d9abc549e5f83144124b9018fea5d..279c4e26f8b06a249cbdf9bce1a94404fd92626d 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -18,9 +18,9 @@ func cleanResult(result *Response) { // TODO(philips): make this recursive. cleanNode(result.Node) for i, _ := range result.Node.Nodes { - cleanNode(&result.Node.Nodes[i]) + cleanNode(result.Node.Nodes[i]) for j, _ := range result.Node.Nodes[i].Nodes { - cleanNode(&result.Node.Nodes[i].Nodes[j]) + cleanNode(result.Node.Nodes[i].Nodes[j]) } } } @@ -67,12 +67,12 @@ func TestGetAll(t *testing.T) { } expected := Nodes{ - Node{ + &Node{ Key: "/fooDir/k0", Value: "v0", TTL: 5, }, - Node{ + &Node{ Key: "/fooDir/k1", Value: "v1", TTL: 5, @@ -99,11 +99,11 @@ func TestGetAll(t *testing.T) { } expected = Nodes{ - Node{ + &Node{ Key: "/fooDir/childDir", Dir: true, Nodes: Nodes{ - Node{ + &Node{ Key: "/fooDir/childDir/k2", Value: "v2", TTL: 5, @@ -111,12 +111,12 @@ func TestGetAll(t *testing.T) { }, TTL: 5, }, - Node{ + &Node{ Key: "/fooDir/k0", Value: "v0", TTL: 5, }, - Node{ + &Node{ Key: "/fooDir/k1", Value: "v1", TTL: 5, diff --git a/third_party/github.com/coreos/go-etcd/etcd/options.go b/third_party/github.com/coreos/go-etcd/etcd/options.go index 335a0c218a9e7e2d01793c70f37716cf2c8758dc..701c9b35b971eda1b26fe760c549bcaad0d58b1e 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/options.go +++ b/third_party/github.com/coreos/go-etcd/etcd/options.go @@ -6,7 +6,7 @@ import ( "reflect" ) -type options map[string]interface{} +type Options map[string]interface{} // An internally-used data structure that represents a mapping // between valid options and their kinds @@ -42,7 +42,7 @@ var ( ) // Convert options to a string of HTML parameters -func (ops options) toParameters(validOps validOptions) (string, error) { +func (ops Options) toParameters(validOps validOptions) (string, error) { p := "?" values := url.Values{} diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go index c16f7d46451fef47d4d944359c3605d1f658e071..5d8b45a2d39386b576987620effbc9abc0016bd5 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/requests.go +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -1,6 +1,7 @@ package etcd import ( + "errors" "fmt" "io/ioutil" "math/rand" @@ -8,11 +9,38 @@ import ( "net/url" "path" "strings" + "sync" "time" ) -// get issues a GET request -func (c *Client) get(key string, options options) (*RawResponse, error) { +// Errors introduced by handling requests +var ( + ErrRequestCancelled = errors.New("sending request is cancelled") +) + +type RawRequest struct { + Method string + RelativePath string + Values url.Values + Cancel <-chan bool +} + +// NewRawRequest returns a new RawRequest +func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan bool) *RawRequest { + return &RawRequest{ + Method: method, + RelativePath: relativePath, + Values: values, + Cancel: cancel, + } +} + +// getCancelable issues a cancelable GET request +func (c *Client) getCancelable(key string, options Options, + cancel <-chan bool) (*RawResponse, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + p := keyToPath(key) + // If consistency level is set to STRONG, append // the `consistent` query string. if c.config.Consistency == STRONG_CONSISTENCY { @@ -23,8 +51,10 @@ func (c *Client) get(key string, options options) (*RawResponse, error) { if err != nil { return nil, err } + p += str - resp, err := c.sendKeyRequest("GET", key, str, nil) + req := NewRawRequest("GET", p, nil, cancel) + resp, err := c.SendRequest(req) if err != nil { return nil, err @@ -33,16 +63,26 @@ func (c *Client) get(key string, options options) (*RawResponse, error) { return resp, nil } +// get issues a GET request +func (c *Client) get(key string, options Options) (*RawResponse, error) { + return c.getCancelable(key, options, nil) +} + // put issues a PUT request func (c *Client) put(key string, value string, ttl uint64, - options options) (*RawResponse, error) { + options Options) (*RawResponse, error) { + + logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + p := keyToPath(key) str, err := options.toParameters(VALID_PUT_OPTIONS) if err != nil { return nil, err } + p += str - resp, err := c.sendKeyRequest("PUT", key, str, buildValues(value, ttl)) + req := NewRawRequest("PUT", p, buildValues(value, ttl), nil) + resp, err := c.SendRequest(req) if err != nil { return nil, err @@ -53,7 +93,11 @@ func (c *Client) put(key string, value string, ttl uint64, // post issues a POST request func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) { - resp, err := c.sendKeyRequest("POST", key, "", buildValues(value, ttl)) + logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + p := keyToPath(key) + + req := NewRawRequest("POST", p, buildValues(value, ttl), nil) + resp, err := c.SendRequest(req) if err != nil { return nil, err @@ -63,13 +107,18 @@ func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error } // delete issues a DELETE request -func (c *Client) delete(key string, options options) (*RawResponse, error) { +func (c *Client) delete(key string, options Options) (*RawResponse, error) { + logger.Debugf("delete %s [%s]", key, c.cluster.Leader) + p := keyToPath(key) + str, err := options.toParameters(VALID_DELETE_OPTIONS) if err != nil { return nil, err } + p += str - resp, err := c.sendKeyRequest("DELETE", key, str, nil) + req := NewRawRequest("DELETE", p, nil, nil) + resp, err := c.SendRequest(req) if err != nil { return nil, err @@ -78,146 +127,222 @@ func (c *Client) delete(key string, options options) (*RawResponse, error) { return resp, nil } -// sendKeyRequest sends a HTTP request and returns a Response as defined by etcd -func (c *Client) sendKeyRequest(method string, key string, params string, - values url.Values) (*RawResponse, error) { +// SendRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { var req *http.Request var resp *http.Response var httpPath string var err error - var b []byte + var respBody []byte + + var numReqs = 1 + + checkRetry := c.CheckRetry + if checkRetry == nil { + checkRetry = DefaultCheckRetry + } - trial := 0 + cancelled := make(chan bool, 1) + reqLock := new(sync.Mutex) - logger.Debugf("%s %s %s [%s]", method, key, params, c.cluster.Leader) + if rr.Cancel != nil { + cancelRoutine := make(chan bool) + defer close(cancelRoutine) - // Build the request path if no prefix exists - relativePath := path.Join(c.keyPrefix, key) + params + go func() { + select { + case <-rr.Cancel: + cancelled <- true + logger.Debug("send.request is cancelled") + case <-cancelRoutine: + return + } + + // Repeat canceling request until this thread is stopped + // because we have no idea about whether it succeeds. + for { + reqLock.Lock() + c.httpClient.Transport.(*http.Transport).CancelRequest(req) + reqLock.Unlock() + + select { + case <-time.After(100 * time.Millisecond): + case <-cancelRoutine: + return + } + } + }() + } - // if we connect to a follower, we will retry until we found a leader - for { - trial++ - logger.Debug("begin trail ", trial) - if trial > 2*len(c.cluster.Machines) { - return nil, newError(ErrCodeEtcdNotReachable, - "Tried to connect to each peer twice and failed", 0) + // If we connect to a follower and consistency is required, retry until + // we connect to a leader + sleep := 25 * time.Millisecond + maxSleep := time.Second + for attempt := 0; ; attempt++ { + if attempt > 0 { + select { + case <-cancelled: + return nil, ErrRequestCancelled + case <-time.After(sleep): + sleep = sleep * 2 + if sleep > maxSleep { + sleep = maxSleep + } + } } - if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { + logger.Debug("Connecting to etcd: attempt", attempt+1, "for", rr.RelativePath) + + if rr.Method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { // If it's a GET and consistency level is set to WEAK, // then use a random machine. - httpPath = c.getHttpPath(true, relativePath) + httpPath = c.getHttpPath(true, rr.RelativePath) } else { // Else use the leader. - httpPath = c.getHttpPath(false, relativePath) + httpPath = c.getHttpPath(false, rr.RelativePath) } // Return a cURL command if curlChan is set if c.cURLch != nil { - command := fmt.Sprintf("curl -X %s %s", method, httpPath) - for key, value := range values { + command := fmt.Sprintf("curl -X %s %s", rr.Method, httpPath) + for key, value := range rr.Values { command += fmt.Sprintf(" -d %s=%s", key, value[0]) } c.sendCURL(command) } - logger.Debug("send.request.to ", httpPath, " | method ", method) + logger.Debug("send.request.to ", httpPath, " | method ", rr.Method) - if values == nil { - req, _ = http.NewRequest(method, httpPath, nil) + reqLock.Lock() + if rr.Values == nil { + if req, err = http.NewRequest(rr.Method, httpPath, nil); err != nil { + return nil, err + } } else { - req, _ = http.NewRequest(method, httpPath, - strings.NewReader(values.Encode())) + body := strings.NewReader(rr.Values.Encode()) + if req, err = http.NewRequest(rr.Method, httpPath, body); err != nil { + return nil, err + } req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") } + reqLock.Unlock() + + resp, err = c.httpClient.Do(req) + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + // If the request was cancelled, return ErrRequestCancelled directly + select { + case <-cancelled: + return nil, ErrRequestCancelled + default: + } + + numReqs++ // network error, change a machine! - if resp, err = c.httpClient.Do(req); err != nil { - logger.Debug("network error: ", err.Error()) - c.cluster.switchLeader(trial % len(c.cluster.Machines)) - time.Sleep(time.Millisecond * 200) + if err != nil { + logger.Debug("network error:", err.Error()) + lastResp := http.Response{} + if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil { + return nil, checkErr + } + + c.cluster.switchLeader(attempt % len(c.cluster.Machines)) continue } - if resp != nil { - logger.Debug("recv.response.from ", httpPath) - - var ok bool - ok, b = c.handleResp(resp) + // if there is no error, it should receive response + logger.Debug("recv.response.from", httpPath) - if !ok { - continue + if validHttpStatusCode[resp.StatusCode] { + // try to read byte code and break the loop + respBody, err = ioutil.ReadAll(resp.Body) + if err == nil { + logger.Debug("recv.success.", httpPath) + break } + // ReadAll error may be caused due to cancel request + select { + case <-cancelled: + return nil, ErrRequestCancelled + default: + } + } - logger.Debug("recv.success.", httpPath) - break + // if resp is TemporaryRedirect, set the new leader and retry + if resp.StatusCode == http.StatusTemporaryRedirect { + u, err := resp.Location() + + if err != nil { + logger.Warning(err) + } else { + // Update cluster leader based on redirect location + // because it should point to the leader address + c.cluster.updateLeaderFromURL(u) + logger.Debug("recv.response.relocate", u.String()) + } + resp.Body.Close() + continue } - // should not reach here - // err and resp should not be nil at the same time - logger.Debug("error.from ", httpPath) - return nil, err + if checkErr := checkRetry(c.cluster, numReqs, *resp, + errors.New("Unexpected HTTP status code")); checkErr != nil { + return nil, checkErr + } + resp.Body.Close() } r := &RawResponse{ StatusCode: resp.StatusCode, - Body: b, + Body: respBody, Header: resp.Header, } return r, nil } -// handleResp handles the responses from the etcd server -// If status code is OK, read the http body and return it as byte array -// If status code is TemporaryRedirect, update leader. +// DefaultCheckRetry defines the retrying behaviour for bad HTTP requests +// If we have retried 2 * machine number, stop retrying. // If status code is InternalServerError, sleep for 200ms. -func (c *Client) handleResp(resp *http.Response) (bool, []byte) { - defer resp.Body.Close() - - code := resp.StatusCode +func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response, + err error) error { - if code == http.StatusTemporaryRedirect { - u, err := resp.Location() - - if err != nil { - logger.Warning(err) - } else { - c.cluster.updateLeaderFromURL(u) - } - - return false, nil + if numReqs >= 2*len(cluster.Machines) { + return newError(ErrCodeEtcdNotReachable, + "Tried to connect to each peer twice and failed", 0) + } - } else if code == http.StatusInternalServerError { + code := lastResp.StatusCode + if code == http.StatusInternalServerError { time.Sleep(time.Millisecond * 200) - } else if validHttpStatusCode[code] { - b, err := ioutil.ReadAll(resp.Body) - - if err != nil { - return false, nil - } - - return true, b } - logger.Warning("bad status code ", resp.StatusCode) - return false, nil + logger.Warning("bad response status code", code) + return nil } func (c *Client) getHttpPath(random bool, s ...string) string { var machine string - if random { machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] } else { machine = c.cluster.Leader } - return machine + "/" + strings.Join(s, "/") + fullPath := machine + "/" + version + for _, seg := range s { + fullPath = fullPath + "/" + seg + } + + return fullPath } // buildValues builds a url.Values map according to the given value and ttl @@ -236,14 +361,17 @@ func buildValues(value string, ttl uint64) url.Values { } // convert key string to http path exclude version -// for example: key[foo] -> path[foo] -// key[] -> path[/] +// for example: key[foo] -> path[keys/foo] +// key[/] -> path[keys/] func keyToPath(key string) string { - clean := path.Clean(key) + p := path.Join("keys", key) - if clean == "" || clean == "." { - return "/" + // corner case: if key is "/" or "//" ect + // path join will clear the tailing "/" + // we need to add it back + if p == "keys" { + p = "keys/" } - return clean + return p } diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go index a72c2d8d728b8b57b5d30c58ee00da57e2ae6ef3..1fe9b4e87113a905ff8e1df487aec4bb7512d960 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/response.go +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -31,7 +31,8 @@ var ( } ) -func (rr *RawResponse) toResponse() (*Response, error) { +// Unmarshal parses RawResponse and stores the result in Response +func (rr *RawResponse) Unmarshal() (*Response, error) { if rr.StatusCode != http.StatusOK && rr.StatusCode != http.StatusCreated { return nil, handleError(rr.Body) } @@ -72,7 +73,7 @@ type Node struct { CreatedIndex uint64 `json:"createdIndex,omitempty"` } -type Nodes []Node +type Nodes []*Node // interfaces for sorting func (ns Nodes) Len() int { diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go index ab420d8fee17a491499ed6af5042be941fb3c443..cb0d5674775c615c00c76623e7e1a993fc9cf6aa 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -10,7 +10,7 @@ func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // Set sets the given key to a directory. @@ -23,7 +23,7 @@ func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // CreateDir creates a directory. It succeeds only if @@ -35,7 +35,7 @@ func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // UpdateDir updates the given directory. It succeeds only if the @@ -47,7 +47,7 @@ func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { return nil, err } - return raw.toResponse() + return raw.Unmarshal() } // Create creates a file with the given value under the given key. It succeeds @@ -59,7 +59,19 @@ func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) return nil, err } - return raw.toResponse() + return raw.Unmarshal() +} + +// CreateInOrder creates a file with a key that's guaranteed to be higher than other +// keys in the given directory. It is useful for creating queues. +func (c *Client) CreateInOrder(dir string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawCreateInOrder(dir, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() } // Update updates the given key to the given value. It succeeds only if the @@ -71,11 +83,11 @@ func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) return nil, err } - return raw.toResponse() + return raw.Unmarshal() } func (c *Client) RawUpdateDir(key string, ttl uint64) (*RawResponse, error) { - ops := options{ + ops := Options{ "prevExist": true, "dir": true, } @@ -84,7 +96,7 @@ func (c *Client) RawUpdateDir(key string, ttl uint64) (*RawResponse, error) { } func (c *Client) RawCreateDir(key string, ttl uint64) (*RawResponse, error) { - ops := options{ + ops := Options{ "prevExist": false, "dir": true, } @@ -97,7 +109,7 @@ func (c *Client) RawSet(key string, value string, ttl uint64) (*RawResponse, err } func (c *Client) RawSetDir(key string, ttl uint64) (*RawResponse, error) { - ops := options{ + ops := Options{ "dir": true, } @@ -105,7 +117,7 @@ func (c *Client) RawSetDir(key string, ttl uint64) (*RawResponse, error) { } func (c *Client) RawUpdate(key string, value string, ttl uint64) (*RawResponse, error) { - ops := options{ + ops := Options{ "prevExist": true, } @@ -113,9 +125,13 @@ func (c *Client) RawUpdate(key string, value string, ttl uint64) (*RawResponse, } func (c *Client) RawCreate(key string, value string, ttl uint64) (*RawResponse, error) { - ops := options{ + ops := Options{ "prevExist": false, } return c.put(key, value, ttl, ops) } + +func (c *Client) RawCreateInOrder(dir string, value string, ttl uint64) (*RawResponse, error) { + return c.post(dir, value, ttl) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go index 2003c59bb4d178e4ea50baf3cfa1558570bbf803..ced0f06e7be58ca716a96a6dab12529f5ece274b 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -98,6 +98,43 @@ func TestCreate(t *testing.T) { } } +func TestCreateInOrder(t *testing.T) { + c := NewClient(nil) + dir := "/queue" + defer func() { + c.DeleteDir(dir) + }() + + var firstKey, secondKey string + + resp, err := c.CreateInOrder(dir, "1", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Value == "1" && resp.Node.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + + firstKey = resp.Node.Key + + resp, err = c.CreateInOrder(dir, "2", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Value == "2" && resp.Node.TTL == 5) { + t.Fatalf("Create 2 failed: %#v", resp) + } + + secondKey = resp.Node.Key + + if firstKey >= secondKey { + t.Fatalf("Expected first key to be greater than second key, but %s is not greater than %s", + firstKey, secondKey) + } +} + func TestSetDir(t *testing.T) { c := NewClient(nil) defer func() { diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index 1e1ac74a5c9da5ebc82d38d7b42c6465fdb30a4b..aa8d3df301c811755b3656c34c6cdedbe4fb3832 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -31,8 +31,9 @@ func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool, return nil, err } - return raw.toResponse() + return raw.Unmarshal() } + defer close(receiver) for { raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) @@ -41,7 +42,7 @@ func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool, return nil, err } - resp, err := raw.toResponse() + resp, err := raw.Unmarshal() if err != nil { return nil, err @@ -50,8 +51,6 @@ func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool, waitIndex = resp.Node.ModifiedIndex + 1 receiver <- resp } - - return nil, nil } func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool, @@ -69,7 +68,7 @@ func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool, return nil, err } - resp, err := raw.toResponse() + resp, err := raw.Unmarshal() if err != nil { return nil, err @@ -78,44 +77,27 @@ func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool, waitIndex = resp.Node.ModifiedIndex + 1 receiver <- raw } - - return nil, nil } // helper func // return when there is change under the given prefix func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*RawResponse, error) { - respChan := make(chan *RawResponse, 1) - errChan := make(chan error) - - go func() { - options := options{ - "wait": true, - } - if waitIndex > 0 { - options["waitIndex"] = waitIndex - } - if recursive { - options["recursive"] = true - } - - resp, err := c.get(key, options) - - if err != nil { - errChan <- err - return - } + options := Options{ + "wait": true, + } + if waitIndex > 0 { + options["waitIndex"] = waitIndex + } + if recursive { + options["recursive"] = true + } - respChan <- resp - }() + resp, err := c.getCancelable(key, options, stop) - select { - case resp := <-respChan: - return resp, nil - case err := <-errChan: - return nil, err - case <-stop: + if err == ErrRequestCancelled { return nil, ErrWatchStoppedByUser } + + return resp, err } diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index 9b466489cdce641337fd026458400a8bcca4b8d1..43e1dfeb81f18c3091bb0316a8da5fc92720fd28 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -2,6 +2,7 @@ package etcd import ( "fmt" + "runtime" "testing" "time" ) @@ -32,6 +33,8 @@ func TestWatch(t *testing.T) { t.Fatalf("Watch 2 failed: %#v", resp) } + routineNum := runtime.NumGoroutine() + ch := make(chan *Response, 10) stop := make(chan bool, 1) @@ -43,6 +46,10 @@ func TestWatch(t *testing.T) { if err != ErrWatchStoppedByUser { t.Fatalf("Watch returned a non-user stop error") } + + if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum { + t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum) + } } func TestWatchAll(t *testing.T) { @@ -74,6 +81,8 @@ func TestWatchAll(t *testing.T) { ch := make(chan *Response, 10) stop := make(chan bool, 1) + routineNum := runtime.NumGoroutine() + go setLoop("watch_foo/foo", "bar", c) go receiver(ch, stop) @@ -82,6 +91,10 @@ func TestWatchAll(t *testing.T) { if err != ErrWatchStoppedByUser { t.Fatalf("Watch returned a non-user stop error") } + + if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum { + t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum) + } } func setHelper(key, value string, c *Client) {