diff --git a/api.go b/api.go index ce3e184927e2d35e985dd8d1d38f5081b4f987aa..5e75f16bfb2c360d14fe41116668f21240c0385c 100644 --- a/api.go +++ b/api.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) var ( diff --git a/etcd_client.go b/etcd_client.go index a6251e7c8edec1def38f9ad4d75f87c350bc8380..5d62395c223cd17babbef6e89939f4c9bbd5d1b5 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -7,7 +7,7 @@ import ( "net" "strings" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) var ( diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go index c73a67f1e0e8704c7140f30440d2671024f722b9..e7ec6286d3662b28bfdd6664d4d278960cc6b57b 100644 --- a/masterelection/masterelection.go +++ b/masterelection/masterelection.go @@ -4,7 +4,7 @@ import ( "log" "time" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) const ( diff --git a/node/node.go b/node/node.go index 30ec6458e0060515ce697ab2af89e2a34200e716..49afdc90fbb4cbf48776dae5635cc77a03dca2a4 100644 --- a/node/node.go +++ b/node/node.go @@ -9,7 +9,7 @@ import ( "git.autistici.org/ale/radioai" "git.autistici.org/ale/radioai/masterelection" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) func trigger(c chan bool) { diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go new file mode 100644 index 0000000000000000000000000000000000000000..31d3c2a3a76069f3c956e34e0078834d76a764fc --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -0,0 +1,267 @@ +package etcd + +import ( + "crypto/tls" + "errors" + "io/ioutil" + "net" + "net/http" + "net/url" + "path" + "strings" + "time" +) + +const ( + HTTP = iota + HTTPS +) + +type Cluster struct { + Leader string + Machines []string +} + +type Config struct { + CertFile string + KeyFile string + Scheme string + Timeout time.Duration +} + +type Client struct { + cluster Cluster + config Config + httpClient *http.Client +} + +// Setup a basic conf and cluster +func NewClient(machines []string) *Client { + // if an empty slice was sent in then just assume localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } + + // default leader and machines + cluster := Cluster{ + Leader: machines[0], + Machines: machines, + } + + config := Config{ + // default use http + Scheme: "http", + // default timeout is one second + Timeout: time.Second, + } + + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + + return &Client{ + cluster: cluster, + config: config, + httpClient: &http.Client{Transport: tr}, + } + +} + +func (c *Client) SetCertAndKey(cert string, key string) (bool, error) { + + if cert != "" && key != "" { + tlsCert, err := tls.LoadX509KeyPair(cert, key) + + if err != nil { + return false, err + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + }, + Dial: dialTimeout, + } + + c.httpClient = &http.Client{Transport: tr} + return true, nil + } + return false, errors.New("Require both cert and key path") +} + +func (c *Client) SetScheme(scheme int) (bool, error) { + if scheme == HTTP { + c.config.Scheme = "http" + return true, nil + } + if scheme == HTTPS { + c.config.Scheme = "https" + return true, nil + } + return false, errors.New("Unknown Scheme") +} + +// Try to sync from the given machine +func (c *Client) SetCluster(machines []string) bool { + success := c.internalSyncCluster(machines) + return success +} + +func (c *Client) GetCluster() []string { + return c.cluster.Machines +} + +// sycn cluster information using the existing machine list +func (c *Client) SyncCluster() bool { + success := c.internalSyncCluster(c.cluster.Machines) + return success +} + +// sync cluster information by providing machine list +func (c *Client) internalSyncCluster(machines []string) bool { + for _, machine := range machines { + httpPath := c.createHttpPath(machine, version+"/machines") + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } else { + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } + + // update Machines List + c.cluster.Machines = strings.Split(string(b), ", ") + + // update leader + // the first one in the machine list is the leader + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) + c.cluster.Leader = c.cluster.Machines[0] + + logger.Debug("sync.machines ", c.cluster.Machines) + return true + } + } + return false +} + +// serverName should contain both hostName and port +func (c *Client) createHttpPath(serverName string, _path string) string { + u, _ := url.Parse(serverName) + u.Path = path.Join(u.Path, "/", _path) + if u.Scheme == "" { + u.Scheme = "http" + } + return u.String() +} + +// Dial with timeout. +func dialTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Second) +} + +func (c *Client) getHttpPath(s ...string) string { + u, _ := url.Parse(c.cluster.Leader) + + u.Path = path.Join(u.Path, "/", version) + + for _, seg := range s { + u.Path = path.Join(u.Path, seg) + } + + return u.String() +} + +func (c *Client) updateLeader(httpPath string) { + u, _ := url.Parse(httpPath) + + var leader string + if u.Scheme == "" { + leader = "http://" + u.Host + } else { + leader = u.Scheme + "://" + u.Host + } + + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) + c.cluster.Leader = leader +} + +// Wrap GET, POST and internal error handling +func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) { + + var resp *http.Response + var err error + var req *http.Request + + retry := 0 + // if we connect to a follower, we will retry until we found a leader + for { + + httpPath := c.getHttpPath(_path) + + logger.Debug("send.request.to ", httpPath) + if body == "" { + + req, _ = http.NewRequest(method, httpPath, nil) + + } else { + req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") + } + + resp, err = c.httpClient.Do(req) + + logger.Debug("recv.response.from ", httpPath) + // network error, change a machine! + if err != nil { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + num := retry % len(c.cluster.Machines) + logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") + c.cluster.Leader = c.cluster.Machines[num] + time.Sleep(time.Millisecond * 200) + continue + } + + if resp != nil { + if resp.StatusCode == http.StatusTemporaryRedirect { + httpPath := resp.Header.Get("Location") + + resp.Body.Close() + + if httpPath == "" { + return nil, errors.New("Cannot get redirection location") + } + + c.updateLeader(httpPath) + logger.Debug("send.redirect") + // try to connect the leader + continue + } else if resp.StatusCode == http.StatusInternalServerError { + resp.Body.Close() + + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + continue + } else { + logger.Debug("send.return.response ", httpPath) + break + } + + } + logger.Debug("error.from ", httpPath, " ", err.Error()) + return nil, err + } + return resp, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..29f138113cef739885142d8d6427c2baf35395e6 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -0,0 +1,58 @@ +package etcd + +import ( + "fmt" + "testing" + "net/url" + "net" +) + +// To pass this test, we need to create a cluster of 3 machines +// The server should be listening on 127.0.0.1:4001, 4002, 4003 +func TestSync(t *testing.T) { + fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") + + c := NewClient(nil) + + success := c.SyncCluster() + if !success { + t.Fatal("cannot sync machines") + } + + for _, m := range(c.GetCluster()) { + u, err := url.Parse(m) + if err != nil { + t.Fatal(err) + } + if u.Scheme != "http" { + t.Fatal("scheme must be http") + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + t.Fatal(err) + } + if host != "127.0.0.1" { + t.Fatal("Host must be 127.0.0.1") + } + } + + badMachines := []string{"abc", "edef"} + + success = c.SetCluster(badMachines) + + if success { + t.Fatal("should not sync on bad machines") + } + + goodMachines := []string{"127.0.0.1:4002"} + + success = c.SetCluster(goodMachines) + + if !success { + t.Fatal("cannot sync machines") + } else { + fmt.Println(c.cluster.Machines) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go new file mode 100644 index 0000000000000000000000000000000000000000..d82d9b292845f2ce5252edce13ecbf0f6d719d31 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -0,0 +1,27 @@ +package etcd + +import ( + "github.com/coreos/go-log/log" + "os" +) + +var logger *log.Logger + +func init() { + setLogger(log.PriErr) +} + +func OpenDebug() { + setLogger(log.PriDebug) +} + +func CloseDebug() { + setLogger(log.PriErr) +} + +func setLogger(priority log.Priority) { + logger = log.NewSimple( + log.PriorityFilter( + priority, + log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields))) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go new file mode 100644 index 0000000000000000000000000000000000000000..91f6df87a5adc33542428e624227eb7756ca5d0c --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -0,0 +1,40 @@ +package etcd + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "path" +) + +func (c *Client) Delete(key string) (*Response, error) { + + resp, err := c.sendRequest("DELETE", path.Join("keys", key), "") + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go new file mode 100644 index 0000000000000000000000000000000000000000..52756d09fcd74db7ed77478b56dcad6e1b48ba5c --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -0,0 +1,22 @@ +package etcd + +import ( + "testing" +) + +func TestDelete(t *testing.T) { + + c := NewClient(nil) + + c.Set("foo", "bar", 100) + result, err := c.Delete("foo") + if err != nil { + t.Fatal(err) + } + + if result.PrevValue != "bar" || result.Value != "" { + t.Fatalf("Delete failed with %s %s", result.PrevValue, + result.Value) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/error.go b/third_party/github.com/coreos/go-etcd/etcd/error.go new file mode 100644 index 0000000000000000000000000000000000000000..9a3268d607cea8924ca671a6d58f3756da7a4b84 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/error.go @@ -0,0 +1,24 @@ +package etcd + +import ( + "encoding/json" + "fmt" +) + +type EtcdError struct { + ErrorCode int `json:"errorCode"` + Message string `json:"message"` + Cause string `json:"cause,omitempty"` +} + +func (e EtcdError) Error() string { + return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause) +} + +func handleError(b []byte) error { + var err EtcdError + + json.Unmarshal(b, &err) + + return err +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go new file mode 100644 index 0000000000000000000000000000000000000000..3288621e7c44d3368c48b417b49bafa65283cb66 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -0,0 +1,82 @@ +package etcd + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "path" +) + +func (c *Client) Get(key string) ([]*Response, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + resp, err := c.sendRequest("GET", path.Join("keys", key), "") + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + return convertGetResponse(b) + +} + +// GetTo gets the value of the key from a given machine address. +// If the given machine is not available it returns an error. +// Mainly use for testing purpose +func (c *Client) GetFrom(key string, addr string) ([]*Response, error) { + httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) + + resp, err := c.httpClient.Get(httpPath) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + return convertGetResponse(b) +} + +// Convert byte stream to response. +func convertGetResponse(b []byte) ([]*Response, error) { + + var results []*Response + var result *Response + + err := json.Unmarshal(b, &result) + + if err != nil { + err = json.Unmarshal(b, &results) + + if err != nil { + return nil, err + } + + } else { + results = make([]*Response, 1) + results[0] = result + } + return results, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ff81374003cdea690432d2fdd6853b6856b2a59d --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestGet(t *testing.T) { + + c := NewClient(nil) + + c.Set("foo", "bar", 100) + + // wait for commit + time.Sleep(100 * time.Millisecond) + + results, err := c.Get("foo") + + if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + } + + results, err = c.Get("goo") + + if err == nil { + t.Fatalf("should not be able to get non-exist key") + } + + results, err = c.GetFrom("foo", "0.0.0.0:4001") + + if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL) + } + + results, err = c.GetFrom("foo", "0.0.0.0:4009") + + if err == nil { + t.Fatal("should not get from port 4009") + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/list_test.go b/third_party/github.com/coreos/go-etcd/etcd/list_test.go new file mode 100644 index 0000000000000000000000000000000000000000..382bb356d912ee60c9082500678dbaf5d100b695 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/list_test.go @@ -0,0 +1,23 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestList(t *testing.T) { + c := NewClient(nil) + + c.Set("foo_list/foo", "bar", 100) + c.Set("foo_list/fooo", "barbar", 100) + c.Set("foo_list/foooo/foo", "barbarbar", 100) + // wait for commit + time.Sleep(time.Second) + + _, err := c.Get("foo_list") + + if err != nil { + t.Fatal(err) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go new file mode 100644 index 0000000000000000000000000000000000000000..d2311a675bd6f0cfdb3949d49f9de1dbb5ccfa5a --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -0,0 +1,26 @@ +package etcd + +import ( + "time" +) + +// The response object from the server. +type Response struct { + Action string `json:"action"` + Key string `json:"key"` + Dir bool `json:"dir,omitempty"` + PrevValue string `json:"prevValue,omitempty"` + Value string `json:"value,omitempty"` + + // If the key did not exist before the action, + // this field should be set to true + NewKey bool `json:"newKey,omitempty"` + + Expiration *time.Time `json:"expiration,omitempty"` + + // Time to live in second + TTL int64 `json:"ttl,omitempty"` + + // The command index of the raft machine when the command is executed + Index uint64 `json:"index"` +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set.go b/third_party/github.com/coreos/go-etcd/etcd/set.go new file mode 100644 index 0000000000000000000000000000000000000000..17fc415f2f807f60848c9bd624c1722adb441cc8 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set.go @@ -0,0 +1,89 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + v.Set("value", value) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + return convertSetResponse(b) + +} + +// SetTo sets the value of the key to a given machine address. +// If the given machine is not available or is not leader it returns an error +// Mainly use for testing purpose. +func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*Response, error) { + v := url.Values{} + v.Set("value", value) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + httpPath := c.createHttpPath(addr, path.Join(version, "keys", key)) + + resp, err := c.httpClient.PostForm(httpPath, v) + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, handleError(b) + } + + return convertSetResponse(b) +} + +// Convert byte stream to response. +func convertSetResponse(b []byte) (*Response, error) { + var result Response + + err := json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3809ee952a9d327ddc852d220490a3528c9fb9e0 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_test.go @@ -0,0 +1,42 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestSet(t *testing.T) { + c := NewClient(nil) + + result, err := c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + time.Sleep(time.Second) + + result, err = c.Set("foo", "bar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + result, err = c.SetTo("toFoo", "bar", 100, "0.0.0.0:4001") + + if err != nil || result.Key != "/toFoo" || result.Value != "bar" || result.TTL != 99 { + if err != nil { + t.Fatal(err) + } + + t.Fatalf("SetTo failed with %s %s %v", result.Key, result.Value, result.TTL) + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go new file mode 100644 index 0000000000000000000000000000000000000000..bdd8ecb4fd8969cdab2e4f62686555e6f0a4436d --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet.go @@ -0,0 +1,56 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*Response, bool, error) { + logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader) + v := url.Values{} + v.Set("value", value) + v.Set("prevValue", prevValue) + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode()) + + if err != nil { + return nil, false, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + + return nil, false, err + } + + if resp.StatusCode != http.StatusOK { + return nil, false, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, false, err + } + + if result.PrevValue == prevValue && result.Value == value { + + return &result, true, nil + } + + return &result, false, nil + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5dbd854b515c7b05602ac7bd7aa6078c72bb805a --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go @@ -0,0 +1,39 @@ +package etcd + +import ( + "testing" + "time" +) + +func TestTestAndSet(t *testing.T) { + c := NewClient(nil) + + c.Set("foo_testAndSet", "bar", 100) + + time.Sleep(time.Second) + + results := make(chan bool, 3) + + for i := 0; i < 3; i++ { + testAndSet("foo_testAndSet", "bar", "barbar", results, c) + } + + count := 0 + + for i := 0; i < 3; i++ { + result := <-results + if result { + count++ + } + } + + if count != 1 { + t.Fatalf("test and set fails %v", count) + } + +} + +func testAndSet(key string, prevValue string, value string, ch chan bool, c *Client) { + _, success, _ := c.TestAndSet(key, prevValue, value, 0) + ch <- success +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go new file mode 100644 index 0000000000000000000000000000000000000000..e84e7b5b76565128c523a8ca8bc48aceed092fda --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -0,0 +1,3 @@ +package etcd + +const version = "v1" diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go new file mode 100644 index 0000000000000000000000000000000000000000..18fcfdc1235df214e4cee7ea9e178d132edd38d3 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -0,0 +1,118 @@ +package etcd + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +type respAndErr struct { + resp *http.Response + err error +} + +// Errors introduced by the Watch command. +var ( + ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") +) + +// Watch any change under the given prefix. +// When a sinceIndex is given, watch will try to scan from that index to the last index +// and will return any changes under the given prefix during the history +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that prefix. +// If a stop channel is given, client can close long-term watch using the stop channel + +func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + return c.watchOnce(prefix, sinceIndex, stop) + + } else { + for { + resp, err := c.watchOnce(prefix, sinceIndex, stop) + if resp != nil { + sinceIndex = resp.Index + 1 + receiver <- resp + } else { + return nil, err + } + } + } + + return nil, nil +} + +// helper func +// return when there is change under the given prefix +func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*Response, error) { + + var resp *http.Response + var err error + + if stop != nil { + ch := make(chan respAndErr) + + go func() { + resp, err = c.sendWatchRequest(key, sinceIndex) + + ch <- respAndErr{resp, err} + }() + + // select at stop or continue to receive + select { + + case res := <-ch: + resp, err = res.resp, res.err + + case <-stop: + resp, err = nil, ErrWatchStoppedByUser + } + } else { + resp, err = c.sendWatchRequest(key, sinceIndex) + } + + if err != nil { + return nil, err + } + + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + + return nil, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +func (c *Client) sendWatchRequest(key string, sinceIndex uint64) (*http.Response, error) { + if sinceIndex == 0 { + resp, err := c.sendRequest("GET", path.Join("watch", key), "") + return resp, err + } else { + v := url.Values{} + v.Set("index", fmt.Sprintf("%v", sinceIndex)) + resp, err := c.sendRequest("POST", path.Join("watch", key), v.Encode()) + return resp, err + } + +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a3d33a4f16266e063d243991e7c9a071dc76892f --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -0,0 +1,64 @@ +package etcd + +import ( + "fmt" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + c := NewClient(nil) + + go setHelper("bar", c) + + result, err := c.Watch("watch_foo", 0, nil, nil) + + if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + } + + result, err = c.Watch("watch_foo", result.Index, nil, nil) + + if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("bar", c) + + go receiver(ch, stop) + + _, err = c.Watch("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } +} + +func setHelper(value string, c *Client) { + time.Sleep(time.Second) + c.Set("watch_foo/foo", value, 100) +} + +func setLoop(value string, c *Client) { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + newValue := fmt.Sprintf("%s_%v", value, i) + c.Set("watch_foo/foo", newValue, 100) + time.Sleep(time.Second / 10) + } +} + +func receiver(c chan *Response, stop chan bool) { + for i := 0; i < 10; i++ { + <-c + } + stop <- true +}