From e38b5763b0ddd6d51069b2297df534b325ae7c9c Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Thu, 20 Feb 2014 11:12:21 +0000 Subject: [PATCH] update to etcd 0.3 client API --- api.go | 42 +- etcd_client.go | 16 +- masterelection/masterelection.go | 20 +- node/node.go | 18 +- .../coreos/go-etcd/etcd/add_child.go | 16 +- .../coreos/go-etcd/etcd/add_child_test.go | 20 +- .../github.com/coreos/go-etcd/etcd/client.go | 370 ++++++++++-------- .../coreos/go-etcd/etcd/client_test.go | 6 +- .../github.com/coreos/go-etcd/etcd/cluster.go | 51 +++ .../coreos/go-etcd/etcd/compare_and_delete.go | 34 ++ .../go-etcd/etcd/compare_and_delete_test.go | 46 +++ .../coreos/go-etcd/etcd/compare_and_swap.go | 22 +- .../go-etcd/etcd/compare_and_swap_test.go | 22 +- .../github.com/coreos/go-etcd/etcd/debug.go | 56 ++- .../github.com/coreos/go-etcd/etcd/delete.go | 47 ++- .../coreos/go-etcd/etcd/delete_test.go | 47 ++- .../github.com/coreos/go-etcd/etcd/error.go | 32 +- .../github.com/coreos/go-etcd/etcd/get.go | 38 +- .../coreos/go-etcd/etcd/get_test.go | 78 ++-- .../github.com/coreos/go-etcd/etcd/options.go | 72 ++++ .../coreos/go-etcd/etcd/requests.go | 301 ++++++-------- .../coreos/go-etcd/etcd/requests_test.go | 50 +++ .../coreos/go-etcd/etcd/response.go | 96 +++-- .../coreos/go-etcd/etcd/set_curl_chan_test.go | 15 +- .../coreos/go-etcd/etcd/set_update_create.go | 122 ++++-- .../go-etcd/etcd/set_update_create_test.go | 83 ++-- .../github.com/coreos/go-etcd/etcd/utils.go | 33 -- .../github.com/coreos/go-etcd/etcd/watch.go | 93 +++-- .../coreos/go-etcd/etcd/watch_test.go | 24 +- 29 files changed, 1222 insertions(+), 648 deletions(-) create mode 100644 third_party/github.com/coreos/go-etcd/etcd/cluster.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/options.go create mode 100644 third_party/github.com/coreos/go-etcd/etcd/requests_test.go delete mode 100644 third_party/github.com/coreos/go-etcd/etcd/utils.go diff --git a/api.go b/api.go index ab6bff2a..0c5c58ea 100644 --- a/api.go +++ b/api.go @@ -134,16 +134,16 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI { // GetMount returns data on a specific mountpoint (returns nil if not // found). func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { - response, err := r.client.Get(mountPath(mountName), false) - if err != nil { + response, err := r.client.Get(mountPath(mountName), false, false) + if err != nil || response.Node == nil { return nil, err } - if response.Dir { + if response.Node.Dir { return nil, ErrIsDirectory } var m Mount - if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { + if err := json.NewDecoder(strings.NewReader(response.Node.Value)).Decode(&m); err != nil { return nil, err } return &m, nil @@ -162,27 +162,27 @@ func (r *RadioAPI) SetMount(m *Mount) error { // DelMount removes a mountpoint. func (r *RadioAPI) DelMount(mountName string) error { - _, err := r.client.Delete(mountPath(mountName)) + _, err := r.client.Delete(mountPath(mountName), false) return err } // ListMounts returns a list of all the configured mountpoints. func (r *RadioAPI) ListMounts() ([]*Mount, error) { - response, err := r.client.Get(MountPrefix, false) - if err != nil { + response, err := r.client.Get(MountPrefix, true, false) + if err != nil || response.Node == nil { return nil, err } - if !response.Dir { + if !response.Node.Dir { return nil, ErrIsFile } - result := make([]*Mount, 0, len(response.Kvs)) - for _, kv := range response.Kvs { - if kv.Dir { + result := make([]*Mount, 0, len(response.Node.Nodes)) + for _, n := range response.Node.Nodes { + if n.Dir { continue } var m Mount - if err := json.NewDecoder(strings.NewReader(kv.Value)).Decode(&m); err != nil { + if err := json.NewDecoder(strings.NewReader(n.Value)).Decode(&m); err != nil { continue } result = append(result, &m) @@ -192,27 +192,27 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) { // GetMasterAddr returns the address of the current master server. func (r *RadioAPI) GetMasterAddr() (string, error) { - response, err := r.client.Get(MasterElectionPath, false) - if err != nil { + response, err := r.client.Get(MasterElectionPath, false, false) + if err != nil || response.Node == nil { return "", err } - if response.Dir { + if response.Node.Dir { return "", ErrIsDirectory } - return response.Value, nil + return response.Node.Value, nil } // GetNodes returns the list of active cluster nodes. func (r *RadioAPI) doGetNodes() ([]*NodeStatus, error) { - response, err := r.client.Get(NodePrefix, false) - if err != nil { + response, err := r.client.Get(NodePrefix, false, false) + if err != nil || response.Node == nil { return nil, err } - if !response.Dir { + if !response.Node.Dir { return nil, ErrIsFile } - result := make([]*NodeStatus, 0, len(response.Kvs)) - for _, entry := range response.Kvs { + result := make([]*NodeStatus, 0, len(response.Node.Nodes)) + for _, entry := range response.Node.Nodes { var ns NodeStatus if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&ns); err == nil { result = append(result, &ns) diff --git a/etcd_client.go b/etcd_client.go index 788c3109..0faa31ae 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -14,6 +14,7 @@ var ( etcdMachines = flag.String("etcd-servers", "localhost:4001", "Etcd servers (comma-separated list)") etcdCertFile = flag.String("etcd-cert", "", "SSL certificate for etcd client") etcdKeyFile = flag.String("etcd-key", "", "SSL private key for etcd client") + etcdCaFile = flag.String("etcd-ca", "", "SSL CA certificate for etcd client") ) func loadFile(path string) string { @@ -50,19 +51,28 @@ func NewEtcdClient() *etcd.Client { proto = "https" } + // Resolve etcd servers. machines := resolveAll(strings.Split(*etcdMachines, ","), proto) if len(machines) == 0 { log.Fatal("No etcd servers specified!") } log.Printf("etcd servers: %+v", machines) - c := etcd.NewClient(machines) + // Create the etcd client. + var c *etcd.Client if proto == "https" { - c.SetScheme(etcd.HTTPS) - if err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil { + var err error + c, err = etcd.NewTLSClient(machines, + loadFile(*etcdCertFile), + loadFile(*etcdKeyFile), + loadFile(*etcdCaFile)) + if err != nil { log.Fatal("Error setting up SSL for etcd client: %s", err) } + } else { + c = etcd.NewClient(machines) } + c.SetConsistency(etcd.WEAK_CONSISTENCY) return c } diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go index 87316beb..deada56e 100644 --- a/masterelection/masterelection.go +++ b/masterelection/masterelection.go @@ -55,11 +55,11 @@ func (m *MasterElection) IsMaster() bool { } func (m *MasterElection) GetMasterAddr() string { - response, err := m.client.Get(m.Path, false) - if err != nil { + response, err := m.client.Get(m.Path, false, false) + if err != nil || response.Node == nil { return "" } - return response.Value + return response.Node.Value } func (m *MasterElection) setState(state int) { @@ -88,7 +88,7 @@ func (m *MasterElection) stopper() { // Remove the lock file if we are the master. if m.State == STATE_MASTER { log.Printf("releasing masterelection lock") - m.client.Delete(m.Path) + m.client.Delete(m.Path, false) } } @@ -121,7 +121,7 @@ func (m *MasterElection) runMaster(index uint64) { return } } - index = response.ModifiedIndex + index = response.EtcdIndex lastUpdate = t case <-m.stop: return @@ -134,7 +134,7 @@ func (m *MasterElection) runSlave(index uint64) { for { // Start a watch on the lock, waiting for its removal. - response, err := m.client.Watch(m.Path, index+1, nil, m.stop) + response, err := m.client.Watch(m.Path, index+1, false, nil, m.stop) if err != nil { if err != etcd.ErrWatchStoppedByUser { log.Printf("slave Watch() error: %+v", err) @@ -146,7 +146,7 @@ func (m *MasterElection) runSlave(index uint64) { return } - index = response.ModifiedIndex + index = response.EtcdIndex } } @@ -165,9 +165,9 @@ func (m *MasterElection) Run() { // RAFT index, let's optimistically query the lock // before starting just to set a baseline for the // following Watch(). - if iresponse, err := m.client.Get(m.Path, false); err == nil { + if iresponse, err := m.client.Get(m.Path, false, false); err == nil { log.Printf("lock already exists: %+v", iresponse) - watchIndex = iresponse.ModifiedIndex + watchIndex = iresponse.EtcdIndex } // Try to acquire the lock. This call will only succeed @@ -179,7 +179,7 @@ func (m *MasterElection) Run() { if err == nil { // Howdy, we're the master now. Wait a while // and renew our TTL. - m.runMaster(response.ModifiedIndex) + m.runMaster(response.EtcdIndex) } else { // We're not the master. Wait until the lock // is deleted or expires. diff --git a/node/node.go b/node/node.go index 1447b17d..8fa36f7f 100644 --- a/node/node.go +++ b/node/node.go @@ -100,11 +100,11 @@ func (w *ConfigSyncer) syncer() { case response := <-w.rch: if response.Action == "delete" { - mountName := keyToMount(response.Key) + mountName := keyToMount(response.Node.Key) log.Printf("deleted mount %s", mountName) w.config.delMount(mountName) } else if response.Action == "set" || response.Action == "create" || response.Action == "update" { - w.updateConfigWithResponse(response.Key, response.Value) + w.updateConfigWithResponse(response.Node.Key, response.Node.Value) } else { continue } @@ -113,7 +113,7 @@ func (w *ConfigSyncer) syncer() { // the Watcher dies, it knows where to start // from and we do not have to download the // full configuration again. - w.index = response.ModifiedIndex + w.index = response.EtcdIndex // Trigger an update. trigger(w.upch) @@ -142,13 +142,13 @@ func (w *ConfigSyncer) Run() { // Run until the first successful Get(). log.Printf("attempting to retrieve initial config...") for { - response, err := w.client.Get(autoradio.MountPrefix, false) - if err == nil && response.Dir { + response, err := w.client.Get(autoradio.MountPrefix, false, false) + if err == nil && response.Node != nil && response.Node.Dir { // Directly update the configuration. - for _, r := range response.Kvs { - w.updateConfigWithResponse(r.Key, r.Value) + for _, n := range response.Node.Nodes { + w.updateConfigWithResponse(n.Key, n.Value) } - w.index = response.ModifiedIndex + w.index = response.EtcdIndex break } log.Printf("Get error: %s", err) @@ -171,7 +171,7 @@ func (w *ConfigSyncer) Run() { for { curIndex := w.index + 1 log.Printf("starting watcher at index %d", curIndex) - _, err := w.client.WatchAll(autoradio.MountPrefix, curIndex, w.rch, w.stop) + _, err := w.client.Watch(autoradio.MountPrefix, curIndex, true, w.rch, w.stop) if err == etcd.ErrWatchStoppedByUser { return } else if err != nil { diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child.go b/third_party/github.com/coreos/go-etcd/etcd/add_child.go index f275599c..53af6d28 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/add_child.go +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child.go @@ -2,10 +2,22 @@ package etcd // Add a new directory with a random etcd-generated key under the given path. func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { - return c.post(key, "", ttl) + raw, err := c.post(key, "", ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() } // Add a new file with a random etcd-generated key under the given path. func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) { - return c.post(key, value, ttl) + raw, err := c.post(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() } diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go index efe15546..26223ff1 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go @@ -5,11 +5,11 @@ import "testing" func TestAddChild(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("fooDir") - c.DeleteAll("nonexistentDir") + c.Delete("fooDir", true) + c.Delete("nonexistentDir", true) }() - c.SetDir("fooDir", 5) + c.CreateDir("fooDir", 5) _, err := c.AddChild("fooDir", "v0", 5) if err != nil { @@ -21,10 +21,10 @@ func TestAddChild(t *testing.T) { t.Fatal(err) } - resp, err := c.Get("fooDir", true) + resp, err := c.Get("fooDir", true, false) // The child with v0 should proceed the child with v1 because it's added // earlier, so it should have a lower key. - if !(len(resp.Kvs) == 2 && (resp.Kvs[0].Value == "v0" && resp.Kvs[1].Value == "v1")) { + if !(len(resp.Node.Nodes) == 2 && (resp.Node.Nodes[0].Value == "v0" && resp.Node.Nodes[1].Value == "v1")) { t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ " The response was: %#v", resp) } @@ -40,11 +40,11 @@ func TestAddChild(t *testing.T) { func TestAddChildDir(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("fooDir") - c.DeleteAll("nonexistentDir") + c.Delete("fooDir", true) + c.Delete("nonexistentDir", true) }() - c.SetDir("fooDir", 5) + c.CreateDir("fooDir", 5) _, err := c.AddChildDir("fooDir", 5) if err != nil { @@ -56,10 +56,10 @@ func TestAddChildDir(t *testing.T) { t.Fatal(err) } - resp, err := c.Get("fooDir", true) + resp, err := c.Get("fooDir", true, false) // The child with v0 should proceed the child with v1 because it's added // earlier, so it should have a lower key. - if !(len(resp.Kvs) == 2 && (len(resp.Kvs[0].KVPairs) == 0 && len(resp.Kvs[1].KVPairs) == 0)) { + if !(len(resp.Node.Nodes) == 2 && (len(resp.Node.Nodes[0].Nodes) == 0 && len(resp.Node.Nodes[1].Nodes) == 0)) { t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ " The response was: %#v", resp) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index 63ce6ab7..48ba9b42 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -2,6 +2,7 @@ package etcd import ( "crypto/tls" + "crypto/x509" "encoding/json" "errors" "io" @@ -11,16 +12,9 @@ import ( "net/url" "os" "path" - "reflect" - "strings" "time" ) -const ( - HTTP = iota - HTTPS -) - // See SetConsistency for how to use these constants. const ( // Using strings rather than iota because the consistency level @@ -30,125 +24,178 @@ const ( WEAK_CONSISTENCY = "WEAK" ) -type Cluster struct { - Leader string `json:"leader"` - Machines []string `json:"machines"` -} +const ( + defaultBufferSize = 10 +) type Config struct { CertFile string `json:"certFile"` KeyFile string `json:"keyFile"` - Scheme string `json:"scheme"` + CaCertFile []string `json:"caCertFiles"` Timeout time.Duration `json:"timeout"` Consistency string `json: "consistency"` } type Client struct { - cluster Cluster `json:"cluster"` - config Config `json:"config"` + config Config `json:"config"` + cluster *Cluster `json:"cluster"` httpClient *http.Client persistence io.Writer + cURLch chan string + keyPrefix string } -type options map[string]interface{} - -// An internally-used data structure that represents a mapping -// between valid options and their kinds -type validOptions map[string]reflect.Kind - // NewClient create a basic client that is configured to be used // with the given machine list. func NewClient(machines []string) *Client { - // if an empty slice was sent in then just assume localhost - if len(machines) == 0 { - machines = []string{"http://127.0.0.1:4001"} + config := Config{ + // default timeout is one second + Timeout: time.Second, + // default consistency level is STRONG + Consistency: STRONG_CONSISTENCY, + } + + client := &Client{ + cluster: NewCluster(machines), + config: config, + keyPrefix: path.Join(version, "keys"), } - // default leader and machines - cluster := Cluster{ - Leader: machines[0], - Machines: machines, + client.initHTTPClient() + client.saveConfig() + + return client +} + +// NewTLSClient create a basic client with TLS configuration +func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) { + // overwrite the default machine to use https + if len(machines) == 0 { + machines = []string{"https://127.0.0.1:4001"} } config := Config{ - // default use http - Scheme: "http", // default timeout is one second Timeout: time.Second, // default consistency level is STRONG Consistency: STRONG_CONSISTENCY, + CertFile: cert, + KeyFile: key, + CaCertFile: make([]string, 0), } client := &Client{ - cluster: cluster, - config: config, + cluster: NewCluster(machines), + config: config, + keyPrefix: path.Join(version, "keys"), } - err := setupHttpClient(client) + err := client.initHTTPSClient(cert, key) if err != nil { - panic(err) + return nil, err } - return client + err = client.AddRootCA(caCert) + + client.saveConfig() + + return client, nil } -// NewClientFile creates a client from a given file path. +// NewClientFromFile creates a client from a given file path. // The given file is expected to use the JSON format. -func NewClientFile(fpath string) (*Client, error) { +func NewClientFromFile(fpath string) (*Client, error) { fi, err := os.Open(fpath) if err != nil { return nil, err } + defer func() { if err := fi.Close(); err != nil { panic(err) } }() - return NewClientReader(fi) + return NewClientFromReader(fi) } -// NewClientReader creates a Client configured from a given reader. -// The config is expected to use the JSON format. -func NewClientReader(reader io.Reader) (*Client, error) { - var client Client +// NewClientFromReader creates a Client configured from a given reader. +// The configuration is expected to use the JSON format. +func NewClientFromReader(reader io.Reader) (*Client, error) { + c := new(Client) b, err := ioutil.ReadAll(reader) if err != nil { return nil, err } - err = json.Unmarshal(b, &client) + err = json.Unmarshal(b, c) if err != nil { return nil, err } + if c.config.CertFile == "" { + c.initHTTPClient() + } else { + err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile) + } - err = setupHttpClient(&client) if err != nil { return nil, err } - return &client, nil + for _, caCert := range c.config.CaCertFile { + if err := c.AddRootCA(caCert); err != nil { + return nil, err + } + } + + return c, nil } -func setupHttpClient(client *Client) error { - if client.config.CertFile != "" && client.config.KeyFile != "" { - err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile) - if err != nil { - return err - } - } else { - client.config.CertFile = "" - client.config.KeyFile = "" - tr := &http.Transport{ - Dial: dialTimeout, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - } - client.httpClient = &http.Client{Transport: tr} +// Override the Client's HTTP Transport object +func (c *Client) SetTransport(tr *http.Transport) { + c.httpClient.Transport = tr +} + +// SetKeyPrefix changes the key prefix from the default `/v2/keys` to whatever +// is set. +func (c *Client) SetKeyPrefix(prefix string) { + c.keyPrefix = prefix +} + +// initHTTPClient initializes a HTTP client for etcd client +func (c *Client) initHTTPClient() { + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + c.httpClient = &http.Client{Transport: tr} +} + +// initHTTPClient initializes a HTTPS client for etcd client +func (c *Client) initHTTPSClient(cert, key string) error { + if cert == "" || key == "" { + return errors.New("Require both cert and key path") + } + + tlsCert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return err + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + } + + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + Dial: dialTimeout, } + c.httpClient = &http.Client{Transport: tr} return nil } @@ -162,15 +209,15 @@ func (c *Client) SetPersistence(writer io.Writer) { // // When consistency is set to STRONG_CONSISTENCY, all requests, // including GET, are sent to the leader. This means that, assuming -// the absence of leader failures, GET requests are guranteed to see +// the absence of leader failures, GET requests are guaranteed to see // the changes made by previous requests. // // When consistency is set to WEAK_CONSISTENCY, other requests // are still sent to the leader, but GET requests are sent to a // random server from the server pool. This reduces the read -// load on the leader, but it's not guranteed that the GET requests +// load on the leader, but it's not guaranteed that the GET requests // will see changes made by previous requests (they might have not -// yet been commited on non-leader servers). +// yet been committed on non-leader servers). func (c *Client) SetConsistency(consistency string) error { if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) { return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.") @@ -179,96 +226,45 @@ func (c *Client) SetConsistency(consistency string) error { return nil } -// MarshalJSON implements the Marshaller interface -// as defined by the standard JSON package. -func (c *Client) MarshalJSON() ([]byte, error) { - b, err := json.Marshal(struct { - Config Config `json:"config"` - Cluster Cluster `json:"cluster"` - }{ - Config: c.config, - Cluster: c.cluster, - }) - - if err != nil { - return nil, err +// AddRootCA adds a root CA cert for the etcd client +func (c *Client) AddRootCA(caCert string) error { + if c.httpClient == nil { + return errors.New("Client has not been initialized yet!") } - return b, nil -} - -// UnmarshalJSON implements the Unmarshaller interface -// as defined by the standard JSON package. -func (c *Client) UnmarshalJSON(b []byte) error { - temp := struct { - Config Config `json: "config"` - Cluster Cluster `json: "cluster"` - }{} - err := json.Unmarshal(b, &temp) + certBytes, err := ioutil.ReadFile(caCert) if err != nil { return err } - c.cluster = temp.Cluster - c.config = temp.Config - return nil -} + tr, ok := c.httpClient.Transport.(*http.Transport) -// saveConfig saves the current config using c.persistence. -func (c *Client) saveConfig() error { - if c.persistence != nil { - b, err := json.Marshal(c) - if err != nil { - return err - } - - _, err = c.persistence.Write(b) - if err != nil { - return err - } + if !ok { + panic("AddRootCA(): Transport type assert should not fail") } - return nil -} - -func (c *Client) SetCertAndKey(cert string, key string) error { - if cert != "" && key != "" { - tlsCert, err := tls.LoadX509KeyPair(cert, key) - - if err != nil { - return err - } - - tr := &http.Transport{ - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, - InsecureSkipVerify: true, - }, - Dial: dialTimeout, + if tr.TLSClientConfig.RootCAs == nil { + caCertPool := x509.NewCertPool() + ok = caCertPool.AppendCertsFromPEM(certBytes) + if ok { + tr.TLSClientConfig.RootCAs = caCertPool } - - c.httpClient = &http.Client{Transport: tr} - c.saveConfig() - return nil + tr.TLSClientConfig.InsecureSkipVerify = false + } else { + ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes) } - return errors.New("Require both cert and key path") -} -func (c *Client) SetScheme(scheme int) error { - if scheme == HTTP { - c.config.Scheme = "http" - c.saveConfig() - return nil - } - if scheme == HTTPS { - c.config.Scheme = "https" - c.saveConfig() - return nil + if !ok { + err = errors.New("Unable to load caCert") } - return errors.New("Unknown Scheme") + + c.config.CaCertFile = append(c.config.CaCertFile, caCert) + c.saveConfig() + + return err } -// SetCluster updates config using the given machine list. +// SetCluster updates cluster information using the given machine list. func (c *Client) SetCluster(machines []string) bool { success := c.internalSyncCluster(machines) return success @@ -278,16 +274,15 @@ func (c *Client) GetCluster() []string { return c.cluster.Machines } -// SyncCluster updates config using the internal machine list. +// SyncCluster updates the cluster information using the internal machine list. func (c *Client) SyncCluster() bool { - success := c.internalSyncCluster(c.cluster.Machines) - return success + return c.internalSyncCluster(c.cluster.Machines) } // internalSyncCluster syncs cluster information using the given machine list. func (c *Client) internalSyncCluster(machines []string) bool { for _, machine := range machines { - httpPath := c.createHttpPath(machine, version+"/machines") + httpPath := c.createHttpPath(machine, path.Join(version, "machines")) resp, err := c.httpClient.Get(httpPath) if err != nil { // try another machine in the cluster @@ -301,12 +296,11 @@ func (c *Client) internalSyncCluster(machines []string) bool { } // update Machines List - c.cluster.Machines = strings.Split(string(b), ", ") + c.cluster.updateFromStr(string(b)) // update leader // the first one in the machine list is the leader - logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) - c.cluster.Leader = c.cluster.Machines[0] + c.cluster.switchLeader(0) logger.Debug("sync.machines ", c.cluster.Machines) c.saveConfig() @@ -319,8 +313,12 @@ func (c *Client) internalSyncCluster(machines []string) bool { // createHttpPath creates a complete HTTP URL. // serverName should contain both the host name and a port number, if any. func (c *Client) createHttpPath(serverName string, _path string) string { - u, _ := url.Parse(serverName) - u.Path = path.Join(u.Path, "/", _path) + u, err := url.Parse(serverName) + if err != nil { + panic(err) + } + + u.Path = path.Join(u.Path, _path) if u.Scheme == "" { u.Scheme = "http" @@ -333,17 +331,75 @@ func dialTimeout(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, time.Second) } -func (c *Client) updateLeader(httpPath string) { - u, _ := url.Parse(httpPath) +func (c *Client) OpenCURL() { + c.cURLch = make(chan string, defaultBufferSize) +} - var leader string - if u.Scheme == "" { - leader = "http://" + u.Host - } else { - leader = u.Scheme + "://" + u.Host +func (c *Client) CloseCURL() { + c.cURLch = nil +} + +func (c *Client) sendCURL(command string) { + go func() { + select { + case c.cURLch <- command: + default: + } + }() +} + +func (c *Client) RecvCURL() string { + return <-c.cURLch +} + +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } } - logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) - c.cluster.Leader = leader - c.saveConfig() + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster *Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err + } + + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json: "config"` + Cluster *Cluster `json: "cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err + } + + c.cluster = temp.Cluster + c.config = temp.Config + return nil } diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go index b25611b2..c245e479 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -14,7 +14,9 @@ import ( func TestSync(t *testing.T) { fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") - c := NewClient(nil) + // Explicit trailing slash to ensure this doesn't reproduce: + // https://github.com/coreos/go-etcd/issues/82 + c := NewClient([]string{"http://127.0.0.1:4001/"}) success := c.SyncCluster() if !success { @@ -79,7 +81,7 @@ func TestPersistence(t *testing.T) { t.Fatal(err) } - c2, err := NewClientFile("config.json") + c2, err := NewClientFromFile("config.json") if err != nil { t.Fatal(err) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/cluster.go b/third_party/github.com/coreos/go-etcd/etcd/cluster.go new file mode 100644 index 00000000..aaa20546 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/cluster.go @@ -0,0 +1,51 @@ +package etcd + +import ( + "net/url" + "strings" +) + +type Cluster struct { + Leader string `json:"leader"` + Machines []string `json:"machines"` +} + +func NewCluster(machines []string) *Cluster { + // if an empty slice was sent in then just assume HTTP 4001 on localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } + + // default leader and machines + return &Cluster{ + Leader: machines[0], + Machines: machines, + } +} + +// switchLeader switch the current leader to machines[num] +func (cl *Cluster) switchLeader(num int) { + logger.Debugf("switch.leader[from %v to %v]", + cl.Leader, cl.Machines[num]) + + cl.Leader = cl.Machines[num] +} + +func (cl *Cluster) updateFromStr(machines string) { + cl.Machines = strings.Split(machines, ", ") +} + +func (cl *Cluster) updateLeader(leader string) { + logger.Debugf("update.leader[%s,%s]", cl.Leader, leader) + cl.Leader = leader +} + +func (cl *Cluster) updateLeaderFromURL(u *url.URL) { + var leader string + if u.Scheme == "" { + leader = "http://" + u.Host + } else { + leader = u.Scheme + "://" + u.Host + } + cl.updateLeader(leader) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go new file mode 100644 index 00000000..924778dd --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete.go @@ -0,0 +1,34 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndDelete(key, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + + raw, err := c.delete(key, options) + + if err != nil { + return nil, err + } + + return raw, err +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go new file mode 100644 index 00000000..223e50f2 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + + // This should succeed an correct prevValue + resp, err := c.CompareAndDelete("foo", "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndDelete 1 prevNode failed: %#v", resp) + } + + resp, _ = c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevValue + _, err = c.CompareAndDelete("foo", "xxx", 0) + if err == nil { + t.Fatalf("CompareAndDelete 2 should have failed. The response is: %#v", resp) + } + + // This should succeed because it gives an correct prevIndex + resp, err = c.CompareAndDelete("foo", "", resp.Node.ModifiedIndex) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp) + } + + c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndDelete("foo", "", 29817514) + if err == nil { + t.Fatalf("CompareAndDelete 4 should have failed. The response is: %#v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go index 565a03ef..0beaee57 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -2,7 +2,18 @@ package etcd import "fmt" -func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) { +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndSwap(key, value, ttl, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*RawResponse, error) { if prevValue == "" && prevIndex == 0 { return nil, fmt.Errorf("You must give either prevValue or prevIndex.") } @@ -14,5 +25,12 @@ func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue if prevIndex != 0 { options["prevIndex"] = prevIndex } - return c.put(key, value, ttl, options) + + raw, err := c.put(key, value, ttl, options) + + if err != nil { + return nil, err + } + + return raw, err } diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go index bc452a91..14a1b00f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go @@ -7,7 +7,7 @@ import ( func TestCompareAndSwap(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") + c.Delete("foo", true) }() c.Set("foo", "bar", 5) @@ -17,11 +17,14 @@ func TestCompareAndSwap(t *testing.T) { if err != nil { t.Fatal(err) } - if !(resp.Value == "bar2" && resp.PrevValue == "bar" && - resp.Key == "/foo" && resp.TTL == 5) { + if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { t.Fatalf("CompareAndSwap 1 failed: %#v", resp) } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 1 prevNode failed: %#v", resp) + } + // This should fail because it gives an incorrect prevValue resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0) if err == nil { @@ -34,18 +37,21 @@ func TestCompareAndSwap(t *testing.T) { } // This should succeed - resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.ModifiedIndex) + resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.Node.ModifiedIndex) if err != nil { t.Fatal(err) } - if !(resp.Value == "bar2" && resp.PrevValue == "bar" && - resp.Key == "/foo" && resp.TTL == 5) { - t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { + t.Fatalf("CompareAndSwap 3 failed: %#v", resp) + } + + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp) } // This should fail because it gives an incorrect prevIndex resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514) if err == nil { - t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + t.Fatalf("CompareAndSwap 4 should have failed. The response is: %#v", resp) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go index bd673988..fce23f07 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/debug.go +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -1,29 +1,53 @@ package etcd import ( - "github.com/coreos/go-log/log" - "os" + "io/ioutil" + "log" + "strings" ) -var logger *log.Logger +type Logger interface { + Debug(args ...interface{}) + Debugf(fmt string, args ...interface{}) + Warning(args ...interface{}) + Warningf(fmt string, args ...interface{}) +} -func init() { - setLogger(log.PriErr) - // Uncomment the following line if you want to see lots of logs - // OpenDebug() +var logger Logger + +func SetLogger(log Logger) { + logger = log +} + +func GetLogger() Logger { + return logger +} + +type defaultLogger struct { + log *log.Logger +} + +func (p *defaultLogger) Debug(args ...interface{}) { + p.log.Println(args) } -func OpenDebug() { - setLogger(log.PriDebug) +func (p *defaultLogger) Debugf(fmt string, args ...interface{}) { + // Append newline if necessary + if !strings.HasSuffix(fmt, "\n") { + fmt = fmt + "\n" + } + p.log.Printf(fmt, args) } -func CloseDebug() { - setLogger(log.PriErr) +func (p *defaultLogger) Warning(args ...interface{}) { + p.Debug(args) } -func setLogger(priority log.Priority) { - logger = log.NewSimple( - log.PriorityFilter( - priority, - log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields))) +func (p *defaultLogger) Warningf(fmt string, args ...interface{}) { + p.Debugf(fmt, args) +} + +func init() { + // Default logger uses the go default log. + SetLogger(&defaultLogger{log.New(ioutil.Discard, "go-etcd", log.LstdFlags)}) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go index 00348f6b..6c60e4df 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -1,17 +1,40 @@ package etcd -// DeleteAll deletes everything under the given key. If the key -// points to a file, the file will be deleted. If the key points -// to a directory, then everything under the directory, include -// all child directories, will be deleted. -func (c *Client) DeleteAll(key string) (*Response, error) { - return c.delete(key, options{ - "recursive": true, - }) +// Delete deletes the given key. +// +// When recursive set to false, if the key points to a +// directory the method will fail. +// +// When recursive set to true, if the key points to a file, +// the file will be deleted; if the key points to a directory, +// then everything under the directory (including all child directories) +// will be deleted. +func (c *Client) Delete(key string, recursive bool) (*Response, error) { + raw, err := c.RawDelete(key, recursive, false) + + if err != nil { + return nil, err + } + + return raw.toResponse() } -// Delete deletes the given key. If the key points to a -// directory, the method will fail. -func (c *Client) Delete(key string) (*Response, error) { - return c.delete(key, nil) +// DeleteDir deletes an empty directory or a key value pair +func (c *Client) DeleteDir(key string) (*Response, error) { + raw, err := c.RawDelete(key, false, true) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) { + ops := options{ + "recursive": recursive, + "dir": dir, + } + + return c.delete(key, ops) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go index 0f8475a2..59049715 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -7,21 +7,24 @@ import ( func TestDelete(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") + c.Delete("foo", true) }() c.Set("foo", "bar", 5) - resp, err := c.Delete("foo") + resp, err := c.Delete("foo", false) if err != nil { t.Fatal(err) } - if !(resp.PrevValue == "bar" && resp.Value == "") { - t.Fatalf("Delete failed with %s %s", resp.PrevValue, - resp.Value) + if !(resp.Node.Value == "") { + t.Fatalf("Delete failed with %s", resp.Node.Value) } - resp, err = c.Delete("foo") + if !(resp.PrevNode.Value == "bar") { + t.Fatalf("Delete PrevNode failed with %s", resp.Node.Value) + } + + resp, err = c.Delete("foo", false) if err == nil { t.Fatalf("Delete should have failed because the key foo did not exist. "+ "The response was: %v", resp) @@ -31,32 +34,46 @@ func TestDelete(t *testing.T) { func TestDeleteAll(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") - c.DeleteAll("fooDir") + c.Delete("foo", true) + c.Delete("fooDir", true) }() - c.Set("foo", "bar", 5) - resp, err := c.DeleteAll("foo") + c.SetDir("foo", 5) + // test delete an empty dir + resp, err := c.DeleteDir("foo") if err != nil { t.Fatal(err) } - if !(resp.PrevValue == "bar" && resp.Value == "") { + if !(resp.Node.Value == "") { t.Fatalf("DeleteAll 1 failed: %#v", resp) } - c.SetDir("fooDir", 5) + if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") { + t.Fatalf("DeleteAll 1 PrevNode failed: %#v", resp) + } + + c.CreateDir("fooDir", 5) c.Set("fooDir/foo", "bar", 5) - resp, err = c.DeleteAll("fooDir") + _, err = c.DeleteDir("fooDir") + if err == nil { + t.Fatal("should not able to delete a non-empty dir with deletedir") + } + + resp, err = c.Delete("fooDir", true) if err != nil { t.Fatal(err) } - if !(resp.PrevValue == "" && resp.Value == "") { + if !(resp.Node.Value == "") { t.Fatalf("DeleteAll 2 failed: %#v", resp) } - resp, err = c.DeleteAll("foo") + if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") { + t.Fatalf("DeleteAll 2 PrevNode failed: %#v", resp) + } + + resp, err = c.Delete("foo", true) if err == nil { t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+ "The response was: %v", resp) diff --git a/third_party/github.com/coreos/go-etcd/etcd/error.go b/third_party/github.com/coreos/go-etcd/etcd/error.go index 9a3268d6..7e692872 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/error.go +++ b/third_party/github.com/coreos/go-etcd/etcd/error.go @@ -5,20 +5,44 @@ import ( "fmt" ) +const ( + ErrCodeEtcdNotReachable = 501 +) + +var ( + errorMap = map[int]string{ + ErrCodeEtcdNotReachable: "All the given peers are not reachable", + } +) + type EtcdError struct { ErrorCode int `json:"errorCode"` Message string `json:"message"` Cause string `json:"cause,omitempty"` + Index uint64 `json:"index"` } func (e EtcdError) Error() string { - return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause) + return fmt.Sprintf("%v: %v (%v) [%v]", e.ErrorCode, e.Message, e.Cause, e.Index) +} + +func newError(errorCode int, cause string, index uint64) *EtcdError { + return &EtcdError{ + ErrorCode: errorCode, + Message: errorMap[errorCode], + Cause: cause, + Index: index, + } } func handleError(b []byte) error { - var err EtcdError + etcdErr := new(EtcdError) - json.Unmarshal(b, &err) + err := json.Unmarshal(b, etcdErr) + if err != nil { + logger.Warningf("cannot unmarshal etcd error: %v", err) + return err + } - return err + return etcdErr } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go index d42a83c7..7988f1a8 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -1,23 +1,27 @@ package etcd -// GetDir gets the all contents under the given key. -// If the key points to a file, the file is returned. -// If the key points to a directory, everything under it is returnd, -// including all contents under all child directories. -func (c *Client) GetAll(key string, sort bool) (*Response, error) { - return c.get(key, options{ - "recursive": true, - "sorted": sort, - }) -} - // Get gets the file or directory associated with the given key. // If the key points to a directory, files and directories under // it will be returned in sorted or unsorted order, depending on -// the sort flag. Note that contents under child directories -// will not be returned. To get those contents, use GetAll. -func (c *Client) Get(key string, sort bool) (*Response, error) { - return c.get(key, options{ - "sorted": sort, - }) +// the sort flag. +// If recursive is set to false, contents under child directories +// will not be returned. +// If recursive is set to true, all the contents will be returned. +func (c *Client) Get(key string, sort, recursive bool) (*Response, error) { + raw, err := c.RawGet(key, sort, recursive) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) { + ops := options{ + "recursive": recursive, + "sorted": sort, + } + + return c.get(key, ops) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go index a34946c7..eccae189 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/get_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -5,25 +5,45 @@ import ( "testing" ) +// cleanNode scrubs Expiration, ModifiedIndex and CreatedIndex of a node. +func cleanNode(n *Node) { + n.Expiration = nil + n.ModifiedIndex = 0 + n.CreatedIndex = 0 +} + +// cleanResult scrubs a result object two levels deep of Expiration, +// ModifiedIndex and CreatedIndex. +func cleanResult(result *Response) { + // TODO(philips): make this recursive. + cleanNode(result.Node) + for i, _ := range result.Node.Nodes { + cleanNode(&result.Node.Nodes[i]) + for j, _ := range result.Node.Nodes[i].Nodes { + cleanNode(&result.Node.Nodes[i].Nodes[j]) + } + } +} + func TestGet(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") + c.Delete("foo", true) }() c.Set("foo", "bar", 5) - result, err := c.Get("foo", false) + result, err := c.Get("foo", false, false) if err != nil { t.Fatal(err) } - if result.Key != "/foo" || result.Value != "bar" { - t.Fatalf("Get failed with %s %s %v", result.Key, result.Value, result.TTL) + if result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatalf("Get failed with %s %s %v", result.Node.Key, result.Node.Value, result.Node.TTL) } - result, err = c.Get("goo", false) + result, err = c.Get("goo", false, false) if err == nil { t.Fatalf("should not be able to get non-exist key") } @@ -32,68 +52,80 @@ func TestGet(t *testing.T) { func TestGetAll(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("fooDir") + c.Delete("fooDir", true) }() - c.SetDir("fooDir", 5) + c.CreateDir("fooDir", 5) c.Set("fooDir/k0", "v0", 5) c.Set("fooDir/k1", "v1", 5) // Return kv-pairs in sorted order - result, err := c.Get("fooDir", true) + result, err := c.Get("fooDir", true, false) if err != nil { t.Fatal(err) } - expected := kvPairs{ - KeyValuePair{ + expected := Nodes{ + Node{ Key: "/fooDir/k0", Value: "v0", + TTL: 5, }, - KeyValuePair{ + Node{ Key: "/fooDir/k1", Value: "v1", + TTL: 5, }, } - if !reflect.DeepEqual(result.Kvs, expected) { - t.Fatalf("(actual) %v != (expected) %v", result.Kvs, expected) + cleanResult(result) + + if !reflect.DeepEqual(result.Node.Nodes, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) } // Test the `recursive` option - c.SetDir("fooDir/childDir", 5) + c.CreateDir("fooDir/childDir", 5) c.Set("fooDir/childDir/k2", "v2", 5) // Return kv-pairs in sorted order - result, err = c.GetAll("fooDir", true) + result, err = c.Get("fooDir", true, true) + + cleanResult(result) if err != nil { t.Fatal(err) } - expected = kvPairs{ - KeyValuePair{ + expected = Nodes{ + Node{ Key: "/fooDir/childDir", Dir: true, - KVPairs: kvPairs{ - KeyValuePair{ + Nodes: Nodes{ + Node{ Key: "/fooDir/childDir/k2", Value: "v2", + TTL: 5, }, }, + TTL: 5, }, - KeyValuePair{ + Node{ Key: "/fooDir/k0", Value: "v0", + TTL: 5, }, - KeyValuePair{ + Node{ Key: "/fooDir/k1", Value: "v1", + TTL: 5, }, } - if !reflect.DeepEqual(result.Kvs, expected) { - t.Fatalf("(actual) %v != (expected) %v", result.Kvs) + cleanResult(result) + + if !reflect.DeepEqual(result.Node.Nodes, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/options.go b/third_party/github.com/coreos/go-etcd/etcd/options.go new file mode 100644 index 00000000..335a0c21 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/options.go @@ -0,0 +1,72 @@ +package etcd + +import ( + "fmt" + "net/url" + "reflect" +) + +type options map[string]interface{} + +// An internally-used data structure that represents a mapping +// between valid options and their kinds +type validOptions map[string]reflect.Kind + +// Valid options for GET, PUT, POST, DELETE +// Using CAPITALIZED_UNDERSCORE to emphasize that these +// values are meant to be used as constants. +var ( + VALID_GET_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "consistent": reflect.Bool, + "sorted": reflect.Bool, + "wait": reflect.Bool, + "waitIndex": reflect.Uint64, + } + + VALID_PUT_OPTIONS = validOptions{ + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + "prevExist": reflect.Bool, + "dir": reflect.Bool, + } + + VALID_POST_OPTIONS = validOptions{} + + VALID_DELETE_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "dir": reflect.Bool, + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + } +) + +// Convert options to a string of HTML parameters +func (ops options) toParameters(validOps validOptions) (string, error) { + p := "?" + values := url.Values{} + + if ops == nil { + return "", nil + } + + for k, v := range ops { + // Check if the given option is valid (that it exists) + kind := validOps[k] + if kind == reflect.Invalid { + return "", fmt.Errorf("Invalid option: %v", k) + } + + // Check if the given option is of the valid type + t := reflect.TypeOf(v) + if kind != t.Kind() { + return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", + k, kind, t.Kind()) + } + + values.Set(k, fmt.Sprintf("%v", v)) + } + + p += values.Encode() + return p, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go index 83e3b519..c16f7d46 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/requests.go +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -1,71 +1,30 @@ package etcd import ( - "encoding/json" - "errors" "fmt" "io/ioutil" "math/rand" "net/http" "net/url" "path" - "reflect" "strings" "time" ) -// Valid options for GET, PUT, POST, DELETE -// Using CAPITALIZED_UNDERSCORE to emphasize that these -// values are meant to be used as constants. -var ( - VALID_GET_OPTIONS = validOptions{ - "recursive": reflect.Bool, - "consistent": reflect.Bool, - "sorted": reflect.Bool, - "wait": reflect.Bool, - "waitIndex": reflect.Uint64, - } - - VALID_PUT_OPTIONS = validOptions{ - "prevValue": reflect.String, - "prevIndex": reflect.Uint64, - "prevExist": reflect.Bool, - } - - VALID_POST_OPTIONS = validOptions{} - - VALID_DELETE_OPTIONS = validOptions{ - "recursive": reflect.Bool, - } - - curlChan chan string -) - -// SetCurlChan sets a channel to which cURL commands which can be used to -// re-produce requests are sent. This is useful for debugging. -func SetCurlChan(c chan string) { - curlChan = c -} - // get issues a GET request -func (c *Client) get(key string, options options) (*Response, error) { - logger.Debugf("get %s [%s]", key, c.cluster.Leader) - - p := path.Join("keys", key) +func (c *Client) get(key string, options options) (*RawResponse, error) { // If consistency level is set to STRONG, append // the `consistent` query string. if c.config.Consistency == STRONG_CONSISTENCY { options["consistent"] = true } - if options != nil { - str, err := optionsToString(options, VALID_GET_OPTIONS) - if err != nil { - return nil, err - } - p += str + + str, err := options.toParameters(VALID_GET_OPTIONS) + if err != nil { + return nil, err } - resp, err := c.sendRequest("GET", p, url.Values{}) + resp, err := c.sendKeyRequest("GET", key, str, nil) if err != nil { return nil, err @@ -75,28 +34,15 @@ func (c *Client) get(key string, options options) (*Response, error) { } // put issues a PUT request -func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) { - logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) - v := url.Values{} - - if value != "" { - v.Set("value", value) - } +func (c *Client) put(key string, value string, ttl uint64, + options options) (*RawResponse, error) { - if ttl > 0 { - v.Set("ttl", fmt.Sprintf("%v", ttl)) - } - - p := path.Join("keys", key) - if options != nil { - str, err := optionsToString(options, VALID_PUT_OPTIONS) - if err != nil { - return nil, err - } - p += str + str, err := options.toParameters(VALID_PUT_OPTIONS) + if err != nil { + return nil, err } - resp, err := c.sendRequest("PUT", p, v) + resp, err := c.sendKeyRequest("PUT", key, str, buildValues(value, ttl)) if err != nil { return nil, err @@ -106,19 +52,8 @@ func (c *Client) put(key string, value string, ttl uint64, options options) (*Re } // post issues a POST request -func (c *Client) post(key string, value string, ttl uint64) (*Response, error) { - logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) - v := url.Values{} - - if value != "" { - v.Set("value", value) - } - - if ttl > 0 { - v.Set("ttl", fmt.Sprintf("%v", ttl)) - } - - resp, err := c.sendRequest("POST", path.Join("keys", key), v) +func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) { + resp, err := c.sendKeyRequest("POST", key, "", buildValues(value, ttl)) if err != nil { return nil, err @@ -128,20 +63,13 @@ func (c *Client) post(key string, value string, ttl uint64) (*Response, error) { } // delete issues a DELETE request -func (c *Client) delete(key string, options options) (*Response, error) { - logger.Debugf("delete %s [%s]", key, c.cluster.Leader) - v := url.Values{} - - p := path.Join("keys", key) - if options != nil { - str, err := optionsToString(options, VALID_DELETE_OPTIONS) - if err != nil { - return nil, err - } - p += str +func (c *Client) delete(key string, options options) (*RawResponse, error) { + str, err := options.toParameters(VALID_DELETE_OPTIONS) + if err != nil { + return nil, err } - resp, err := c.sendRequest("DELETE", p, v) + resp, err := c.sendKeyRequest("DELETE", key, str, nil) if err != nil { return nil, err @@ -150,141 +78,172 @@ func (c *Client) delete(key string, options options) (*Response, error) { return resp, nil } -// sendRequest sends a HTTP request and returns a Response as defined by etcd -func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) { - var body string = values.Encode() - var resp *http.Response +// sendKeyRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) sendKeyRequest(method string, key string, params string, + values url.Values) (*RawResponse, error) { + var req *http.Request + var resp *http.Response + var httpPath string + var err error + var b []byte + + trial := 0 + + logger.Debugf("%s %s %s [%s]", method, key, params, c.cluster.Leader) + + // Build the request path if no prefix exists + relativePath := path.Join(c.keyPrefix, key) + params - retry := 0 // if we connect to a follower, we will retry until we found a leader for { - var httpPath string - - // If _path has schema already, then it's assumed to be - // a complete URL and therefore needs no further processing. - u, err := url.Parse(_path) - if err != nil { - return nil, err + trial++ + logger.Debug("begin trail ", trial) + if trial > 2*len(c.cluster.Machines) { + return nil, newError(ErrCodeEtcdNotReachable, + "Tried to connect to each peer twice and failed", 0) } - if u.Scheme != "" { - httpPath = _path + if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { + // If it's a GET and consistency level is set to WEAK, + // then use a random machine. + httpPath = c.getHttpPath(true, relativePath) } else { - if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { - // If it's a GET and consistency level is set to WEAK, - // then use a random machine. - httpPath = c.getHttpPath(true, _path) - } else { - // Else use the leader. - httpPath = c.getHttpPath(false, _path) - } + // Else use the leader. + httpPath = c.getHttpPath(false, relativePath) } // Return a cURL command if curlChan is set - if curlChan != nil { + if c.cURLch != nil { command := fmt.Sprintf("curl -X %s %s", method, httpPath) for key, value := range values { command += fmt.Sprintf(" -d %s=%s", key, value[0]) } - curlChan <- command + c.sendCURL(command) } logger.Debug("send.request.to ", httpPath, " | method ", method) - if body == "" { + if values == nil { req, _ = http.NewRequest(method, httpPath, nil) - } else { - req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") - } + req, _ = http.NewRequest(method, httpPath, + strings.NewReader(values.Encode())) - resp, err = c.httpClient.Do(req) + req.Header.Set("Content-Type", + "application/x-www-form-urlencoded; param=value") + } - logger.Debug("recv.response.from ", httpPath) // network error, change a machine! - if err != nil { - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - num := retry % len(c.cluster.Machines) - logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") - c.cluster.Leader = c.cluster.Machines[num] + if resp, err = c.httpClient.Do(req); err != nil { + logger.Debug("network error: ", err.Error()) + c.cluster.switchLeader(trial % len(c.cluster.Machines)) time.Sleep(time.Millisecond * 200) continue } if resp != nil { - if resp.StatusCode == http.StatusTemporaryRedirect { - httpPath := resp.Header.Get("Location") - - resp.Body.Close() + logger.Debug("recv.response.from ", httpPath) - if httpPath == "" { - return nil, errors.New("Cannot get redirection location") - } + var ok bool + ok, b = c.handleResp(resp) - c.updateLeader(httpPath) - logger.Debug("send.redirect") - // try to connect the leader + if !ok { continue - } else if resp.StatusCode == http.StatusInternalServerError { - resp.Body.Close() - - retry++ - if retry > 2*len(c.cluster.Machines) { - return nil, errors.New("Cannot reach servers") - } - continue - } else { - logger.Debug("send.return.response ", httpPath) - break } + logger.Debug("recv.success.", httpPath) + break } - logger.Debug("error.from ", httpPath, " ", err.Error()) + + // should not reach here + // err and resp should not be nil at the same time + logger.Debug("error.from ", httpPath) return nil, err } - // Convert HTTP response to etcd response - b, err := ioutil.ReadAll(resp.Body) + r := &RawResponse{ + StatusCode: resp.StatusCode, + Body: b, + Header: resp.Header, + } - resp.Body.Close() + return r, nil +} - if err != nil { - return nil, err - } +// handleResp handles the responses from the etcd server +// If status code is OK, read the http body and return it as byte array +// If status code is TemporaryRedirect, update leader. +// If status code is InternalServerError, sleep for 200ms. +func (c *Client) handleResp(resp *http.Response) (bool, []byte) { + defer resp.Body.Close() - if !(resp.StatusCode == http.StatusOK || - resp.StatusCode == http.StatusCreated) { - return nil, handleError(b) - } + code := resp.StatusCode + + if code == http.StatusTemporaryRedirect { + u, err := resp.Location() + + if err != nil { + logger.Warning(err) + } else { + c.cluster.updateLeaderFromURL(u) + } - var result Response + return false, nil - err = json.Unmarshal(b, &result) + } else if code == http.StatusInternalServerError { + time.Sleep(time.Millisecond * 200) - if err != nil { - return nil, err + } else if validHttpStatusCode[code] { + b, err := ioutil.ReadAll(resp.Body) + + if err != nil { + return false, nil + } + + return true, b } - return &result, nil + logger.Warning("bad status code ", resp.StatusCode) + return false, nil } func (c *Client) getHttpPath(random bool, s ...string) string { var machine string + if random { machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] } else { machine = c.cluster.Leader } - fullPath := machine + "/" + version - for _, seg := range s { - fullPath = fullPath + "/" + seg + return machine + "/" + strings.Join(s, "/") +} + +// buildValues builds a url.Values map according to the given value and ttl +func buildValues(value string, ttl uint64) url.Values { + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + return v +} + +// convert key string to http path exclude version +// for example: key[foo] -> path[foo] +// key[] -> path[/] +func keyToPath(key string) string { + clean := path.Clean(key) + + if clean == "" || clean == "." { + return "/" } - return fullPath + return clean } diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests_test.go b/third_party/github.com/coreos/go-etcd/etcd/requests_test.go new file mode 100644 index 00000000..a7160c12 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/requests_test.go @@ -0,0 +1,50 @@ +package etcd + +import ( + "path" + "testing" +) + +func testKey(t *testing.T, in, exp string) { + if keyToPath(in) != exp { + t.Errorf("Expected %s got %s", exp, keyToPath(in)) + } +} + +// TestKeyToPath ensures the key cleaning funciton keyToPath works in a number +// of cases. +func TestKeyToPath(t *testing.T) { + testKey(t, "", "/") + testKey(t, "/", "/") + testKey(t, "///", "/") + testKey(t, "hello/world/", "hello/world") + testKey(t, "///hello////world/../", "/hello") +} + +func testPath(t *testing.T, c *Client, in, exp string) { + out := c.getHttpPath(false, in) + + if out != exp { + t.Errorf("Expected %s got %s", exp, out) + } +} + +// TestHttpPath ensures that the URLs generated make sense for the given keys +func TestHttpPath(t *testing.T) { + c := NewClient(nil) + + testPath(t, c, + path.Join(c.keyPrefix, "hello") + "?prevInit=true", + "http://127.0.0.1:4001/v2/keys/hello?prevInit=true") + + testPath(t, c, + path.Join(c.keyPrefix, "///hello///world") + "?prevInit=true", + "http://127.0.0.1:4001/v2/keys/hello/world?prevInit=true") + + c = NewClient([]string{"https://discovery.etcd.io"}) + c.SetKeyPrefix("") + + testPath(t, c, + path.Join(c.keyPrefix, "hello") + "?prevInit=true", + "https://discovery.etcd.io/hello?prevInit=true") +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go index d05b8f45..a72c2d8d 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/response.go +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -1,50 +1,88 @@ package etcd import ( + "encoding/json" + "net/http" + "strconv" "time" ) -// The response object from the server. -type Response struct { - Action string `json:"action"` - Key string `json:"key"` - Dir bool `json:"dir,omitempty"` - PrevValue string `json:"prevValue,omitempty"` - Value string `json:"value,omitempty"` - Kvs kvPairs `json:"kvs,omitempty"` +const ( + rawResponse = iota + normalResponse +) + +type responseType int + +type RawResponse struct { + StatusCode int + Body []byte + Header http.Header +} + +var ( + validHttpStatusCode = map[int]bool{ + http.StatusCreated: true, + http.StatusOK: true, + http.StatusBadRequest: true, + http.StatusNotFound: true, + http.StatusPreconditionFailed: true, + http.StatusForbidden: true, + } +) + +func (rr *RawResponse) toResponse() (*Response, error) { + if rr.StatusCode != http.StatusOK && rr.StatusCode != http.StatusCreated { + return nil, handleError(rr.Body) + } + + resp := new(Response) - // If the key did not exist before the action, - // this field should be set to true - NewKey bool `json:"newKey,omitempty"` + err := json.Unmarshal(rr.Body, resp) - Expiration *time.Time `json:"expiration,omitempty"` + if err != nil { + return nil, err + } - // Time to live in second - TTL int64 `json:"ttl,omitempty"` + // attach index and term to response + resp.EtcdIndex, _ = strconv.ParseUint(rr.Header.Get("X-Etcd-Index"), 10, 64) + resp.RaftIndex, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Index"), 10, 64) + resp.RaftTerm, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Term"), 10, 64) - // The command index of the raft machine when the command is executed - ModifiedIndex uint64 `json:"modifiedIndex"` + return resp, nil +} + +type Response struct { + Action string `json:"action"` + Node *Node `json:"node"` + PrevNode *Node `json:"prevNode,omitempty"` + EtcdIndex uint64 `json:"etcdIndex"` + RaftIndex uint64 `json:"raftIndex"` + RaftTerm uint64 `json:"raftTerm"` } -// When user list a directory, we add all the node into key-value pair slice -type KeyValuePair struct { - Key string `json:"key, omitempty"` - Value string `json:"value,omitempty"` - Dir bool `json:"dir,omitempty"` - KVPairs kvPairs `json:"kvs,omitempty"` +type Node struct { + Key string `json:"key, omitempty"` + Value string `json:"value,omitempty"` + Dir bool `json:"dir,omitempty"` + Expiration *time.Time `json:"expiration,omitempty"` + TTL int64 `json:"ttl,omitempty"` + Nodes Nodes `json:"nodes,omitempty"` + ModifiedIndex uint64 `json:"modifiedIndex,omitempty"` + CreatedIndex uint64 `json:"createdIndex,omitempty"` } -type kvPairs []KeyValuePair +type Nodes []Node // interfaces for sorting -func (kvs kvPairs) Len() int { - return len(kvs) +func (ns Nodes) Len() int { + return len(ns) } -func (kvs kvPairs) Less(i, j int) bool { - return kvs[i].Key < kvs[j].Key +func (ns Nodes) Less(i, j int) bool { + return ns[i].Key < ns[j].Key } -func (kvs kvPairs) Swap(i, j int) { - kvs[i], kvs[j] = kvs[j], kvs[i] +func (ns Nodes) Swap(i, j int) { + ns[i], ns[j] = ns[j], ns[i] } diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go index 6d063310..756e3178 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go @@ -7,13 +7,12 @@ import ( func TestSetCurlChan(t *testing.T) { c := NewClient(nil) + c.OpenCURL() + defer func() { - c.DeleteAll("foo") + c.Delete("foo", true) }() - curlChan := make(chan string, 1) - SetCurlChan(curlChan) - _, err := c.Set("foo", "bar", 5) if err != nil { t.Fatal(err) @@ -21,21 +20,21 @@ func TestSetCurlChan(t *testing.T) { expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5", c.cluster.Leader) - actual := <-curlChan + actual := c.RecvCURL() if expected != actual { t.Fatalf(`Command "%s" is not equal to expected value "%s"`, actual, expected) } c.SetConsistency(STRONG_CONSISTENCY) - _, err = c.Get("foo", false) + _, err = c.Get("foo", false, false) if err != nil { t.Fatal(err) } - expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&sorted=false", + expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&recursive=false&sorted=false", c.cluster.Leader) - actual = <-curlChan + actual = c.RecvCURL() if expected != actual { t.Fatalf(`Command "%s" is not equal to expected value "%s"`, actual, expected) diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go index 281cd577..ab420d8f 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -1,43 +1,121 @@ package etcd -// SetDir sets the given key to a directory. +// Set sets the given key to the given value. +// It will create a new key value pair or replace the old one. +// It will not replace a existing directory. +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawSet(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +// Set sets the given key to a directory. +// It will create a new directory or replace the old key value pair by a directory. +// It will not replace a existing directory. func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { - return c.put(key, "", ttl, nil) + raw, err := c.RawSetDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() } -// UpdateDir updates the given key to a directory. It succeeds only if the +// CreateDir creates a directory. It succeeds only if +// the given key does not yet exist. +func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { + raw, err := c.RawCreateDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +// UpdateDir updates the given directory. It succeeds only if the // given key already exists. func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { - return c.put(key, "", ttl, options{ + raw, err := c.RawUpdateDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +// Create creates a file with the given value under the given key. It succeeds +// only if the given key does not yet exist. +func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawCreate(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +// Update updates the given key to the given value. It succeeds only if the +// given key already exists. +func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawUpdate(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.toResponse() +} + +func (c *Client) RawUpdateDir(key string, ttl uint64) (*RawResponse, error) { + ops := options{ "prevExist": true, - }) + "dir": true, + } + + return c.put(key, "", ttl, ops) } -// UpdateDir creates a directory under the given key. It succeeds only if -// the given key does not yet exist. -func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { - return c.put(key, "", ttl, options{ +func (c *Client) RawCreateDir(key string, ttl uint64) (*RawResponse, error) { + ops := options{ "prevExist": false, - }) + "dir": true, + } + + return c.put(key, "", ttl, ops) } -// Set sets the given key to the given value. -func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { +func (c *Client) RawSet(key string, value string, ttl uint64) (*RawResponse, error) { return c.put(key, value, ttl, nil) } -// Update updates the given key to the given value. It succeeds only if the -// given key already exists. -func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { - return c.put(key, value, ttl, options{ +func (c *Client) RawSetDir(key string, ttl uint64) (*RawResponse, error) { + ops := options{ + "dir": true, + } + + return c.put(key, "", ttl, ops) +} + +func (c *Client) RawUpdate(key string, value string, ttl uint64) (*RawResponse, error) { + ops := options{ "prevExist": true, - }) + } + + return c.put(key, value, ttl, ops) } -// Create creates a file with the given value under the given key. It succeeds -// only if the given key does not yet exist. -func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { - return c.put(key, value, ttl, options{ +func (c *Client) RawCreate(key string, value string, ttl uint64) (*RawResponse, error) { + ops := options{ "prevExist": false, - }) + } + + return c.put(key, value, ttl, ops) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go index 6f27fdfa..2003c59b 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -7,36 +7,41 @@ import ( func TestSet(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") + c.Delete("foo", true) }() resp, err := c.Set("foo", "bar", 5) if err != nil { t.Fatal(err) } - if resp.Key != "/foo" || resp.Value != "bar" || resp.TTL != 5 { + if resp.Node.Key != "/foo" || resp.Node.Value != "bar" || resp.Node.TTL != 5 { t.Fatalf("Set 1 failed: %#v", resp) } + if resp.PrevNode != nil { + t.Fatalf("Set 1 PrevNode failed: %#v", resp) + } resp, err = c.Set("foo", "bar2", 5) if err != nil { t.Fatal(err) } - if !(resp.Key == "/foo" && resp.Value == "bar2" && - resp.PrevValue == "bar" && resp.TTL == 5) { + if !(resp.Node.Key == "/foo" && resp.Node.Value == "bar2" && resp.Node.TTL == 5) { t.Fatalf("Set 2 failed: %#v", resp) } + if resp.PrevNode.Key != "/foo" || resp.PrevNode.Value != "bar" || resp.Node.TTL != 5 { + t.Fatalf("Set 2 PrevNode failed: %#v", resp) + } } func TestUpdate(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") - c.DeleteAll("nonexistent") + c.Delete("foo", true) + c.Delete("nonexistent", true) }() resp, err := c.Set("foo", "bar", 5) - t.Logf("%#v", resp) + if err != nil { t.Fatal(err) } @@ -47,23 +52,25 @@ func TestUpdate(t *testing.T) { t.Fatal(err) } - if !(resp.Action == "update" && resp.Key == "/foo" && - resp.PrevValue == "bar" && resp.TTL == 5) { + if !(resp.Action == "update" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { t.Fatalf("Update 1 failed: %#v", resp) } + if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.Node.TTL == 5) { + t.Fatalf("Update 1 prevValue failed: %#v", resp) + } // This should fail because the key does not exist. resp, err = c.Update("nonexistent", "whatever", 5) if err == nil { t.Fatalf("The key %v did not exist, so the update should have failed."+ - "The response was: %#v", resp.Key, resp) + "The response was: %#v", resp.Node.Key, resp) } } func TestCreate(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("newKey") + c.Delete("newKey", true) }() newKey := "/newKey" @@ -75,36 +82,42 @@ func TestCreate(t *testing.T) { t.Fatal(err) } - if !(resp.Action == "create" && resp.Key == newKey && - resp.Value == newValue && resp.PrevValue == "" && resp.TTL == 5) { + if !(resp.Action == "create" && resp.Node.Key == newKey && + resp.Node.Value == newValue && resp.Node.TTL == 5) { t.Fatalf("Create 1 failed: %#v", resp) } + if resp.PrevNode != nil { + t.Fatalf("Create 1 PrevNode failed: %#v", resp) + } // This should fail, because the key is already there resp, err = c.Create(newKey, newValue, 5) if err == nil { t.Fatalf("The key %v did exist, so the creation should have failed."+ - "The response was: %#v", resp.Key, resp) + "The response was: %#v", resp.Node.Key, resp) } } func TestSetDir(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("foo") - c.DeleteAll("fooDir") + c.Delete("foo", true) + c.Delete("fooDir", true) }() - resp, err := c.SetDir("fooDir", 5) + resp, err := c.CreateDir("fooDir", 5) if err != nil { t.Fatal(err) } - if !(resp.Key == "/fooDir" && resp.Value == "" && resp.TTL == 5) { + if !(resp.Node.Key == "/fooDir" && resp.Node.Value == "" && resp.Node.TTL == 5) { t.Fatalf("SetDir 1 failed: %#v", resp) } + if resp.PrevNode != nil { + t.Fatalf("SetDir 1 PrevNode failed: %#v", resp) + } // This should fail because /fooDir already points to a directory - resp, err = c.SetDir("/fooDir", 5) + resp, err = c.CreateDir("/fooDir", 5) if err == nil { t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+ "The response was: %#v", resp) @@ -116,12 +129,15 @@ func TestSetDir(t *testing.T) { } // This should succeed + // It should replace the key resp, err = c.SetDir("foo", 5) if err != nil { t.Fatal(err) } - if !(resp.Key == "/foo" && resp.Value == "" && - resp.PrevValue == "bar" && resp.TTL == 5) { + if !(resp.Node.Key == "/foo" && resp.Node.Value == "" && resp.Node.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } + if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.PrevNode.TTL == 5) { t.Fatalf("SetDir 2 failed: %#v", resp) } } @@ -129,11 +145,10 @@ func TestSetDir(t *testing.T) { func TestUpdateDir(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("fooDir") + c.Delete("fooDir", true) }() - resp, err := c.SetDir("fooDir", 5) - t.Logf("%#v", resp) + resp, err := c.CreateDir("fooDir", 5) if err != nil { t.Fatal(err) } @@ -144,23 +159,26 @@ func TestUpdateDir(t *testing.T) { t.Fatal(err) } - if !(resp.Action == "update" && resp.Key == "/fooDir" && - resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + if !(resp.Action == "update" && resp.Node.Key == "/fooDir" && + resp.Node.Value == "" && resp.Node.TTL == 5) { t.Fatalf("UpdateDir 1 failed: %#v", resp) } + if !(resp.PrevNode.Key == "/fooDir" && resp.PrevNode.Dir == true && resp.PrevNode.TTL == 5) { + t.Fatalf("UpdateDir 1 PrevNode failed: %#v", resp) + } // This should fail because the key does not exist. resp, err = c.UpdateDir("nonexistentDir", 5) if err == nil { t.Fatalf("The key %v did not exist, so the update should have failed."+ - "The response was: %#v", resp.Key, resp) + "The response was: %#v", resp.Node.Key, resp) } } func TestCreateDir(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("fooDir") + c.Delete("fooDir", true) }() // This should succeed @@ -169,15 +187,18 @@ func TestCreateDir(t *testing.T) { t.Fatal(err) } - if !(resp.Action == "create" && resp.Key == "/fooDir" && - resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + if !(resp.Action == "create" && resp.Node.Key == "/fooDir" && + resp.Node.Value == "" && resp.Node.TTL == 5) { t.Fatalf("CreateDir 1 failed: %#v", resp) } + if resp.PrevNode != nil { + t.Fatalf("CreateDir 1 PrevNode failed: %#v", resp) + } // This should fail, because the key is already there resp, err = c.CreateDir("fooDir", 5) if err == nil { t.Fatalf("The key %v did exist, so the creation should have failed."+ - "The response was: %#v", resp.Key, resp) + "The response was: %#v", resp.Node.Key, resp) } } diff --git a/third_party/github.com/coreos/go-etcd/etcd/utils.go b/third_party/github.com/coreos/go-etcd/etcd/utils.go deleted file mode 100644 index eb2f6046..00000000 --- a/third_party/github.com/coreos/go-etcd/etcd/utils.go +++ /dev/null @@ -1,33 +0,0 @@ -// Utility functions - -package etcd - -import ( - "fmt" - "net/url" - "reflect" -) - -// Convert options to a string of HTML parameters -func optionsToString(options options, vops validOptions) (string, error) { - p := "?" - v := url.Values{} - for opKey, opVal := range options { - // Check if the given option is valid (that it exists) - kind := vops[opKey] - if kind == reflect.Invalid { - return "", fmt.Errorf("Invalid option: %v", opKey) - } - - // Check if the given option is of the valid type - t := reflect.TypeOf(opVal) - if kind != t.Kind() { - return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", - opKey, kind, t.Kind()) - } - - v.Set(opKey, fmt.Sprintf("%v", opVal)) - } - p += v.Encode() - return p, nil -} diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go index bbce2039..1e1ac74a 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -9,44 +9,74 @@ var ( ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") ) -// WatchAll returns the first change under the given prefix since the given index. To -// watch for the latest change, set waitIndex = 0. +// If recursive is set to true the watch returns the first change under the given +// prefix since the given index. // -// If the prefix points to a directory, any change under it, including all child directories, -// will be returned. +// If recursive is set to false the watch returns the first change to the given key +// since the given index. // -// If a receiver channel is given, it will be a long-term watch. Watch will block at the -// channel. And after someone receive the channel, it will go on to watch that prefix. -// If a stop channel is given, client can close long-term watch using the stop channel -func (c *Client) WatchAll(prefix string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { - return c.watch(prefix, waitIndex, true, receiver, stop) -} - -// Watch returns the first change to the given key since the given index. To -// watch for the latest change, set waitIndex = 0. +// To watch for the latest change, set waitIndex = 0. // // If a receiver channel is given, it will be a long-term watch. Watch will block at the -// channel. And after someone receive the channel, it will go on to watch that -// prefix. If a stop channel is given, client can close long-term watch using -// the stop channel -func (c *Client) Watch(key string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { - return c.watch(key, waitIndex, false, receiver, stop) +//channel. After someone receives the channel, it will go on to watch that +// prefix. If a stop channel is given, the client can close long-term watch using +// the stop channel. +func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool, + receiver chan *Response, stop chan bool) (*Response, error) { + logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + return raw.toResponse() + } + + for { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + resp, err := raw.toResponse() + + if err != nil { + return nil, err + } + + waitIndex = resp.Node.ModifiedIndex + 1 + receiver <- resp + } + + return nil, nil } -func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver chan *Response, stop chan bool) (*Response, error) { - logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) +func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool, + receiver chan *RawResponse, stop chan bool) (*RawResponse, error) { + + logger.Debugf("rawWatch %s [%s]", prefix, c.cluster.Leader) if receiver == nil { return c.watchOnce(prefix, waitIndex, recursive, stop) - } else { - for { - resp, err := c.watchOnce(prefix, waitIndex, recursive, stop) - if resp != nil { - waitIndex = resp.ModifiedIndex + 1 - receiver <- resp - } else { - return nil, err - } + } + + for { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + resp, err := raw.toResponse() + + if err != nil { + return nil, err } + + waitIndex = resp.Node.ModifiedIndex + 1 + receiver <- raw } return nil, nil @@ -54,9 +84,9 @@ func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver // helper func // return when there is change under the given prefix -func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*Response, error) { +func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*RawResponse, error) { - respChan := make(chan *Response) + respChan := make(chan *RawResponse, 1) errChan := make(chan error) go func() { @@ -74,6 +104,7 @@ func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop ch if err != nil { errChan <- err + return } respChan <- resp diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go index 10fc2b6b..9b466489 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -9,26 +9,26 @@ import ( func TestWatch(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("watch_foo") + c.Delete("watch_foo", true) }() go setHelper("watch_foo", "bar", c) - resp, err := c.Watch("watch_foo", 0, nil, nil) + resp, err := c.Watch("watch_foo", 0, false, nil, nil) if err != nil { t.Fatal(err) } - if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") { t.Fatalf("Watch 1 failed: %#v", resp) } go setHelper("watch_foo", "bar", c) - resp, err = c.Watch("watch_foo", resp.ModifiedIndex, nil, nil) + resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, false, nil, nil) if err != nil { t.Fatal(err) } - if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") { t.Fatalf("Watch 2 failed: %#v", resp) } @@ -39,7 +39,7 @@ func TestWatch(t *testing.T) { go receiver(ch, stop) - _, err = c.Watch("watch_foo", 0, ch, stop) + _, err = c.Watch("watch_foo", 0, false, ch, stop) if err != ErrWatchStoppedByUser { t.Fatalf("Watch returned a non-user stop error") } @@ -48,26 +48,26 @@ func TestWatch(t *testing.T) { func TestWatchAll(t *testing.T) { c := NewClient(nil) defer func() { - c.DeleteAll("watch_foo") + c.Delete("watch_foo", true) }() go setHelper("watch_foo/foo", "bar", c) - resp, err := c.WatchAll("watch_foo", 0, nil, nil) + resp, err := c.Watch("watch_foo", 0, true, nil, nil) if err != nil { t.Fatal(err) } - if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") { t.Fatalf("WatchAll 1 failed: %#v", resp) } go setHelper("watch_foo/foo", "bar", c) - resp, err = c.WatchAll("watch_foo", resp.ModifiedIndex, nil, nil) + resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, true, nil, nil) if err != nil { t.Fatal(err) } - if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") { t.Fatalf("WatchAll 2 failed: %#v", resp) } @@ -78,7 +78,7 @@ func TestWatchAll(t *testing.T) { go receiver(ch, stop) - _, err = c.WatchAll("watch_foo", 0, ch, stop) + _, err = c.Watch("watch_foo", 0, true, ch, stop) if err != ErrWatchStoppedByUser { t.Fatalf("Watch returned a non-user stop error") } -- GitLab