diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child.go b/third_party/github.com/coreos/go-etcd/etcd/add_child.go new file mode 100644 index 0000000000000000000000000000000000000000..f275599c5a41ce1e3510a673ca7c627b96e9ee7b --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child.go @@ -0,0 +1,11 @@ +package etcd + +// Add a new directory with a random etcd-generated key under the given path. +func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { + return c.post(key, "", ttl) +} + +// Add a new file with a random etcd-generated key under the given path. +func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) { + return c.post(key, value, ttl) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go new file mode 100644 index 0000000000000000000000000000000000000000..efe155467aeec9a3ca904edf351e7fb207260d48 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go @@ -0,0 +1,73 @@ +package etcd + +import "testing" + +func TestAddChild(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChild("fooDir", "v0", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChild("fooDir", "v1", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (resp.Kvs[0].Value == "v0" && resp.Kvs[1].Value == "v1")) { + t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChild("nonexistentDir", "foo", 5) + if err != nil { + t.Fatal(err) + } +} + +func TestAddChildDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (len(resp.Kvs[0].KVPairs) == 0 && len(resp.Kvs[1].KVPairs) == 0)) { + t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChildDir("nonexistentDir", 5) + if err != nil { + t.Fatal(err) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 31d3c2a3a76069f3c956e34e0078834d76a764fc..63ce6ab7fa67f315462933b33c999f8c05e8f409 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -2,12 +2,16 @@ package etcd import ( "crypto/tls" + "encoding/json" "errors" + "io" "io/ioutil" "net" "net/http" "net/url" + "os" "path" + "reflect" "strings" "time" ) @@ -17,25 +21,43 @@ const ( HTTPS ) +// See SetConsistency for how to use these constants. +const ( + // Using strings rather than iota because the consistency level + // could be persisted to disk, so it'd be better to use + // human-readable values. + STRONG_CONSISTENCY = "STRONG" + WEAK_CONSISTENCY = "WEAK" +) + type Cluster struct { - Leader string - Machines []string + Leader string `json:"leader"` + Machines []string `json:"machines"` } type Config struct { - CertFile string - KeyFile string - Scheme string - Timeout time.Duration + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` + Scheme string `json:"scheme"` + Timeout time.Duration `json:"timeout"` + Consistency string `json: "consistency"` } type Client struct { - cluster Cluster - config Config - httpClient *http.Client + cluster Cluster `json:"cluster"` + config Config `json:"config"` + httpClient *http.Client + persistence io.Writer } -// Setup a basic conf and cluster +type options map[string]interface{} + +// An internally-used data structure that represents a mapping +// between valid options and their kinds +type validOptions map[string]reflect.Kind + +// NewClient create a basic client that is configured to be used +// with the given machine list. func NewClient(machines []string) *Client { // if an empty slice was sent in then just assume localhost if len(machines) == 0 { @@ -53,30 +75,168 @@ func NewClient(machines []string) *Client { Scheme: "http", // default timeout is one second Timeout: time.Second, + // default consistency level is STRONG + Consistency: STRONG_CONSISTENCY, + } + + client := &Client{ + cluster: cluster, + config: config, + } + + err := setupHttpClient(client) + if err != nil { + panic(err) + } + + return client +} + +// NewClientFile creates a client from a given file path. +// The given file is expected to use the JSON format. +func NewClientFile(fpath string) (*Client, error) { + fi, err := os.Open(fpath) + if err != nil { + return nil, err + } + defer func() { + if err := fi.Close(); err != nil { + panic(err) + } + }() + + return NewClientReader(fi) +} + +// NewClientReader creates a Client configured from a given reader. +// The config is expected to use the JSON format. +func NewClientReader(reader io.Reader) (*Client, error) { + var client Client + + b, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + err = json.Unmarshal(b, &client) + if err != nil { + return nil, err } - tr := &http.Transport{ - Dial: dialTimeout, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, + err = setupHttpClient(&client) + if err != nil { + return nil, err + } + + return &client, nil +} + +func setupHttpClient(client *Client) error { + if client.config.CertFile != "" && client.config.KeyFile != "" { + err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile) + if err != nil { + return err + } + } else { + client.config.CertFile = "" + client.config.KeyFile = "" + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + client.httpClient = &http.Client{Transport: tr} + } + + return nil +} + +// SetPersistence sets a writer to which the config will be +// written every time it's changed. +func (c *Client) SetPersistence(writer io.Writer) { + c.persistence = writer +} + +// SetConsistency changes the consistency level of the client. +// +// When consistency is set to STRONG_CONSISTENCY, all requests, +// including GET, are sent to the leader. This means that, assuming +// the absence of leader failures, GET requests are guranteed to see +// the changes made by previous requests. +// +// When consistency is set to WEAK_CONSISTENCY, other requests +// are still sent to the leader, but GET requests are sent to a +// random server from the server pool. This reduces the read +// load on the leader, but it's not guranteed that the GET requests +// will see changes made by previous requests (they might have not +// yet been commited on non-leader servers). +func (c *Client) SetConsistency(consistency string) error { + if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) { + return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.") + } + c.config.Consistency = consistency + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err } - return &Client{ - cluster: cluster, - config: config, - httpClient: &http.Client{Transport: tr}, + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json: "config"` + Cluster Cluster `json: "cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err } + c.cluster = temp.Cluster + c.config = temp.Config + return nil } -func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } + } + return nil +} + +func (c *Client) SetCertAndKey(cert string, key string) error { if cert != "" && key != "" { tlsCert, err := tls.LoadX509KeyPair(cert, key) if err != nil { - return false, err + return err } tr := &http.Transport{ @@ -88,24 +248,27 @@ func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { } c.httpClient = &http.Client{Transport: tr} - return true, nil + c.saveConfig() + return nil } - return false, errors.New("Require both cert and key path") + return errors.New("Require both cert and key path") } -func (c *Client) SetScheme(scheme int) (bool, error) { +func (c *Client) SetScheme(scheme int) error { if scheme == HTTP { c.config.Scheme = "http" - return true, nil + c.saveConfig() + return nil } if scheme == HTTPS { c.config.Scheme = "https" - return true, nil + c.saveConfig() + return nil } - return false, errors.New("Unknown Scheme") + return errors.New("Unknown Scheme") } -// Try to sync from the given machine +// SetCluster updates config using the given machine list. func (c *Client) SetCluster(machines []string) bool { success := c.internalSyncCluster(machines) return success @@ -115,13 +278,13 @@ func (c *Client) GetCluster() []string { return c.cluster.Machines } -// sycn cluster information using the existing machine list +// SyncCluster updates config using the internal machine list. func (c *Client) SyncCluster() bool { success := c.internalSyncCluster(c.cluster.Machines) return success } -// sync cluster information by providing machine list +// internalSyncCluster syncs cluster information using the given machine list. func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { httpPath := c.createHttpPath(machine, version+"/machines") @@ -146,16 +309,19 @@ func (c *Client) internalSyncCluster(machines []string) bool { c.cluster.Leader = c.cluster.Machines[0] logger.Debug("sync.machines ", c.cluster.Machines) + c.saveConfig() return true } } return false } -// serverName should contain both hostName and port +// createHttpPath creates a complete HTTP URL. +// serverName should contain both the host name and a port number, if any. func (c *Client) createHttpPath(serverName string, _path string) string { u, _ := url.Parse(serverName) u.Path = path.Join(u.Path, "/", _path) + if u.Scheme == "" { u.Scheme = "http" } @@ -167,18 +333,6 @@ func dialTimeout(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Second) } -func (c *Client) getHttpPath(s ...string) string { - u, _ := url.Parse(c.cluster.Leader) - - u.Path = path.Join(u.Path, "/", version) - - for _, seg := range s { - u.Path = path.Join(u.Path, seg) - } - - return u.String() -} - func (c *Client) updateLeader(httpPath string) { u, _ := url.Parse(httpPath) @@ -191,77 +345,5 @@ func (c *Client) updateLeader(httpPath string) { logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) c.cluster.Leader = leader -} - -// Wrap GET, POST and internal error handling -func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) { - - var resp *http.Response - var err error - var req *http.Request - - retry := 0 - // if we connect to a follower, we will retry until we found a leader - for { - - httpPath := c.getHttpPath(_path) - - logger.Debug("send.request.to ", httpPath) - if body == "" { - - req, _ = http.NewRequest(method, httpPath, nil) - - } else { - req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") - } - - resp, err = c.httpClient.Do(req) - - logger.Debug("recv.response.from ", httpPath) - // network error, change a machine! - if err != nil { - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - num := retry % len(c.cluster.Machines) - logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") - c.cluster.Leader = c.cluster.Machines[num] - time.Sleep(time.Millisecond * 200) - continue - } - - if resp != nil { - if resp.StatusCode == http.StatusTemporaryRedirect { - httpPath := resp.Header.Get("Location") - - resp.Body.Close() - - if httpPath == "" { - return nil, errors.New("Cannot get redirection location") - } - - c.updateLeader(httpPath) - logger.Debug("send.redirect") - // try to connect the leader - continue - } else if resp.StatusCode == http.StatusInternalServerError { - resp.Body.Close() - - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - continue - } else { - logger.Debug("send.return.response ", httpPath) - break - } - - } - logger.Debug("error.from ", httpPath, " ", err.Error()) - return nil, err - } - return resp, nil + c.saveConfig() } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index 29f138113cef739885142d8d6427c2baf35395e6..b25611b22fdc76d63528ea8fe1d9ce33c72acc94 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -1,10 +1,12 @@ package etcd import ( + "encoding/json" "fmt" - "testing" - "net/url" "net" + "net/url" + "os" + "testing" ) // To pass this test, we need to create a cluster of 3 machines @@ -19,7 +21,7 @@ func TestSync(t *testing.T) { t.Fatal("cannot sync machines") } - for _, m := range(c.GetCluster()) { + for _, m := range c.GetCluster() { u, err := url.Parse(m) if err != nil { t.Fatal(err) @@ -27,7 +29,7 @@ func TestSync(t *testing.T) { if u.Scheme != "http" { t.Fatal("scheme must be http") } - + host, _, err := net.SplitHostPort(u.Host) if err != nil { t.Fatal(err) @@ -56,3 +58,37 @@ func TestSync(t *testing.T) { } } + +func TestPersistence(t *testing.T) { + c := NewClient(nil) + c.SyncCluster() + + fo, err := os.Create("config.json") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := fo.Close(); err != nil { + panic(err) + } + }() + + c.SetPersistence(fo) + err = c.saveConfig() + if err != nil { + t.Fatal(err) + } + + c2, err := NewClientFile("config.json") + if err != nil { + t.Fatal(err) + } + + // Verify that the two clients have the same config + b1, _ := json.Marshal(c) + b2, _ := json.Marshal(c2) + + if string(b1) != string(b2) { + t.Fatalf("The two configs should be equal!") + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go new file mode 100644 index 0000000000000000000000000000000000000000..565a03ef1ac9f362a85c5b034fddf2c91268f4e8 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -0,0 +1,18 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + return c.put(key, value, ttl, options) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..647aadffa27c3f9e492114de54f7c32cf3986363 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go @@ -0,0 +1,51 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndSwap(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + + // This should succeed + resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevValue + resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } + + resp, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.Index) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go index d82d9b292845f2ce5252edce13ecbf0f6d719d31..bd67398813a226905869a6b759b666678d7df070 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/debug.go +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -9,6 +9,8 @@ var logger *log.Logger func init() { setLogger(log.PriErr) + // Uncomment the following line if you want to see lots of logs + // OpenDebug() } func OpenDebug() { diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index 91f6df87a5adc33542428e624227eb7756ca5d0c..00348f6ba8858fca1658ec610d7190b9011762c8 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -1,40 +1,17 @@ package etcd -import ( - "encoding/json" - "io/ioutil" - "net/http" - "path" -) +// DeleteAll deletes everything under the given key. If the key +// points to a file, the file will be deleted. If the key points +// to a directory, then everything under the directory, include +// all child directories, will be deleted. +func (c *Client) DeleteAll(key string) (*Response, error) { + return c.delete(key, options{ + "recursive": true, + }) +} +// Delete deletes the given key. If the key points to a +// directory, the method will fail. func (c *Client) Delete(key string) (*Response, error) { - - resp, err := c.sendRequest("DELETE", path.Join("keys", key), "") - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } - - var result Response - - err = json.Unmarshal(b, &result) - - if err != nil { - return nil, err - } - - return &result, nil - + return c.delete(key, nil) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go index 52756d09fcd74db7ed77478b56dcad6e1b48ba5c..0f8475a235c7bd2b56141c36f2d56bc05a006b80 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -5,18 +5,60 @@ import ( ) func TestDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + resp, err := c.Delete("foo") + if err != nil { + t.Fatal(err) + } + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("Delete failed with %s %s", resp.PrevValue, + resp.Value) + } + + resp, err = c.Delete("foo") + if err == nil { + t.Fatalf("Delete should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} + +func TestDeleteAll(t *testing.T) { c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + c.Set("foo", "bar", 5) + resp, err := c.DeleteAll("foo") + if err != nil { + t.Fatal(err) + } + + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("DeleteAll 1 failed: %#v", resp) + } - c.Set("foo", "bar", 100) - result, err := c.Delete("foo") + c.SetDir("fooDir", 5) + c.Set("fooDir/foo", "bar", 5) + resp, err = c.DeleteAll("fooDir") if err != nil { t.Fatal(err) } - if result.PrevValue != "bar" || result.Value != "" { - t.Fatalf("Delete failed with %s %s", result.PrevValue, - result.Value) + if !(resp.PrevValue == "" && resp.Value == "") { + t.Fatalf("DeleteAll 2 failed: %#v", resp) } + resp, err = c.DeleteAll("foo") + if err == nil { + t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go index 3288621e7c44d3368c48b417b49bafa65283cb66..d42a83c7d93d3a478dc82ac1d038c3916f682414 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -1,82 +1,23 @@ package etcd -import ( - "encoding/json" - "io/ioutil" - "net/http" - "path" -) - -func (c *Client) Get(key string) ([]*Response, error) { - logger.Debugf("get %s [%s]", key, c.cluster.Leader) - resp, err := c.sendRequest("GET", path.Join("keys", key), "") - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - - return nil, handleError(b) - } - - return convertGetResponse(b) - +// GetDir gets the all contents under the given key. +// If the key points to a file, the file is returned. +// If the key points to a directory, everything under it is returnd, +// including all contents under all child directories. +func (c *Client) GetAll(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "recursive": true, + "sorted": sort, + }) } -// GetTo gets the value of the key from a given machine address. -// If the given machine is not available it returns an error. -// Mainly use for testing purpose -func (c *Client) GetFrom(key string, addr string) ([]*Response, error) { - httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) - - resp, err := c.httpClient.Get(httpPath) - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } - - return convertGetResponse(b) -} - -// Convert byte stream to response. -func convertGetResponse(b []byte) ([]*Response, error) { - - var results []*Response - var result *Response - - err := json.Unmarshal(b, &result) - - if err != nil { - err = json.Unmarshal(b, &results) - - if err != nil { - return nil, err - } - - } else { - results = make([]*Response, 1) - results[0] = result - } - return results, nil +// Get gets the file or directory associated with the given key. +// If the key points to a directory, files and directories under +// it will be returned in sorted or unsorted order, depending on +// the sort flag. Note that contents under child directories +// will not be returned. To get those contents, use GetAll. +func (c *Client) Get(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "sorted": sort, + }) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index ff81374003cdea690432d2fdd6853b6856b2a59d..a34946c7eddb10ca2caedfa5268eef8641e54340 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -1,46 +1,99 @@ package etcd import ( + "reflect" "testing" - "time" ) func TestGet(t *testing.T) { - c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() - c.Set("foo", "bar", 100) - - // wait for commit - time.Sleep(100 * time.Millisecond) + c.Set("foo", "bar", 5) - results, err := c.Get("foo") + result, err := c.Get("foo", false) - if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + if err != nil { + t.Fatal(err) } - results, err = c.Get("goo") + if result.Key != "/foo" || result.Value != "bar" { + t.Fatalf("Get failed with %s %s %v", result.Key, result.Value, result.TTL) + } + result, err = c.Get("goo", false) if err == nil { t.Fatalf("should not be able to get non-exist key") } +} - results, err = c.GetFrom("foo", "0.0.0.0:4001") +func TestGetAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() - if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + c.SetDir("fooDir", 5) + c.Set("fooDir/k0", "v0", 5) + c.Set("fooDir/k1", "v1", 5) + + // Return kv-pairs in sorted order + result, err := c.Get("fooDir", true) + + if err != nil { + t.Fatal(err) } - results, err = c.GetFrom("foo", "0.0.0.0:4009") + expected := kvPairs{ + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } - if err == nil { - t.Fatal("should not get from port 4009") + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs, expected) + } + + // Test the `recursive` option + c.SetDir("fooDir/childDir", 5) + c.Set("fooDir/childDir/k2", "v2", 5) + + // Return kv-pairs in sorted order + result, err = c.GetAll("fooDir", true) + + if err != nil { + t.Fatal(err) + } + + expected = kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir", + Dir: true, + KVPairs: kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir/k2", + Value: "v2", + }, + }, + }, + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } + + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go new file mode 100644 index 0000000000000000000000000000000000000000..83e3b519ef799b4f3b9974dd4f5263f36b78966f --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -0,0 +1,290 @@ +package etcd + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "path" + "reflect" + "strings" + "time" +) + +// Valid options for GET, PUT, POST, DELETE +// Using CAPITALIZED_UNDERSCORE to emphasize that these +// values are meant to be used as constants. +var ( + VALID_GET_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "consistent": reflect.Bool, + "sorted": reflect.Bool, + "wait": reflect.Bool, + "waitIndex": reflect.Uint64, + } + + VALID_PUT_OPTIONS = validOptions{ + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + "prevExist": reflect.Bool, + } + + VALID_POST_OPTIONS = validOptions{} + + VALID_DELETE_OPTIONS = validOptions{ + "recursive": reflect.Bool, + } + + curlChan chan string +) + +// SetCurlChan sets a channel to which cURL commands which can be used to +// re-produce requests are sent. This is useful for debugging. +func SetCurlChan(c chan string) { + curlChan = c +} + +// get issues a GET request +func (c *Client) get(key string, options options) (*Response, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + + p := path.Join("keys", key) + // If consistency level is set to STRONG, append + // the `consistent` query string. + if c.config.Consistency == STRONG_CONSISTENCY { + options["consistent"] = true + } + if options != nil { + str, err := optionsToString(options, VALID_GET_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("GET", p, url.Values{}) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// put issues a PUT request +func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) { + logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_PUT_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("PUT", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// post issues a POST request +func (c *Client) post(key string, value string, ttl uint64) (*Response, error) { + logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// delete issues a DELETE request +func (c *Client) delete(key string, options options) (*Response, error) { + logger.Debugf("delete %s [%s]", key, c.cluster.Leader) + v := url.Values{} + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_DELETE_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("DELETE", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// sendRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) { + var body string = values.Encode() + var resp *http.Response + var req *http.Request + + retry := 0 + // if we connect to a follower, we will retry until we found a leader + for { + var httpPath string + + // If _path has schema already, then it's assumed to be + // a complete URL and therefore needs no further processing. + u, err := url.Parse(_path) + if err != nil { + return nil, err + } + + if u.Scheme != "" { + httpPath = _path + } else { + if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { + // If it's a GET and consistency level is set to WEAK, + // then use a random machine. + httpPath = c.getHttpPath(true, _path) + } else { + // Else use the leader. + httpPath = c.getHttpPath(false, _path) + } + } + + // Return a cURL command if curlChan is set + if curlChan != nil { + command := fmt.Sprintf("curl -X %s %s", method, httpPath) + for key, value := range values { + command += fmt.Sprintf(" -d %s=%s", key, value[0]) + } + curlChan <- command + } + + logger.Debug("send.request.to ", httpPath, " | method ", method) + if body == "" { + + req, _ = http.NewRequest(method, httpPath, nil) + + } else { + req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") + } + + resp, err = c.httpClient.Do(req) + + logger.Debug("recv.response.from ", httpPath) + // network error, change a machine! + if err != nil { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + num := retry % len(c.cluster.Machines) + logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") + c.cluster.Leader = c.cluster.Machines[num] + time.Sleep(time.Millisecond * 200) + continue + } + + if resp != nil { + if resp.StatusCode == http.StatusTemporaryRedirect { + httpPath := resp.Header.Get("Location") + + resp.Body.Close() + + if httpPath == "" { + return nil, errors.New("Cannot get redirection location") + } + + c.updateLeader(httpPath) + logger.Debug("send.redirect") + // try to connect the leader + continue + } else if resp.StatusCode == http.StatusInternalServerError { + resp.Body.Close() + + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + continue + } else { + logger.Debug("send.return.response ", httpPath) + break + } + + } + logger.Debug("error.from ", httpPath, " ", err.Error()) + return nil, err + } + + // Convert HTTP response to etcd response + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if !(resp.StatusCode == http.StatusOK || + resp.StatusCode == http.StatusCreated) { + return nil, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +func (c *Client) getHttpPath(random bool, s ...string) string { + var machine string + if random { + machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] + } else { + machine = c.cluster.Leader + } + + fullPath := machine + "/" + version + for _, seg := range s { + fullPath = fullPath + "/" + seg + } + + return fullPath +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go index d2311a675bd6f0cfdb3949d49f9de1dbb5ccfa5a..4bf0c88370719e5ebbb2ec79ac083f975786da2f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/response.go +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -6,11 +6,12 @@ import ( // The response object from the server. type Response struct { - Action string `json:"action"` - Key string `json:"key"` - Dir bool `json:"dir,omitempty"` - PrevValue string `json:"prevValue,omitempty"` - Value string `json:"value,omitempty"` + Action string `json:"action"` + Key string `json:"key"` + Dir bool `json:"dir,omitempty"` + PrevValue string `json:"prevValue,omitempty"` + Value string `json:"value,omitempty"` + Kvs kvPairs `json:"kvs,omitempty"` // If the key did not exist before the action, // this field should be set to true @@ -24,3 +25,26 @@ type Response struct { // The command index of the raft machine when the command is executed Index uint64 `json:"index"` } + +// When user list a directory, we add all the node into key-value pair slice +type KeyValuePair struct { + Key string `json:"key, omitempty"` + Value string `json:"value,omitempty"` + Dir bool `json:"dir,omitempty"` + KVPairs kvPairs `json:"kvs,omitempty"` +} + +type kvPairs []KeyValuePair + +// interfaces for sorting +func (kvs kvPairs) Len() int { + return len(kvs) +} + +func (kvs kvPairs) Less(i, j int) bool { + return kvs[i].Key < kvs[j].Key +} + +func (kvs kvPairs) Swap(i, j int) { + kvs[i], kvs[j] = kvs[j], kvs[i] +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6d06331095bceef59ee80a44074d623c2e5d31ac --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go @@ -0,0 +1,43 @@ +package etcd + +import ( + "fmt" + "testing" +) + +func TestSetCurlChan(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + curlChan := make(chan string, 1) + SetCurlChan(curlChan) + + _, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5", + c.cluster.Leader) + actual := <-curlChan + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } + + c.SetConsistency(STRONG_CONSISTENCY) + _, err = c.Get("foo", false) + if err != nil { + t.Fatal(err) + } + + expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&sorted=false", + c.cluster.Leader) + actual = <-curlChan + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go new file mode 100644 index 0000000000000000000000000000000000000000..281cd577cf65a993151fb1ed305914c3985630b7 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -0,0 +1,43 @@ +package etcd + +// SetDir sets the given key to a directory. +func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, nil) +} + +// UpdateDir updates the given key to a directory. It succeeds only if the +// given key already exists. +func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": true, + }) +} + +// UpdateDir creates a directory under the given key. It succeeds only if +// the given key does not yet exist. +func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": false, + }) +} + +// Set sets the given key to the given value. +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, nil) +} + +// Update updates the given key to the given value. It succeeds only if the +// given key already exists. +func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": true, + }) +} + +// Create creates a file with the given value under the given key. It succeeds +// only if the given key does not yet exist. +func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": false, + }) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6f27fdfa6e1c41d27c4ae4862579875c0a13e2ff --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -0,0 +1,183 @@ +package etcd + +import ( + "testing" +) + +func TestSet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + resp, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + if resp.Key != "/foo" || resp.Value != "bar" || resp.TTL != 5 { + t.Fatalf("Set 1 failed: %#v", resp) + } + + resp, err = c.Set("foo", "bar2", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "bar2" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Set 2 failed: %#v", resp) + } +} + +func TestUpdate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("nonexistent") + }() + + resp, err := c.Set("foo", "bar", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.Update("foo", "wakawaka", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/foo" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Update 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.Update("nonexistent", "whatever", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("newKey") + }() + + newKey := "/newKey" + newValue := "/newValue" + + // This should succeed + resp, err := c.Create(newKey, newValue, 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == newKey && + resp.Value == newValue && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.Create(newKey, newValue, 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestSetDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/fooDir" && resp.Value == "" && resp.TTL == 5) { + t.Fatalf("SetDir 1 failed: %#v", resp) + } + + // This should fail because /fooDir already points to a directory + resp, err = c.SetDir("/fooDir", 5) + if err == nil { + t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+ + "The response was: %#v", resp) + } + + _, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.SetDir("foo", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } +} + +func TestUpdateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.UpdateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("UpdateDir 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.UpdateDir("nonexistentDir", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + // This should succeed + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("CreateDir 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.CreateDir("fooDir", 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/utils.go b/third_party/github.com/coreos/go-etcd/etcd/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..eb2f6046fc53c33c4829d8057a9d96994600fee6 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/utils.go @@ -0,0 +1,33 @@ +// Utility functions + +package etcd + +import ( + "fmt" + "net/url" + "reflect" +) + +// Convert options to a string of HTML parameters +func optionsToString(options options, vops validOptions) (string, error) { + p := "?" + v := url.Values{} + for opKey, opVal := range options { + // Check if the given option is valid (that it exists) + kind := vops[opKey] + if kind == reflect.Invalid { + return "", fmt.Errorf("Invalid option: %v", opKey) + } + + // Check if the given option is of the valid type + t := reflect.TypeOf(opVal) + if kind != t.Kind() { + return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", + opKey, kind, t.Kind()) + } + + v.Set(opKey, fmt.Sprintf("%v", opVal)) + } + p += v.Encode() + return p, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go index e84e7b5b76565128c523a8ca8bc48aceed092fda..b3d05df70bc24bd4388c3ee405dc6977f243bf5a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/version.go +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -1,3 +1,3 @@ package etcd -const version = "v1" +const version = "v2" diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index 18fcfdc1235df214e4cee7ea9e178d132edd38d3..e770f1a2dc47ac38ee5b9b20b39cceb08ba620d5 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -1,42 +1,47 @@ package etcd import ( - "encoding/json" "errors" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "path" ) -type respAndErr struct { - resp *http.Response - err error -} - // Errors introduced by the Watch command. var ( ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") ) -// Watch any change under the given prefix. -// When a sinceIndex is given, watch will try to scan from that index to the last index -// and will return any changes under the given prefix during the history +// WatchAll returns the first change under the given prefix since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If the prefix points to a directory, any change under it, including all child directories, +// will be returned. +// // If a receiver channel is given, it will be a long-term watch. Watch will block at the // channel. And after someone receive the channel, it will go on to watch that prefix. // If a stop channel is given, client can close long-term watch using the stop channel +func (c *Client) WatchAll(prefix string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(prefix, waitIndex, true, receiver, stop) +} -func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { +// Watch returns the first change to the given key since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that +// prefix. If a stop channel is given, client can close long-term watch using +// the stop channel +func (c *Client) Watch(key string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(key, waitIndex, false, receiver, stop) +} + +func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver chan *Response, stop chan bool) (*Response, error) { logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) if receiver == nil { - return c.watchOnce(prefix, sinceIndex, stop) - + return c.watchOnce(prefix, waitIndex, recursive, stop) } else { for { - resp, err := c.watchOnce(prefix, sinceIndex, stop) + resp, err := c.watchOnce(prefix, waitIndex, recursive, stop) if resp != nil { - sinceIndex = resp.Index + 1 + waitIndex = resp.Index + 1 receiver <- resp } else { return nil, err @@ -49,70 +54,37 @@ func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response // helper func // return when there is change under the given prefix -func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*Response, error) { - - var resp *http.Response - var err error - - if stop != nil { - ch := make(chan respAndErr) +func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*Response, error) { - go func() { - resp, err = c.sendWatchRequest(key, sinceIndex) + respChan := make(chan *Response) + errChan := make(chan error) - ch <- respAndErr{resp, err} - }() - - // select at stop or continue to receive - select { - - case res := <-ch: - resp, err = res.resp, res.err - - case <-stop: - resp, err = nil, ErrWatchStoppedByUser + go func() { + options := options{ + "wait": true, + } + if waitIndex > 0 { + options["waitIndex"] = waitIndex + } + if recursive { + options["recursive"] = true } - } else { - resp, err = c.sendWatchRequest(key, sinceIndex) - } - - if err != nil { - return nil, err - } - - b, err := ioutil.ReadAll(resp.Body) - - resp.Body.Close() - - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, handleError(b) - } + resp, err := c.get(key, options) - var result Response + if err != nil { + errChan <- err + } - err = json.Unmarshal(b, &result) + respChan <- resp + }() - if err != nil { + select { + case resp := <-respChan: + return resp, nil + case err := <-errChan: return nil, err + case <-stop: + return nil, ErrWatchStoppedByUser } - - return &result, nil -} - -func (c *Client) sendWatchRequest(key string, sinceIndex uint64) (*http.Response, error) { - if sinceIndex == 0 { - resp, err := c.sendRequest("GET", path.Join("watch", key), "") - return resp, err - } else { - v := url.Values{} - v.Set("index", fmt.Sprintf("%v", sinceIndex)) - resp, err := c.sendRequest("POST", path.Join("watch", key), v.Encode()) - return resp, err - } - } diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index a3d33a4f16266e063d243991e7c9a071dc76892f..f4efd9fdab0f0967b675c79d2c746f4f6f9bca24 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -8,31 +8,34 @@ import ( func TestWatch(t *testing.T) { c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() - go setHelper("bar", c) + go setHelper("watch_foo", "bar", c) - result, err := c.Watch("watch_foo", 0, nil, nil) - - if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + resp, err := c.Watch("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 1 failed: %#v", resp) } - result, err = c.Watch("watch_foo", result.Index, nil, nil) + go setHelper("watch_foo", "bar", c) - if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { - if err != nil { - t.Fatal(err) - } - t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + resp, err = c.Watch("watch_foo", resp.Index, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 2 failed: %#v", resp) } ch := make(chan *Response, 10) stop := make(chan bool, 1) - go setLoop("bar", c) + go setLoop("watch_foo", "bar", c) go receiver(ch, stop) @@ -42,16 +45,55 @@ func TestWatch(t *testing.T) { } } -func setHelper(value string, c *Client) { +func TestWatchAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() + + go setHelper("watch_foo/foo", "bar", c) + + resp, err := c.WatchAll("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 1 failed: %#v", resp) + } + + go setHelper("watch_foo/foo", "bar", c) + + resp, err = c.WatchAll("watch_foo", resp.Index, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 2 failed: %#v", resp) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("watch_foo/foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.WatchAll("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } +} + +func setHelper(key, value string, c *Client) { time.Sleep(time.Second) - c.Set("watch_foo/foo", value, 100) + c.Set(key, value, 100) } -func setLoop(value string, c *Client) { +func setLoop(key, value string, c *Client) { time.Sleep(time.Second) for i := 0; i < 10; i++ { newValue := fmt.Sprintf("%s_%v", value, i) - c.Set("watch_foo/foo", newValue, 100) + c.Set(key, newValue, 100) time.Sleep(time.Second / 10) } }