Commit f66e5116 authored by ale's avatar ale

freeze the 0.1 branch of the etcd client

parent 321fbeeb
......@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/coreos/go-etcd/etcd"
"git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd"
)
var (
......
......@@ -7,7 +7,7 @@ import (
"net"
"strings"
"github.com/coreos/go-etcd/etcd"
"git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd"
)
var (
......
......@@ -4,7 +4,7 @@ import (
"log"
"time"
"github.com/coreos/go-etcd/etcd"
"git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd"
)
const (
......
......@@ -9,7 +9,7 @@ import (
"git.autistici.org/ale/radioai"
"git.autistici.org/ale/radioai/masterelection"
"github.com/coreos/go-etcd/etcd"
"git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd"
)
func trigger(c chan bool) {
......
package etcd
import (
"crypto/tls"
"errors"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strings"
"time"
)
const (
HTTP = iota
HTTPS
)
type Cluster struct {
Leader string
Machines []string
}
type Config struct {
CertFile string
KeyFile string
Scheme string
Timeout time.Duration
}
type Client struct {
cluster Cluster
config Config
httpClient *http.Client
}
// Setup a basic conf and cluster
func NewClient(machines []string) *Client {
// if an empty slice was sent in then just assume localhost
if len(machines) == 0 {
machines = []string{"http://127.0.0.1:4001"}
}
// default leader and machines
cluster := Cluster{
Leader: machines[0],
Machines: machines,
}
config := Config{
// default use http
Scheme: "http",
// default timeout is one second
Timeout: time.Second,
}
tr := &http.Transport{
Dial: dialTimeout,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
return &Client{
cluster: cluster,
config: config,
httpClient: &http.Client{Transport: tr},
}
}
func (c *Client) SetCertAndKey(cert string, key string) (bool, error) {
if cert != "" && key != "" {
tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return false, err
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{tlsCert},
InsecureSkipVerify: true,
},
Dial: dialTimeout,
}
c.httpClient = &http.Client{Transport: tr}
return true, nil
}
return false, errors.New("Require both cert and key path")
}
func (c *Client) SetScheme(scheme int) (bool, error) {
if scheme == HTTP {
c.config.Scheme = "http"
return true, nil
}
if scheme == HTTPS {
c.config.Scheme = "https"
return true, nil
}
return false, errors.New("Unknown Scheme")
}
// Try to sync from the given machine
func (c *Client) SetCluster(machines []string) bool {
success := c.internalSyncCluster(machines)
return success
}
func (c *Client) GetCluster() []string {
return c.cluster.Machines
}
// sycn cluster information using the existing machine list
func (c *Client) SyncCluster() bool {
success := c.internalSyncCluster(c.cluster.Machines)
return success
}
// sync cluster information by providing machine list
func (c *Client) internalSyncCluster(machines []string) bool {
for _, machine := range machines {
httpPath := c.createHttpPath(machine, version+"/machines")
resp, err := c.httpClient.Get(httpPath)
if err != nil {
// try another machine in the cluster
continue
} else {
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
// try another machine in the cluster
continue
}
// update Machines List
c.cluster.Machines = strings.Split(string(b), ", ")
// update leader
// the first one in the machine list is the leader
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
c.cluster.Leader = c.cluster.Machines[0]
logger.Debug("sync.machines ", c.cluster.Machines)
return true
}
}
return false
}
// serverName should contain both hostName and port
func (c *Client) createHttpPath(serverName string, _path string) string {
u, _ := url.Parse(serverName)
u.Path = path.Join(u.Path, "/", _path)
if u.Scheme == "" {
u.Scheme = "http"
}
return u.String()
}
// Dial with timeout.
func dialTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, time.Second)
}
func (c *Client) getHttpPath(s ...string) string {
u, _ := url.Parse(c.cluster.Leader)
u.Path = path.Join(u.Path, "/", version)
for _, seg := range s {
u.Path = path.Join(u.Path, seg)
}
return u.String()
}
func (c *Client) updateLeader(httpPath string) {
u, _ := url.Parse(httpPath)
var leader string
if u.Scheme == "" {
leader = "http://" + u.Host
} else {
leader = u.Scheme + "://" + u.Host
}
logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
c.cluster.Leader = leader
}
// Wrap GET, POST and internal error handling
func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) {
var resp *http.Response
var err error
var req *http.Request
retry := 0
// if we connect to a follower, we will retry until we found a leader
for {
httpPath := c.getHttpPath(_path)
logger.Debug("send.request.to ", httpPath)
if body == "" {
req, _ = http.NewRequest(method, httpPath, nil)
} else {
req, _ = http.NewRequest(method, httpPath, strings.NewReader(body))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
}
resp, err = c.httpClient.Do(req)
logger.Debug("recv.response.from ", httpPath)
// network error, change a machine!
if err != nil {
retry++
if retry > 2*len(c.cluster.Machines) {
return nil, errors.New("Cannot reach servers")
}
num := retry % len(c.cluster.Machines)
logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")
c.cluster.Leader = c.cluster.Machines[num]
time.Sleep(time.Millisecond * 200)
continue
}
if resp != nil {
if resp.StatusCode == http.StatusTemporaryRedirect {
httpPath := resp.Header.Get("Location")
resp.Body.Close()
if httpPath == "" {
return nil, errors.New("Cannot get redirection location")
}
c.updateLeader(httpPath)
logger.Debug("send.redirect")
// try to connect the leader
continue
} else if resp.StatusCode == http.StatusInternalServerError {
resp.Body.Close()
retry++
if retry > 2*len(c.cluster.Machines) {
return nil, errors.New("Cannot reach servers")
}
continue
} else {
logger.Debug("send.return.response ", httpPath)
break
}
}
logger.Debug("error.from ", httpPath, " ", err.Error())
return nil, err
}
return resp, nil
}
package etcd
import (
"fmt"
"testing"
"net/url"
"net"
)
// To pass this test, we need to create a cluster of 3 machines
// The server should be listening on 127.0.0.1:4001, 4002, 4003
func TestSync(t *testing.T) {
fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
c := NewClient(nil)
success := c.SyncCluster()
if !success {
t.Fatal("cannot sync machines")
}
for _, m := range(c.GetCluster()) {
u, err := url.Parse(m)
if err != nil {
t.Fatal(err)
}
if u.Scheme != "http" {
t.Fatal("scheme must be http")
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatal(err)
}
if host != "127.0.0.1" {
t.Fatal("Host must be 127.0.0.1")
}
}
badMachines := []string{"abc", "edef"}
success = c.SetCluster(badMachines)
if success {
t.Fatal("should not sync on bad machines")
}
goodMachines := []string{"127.0.0.1:4002"}
success = c.SetCluster(goodMachines)
if !success {
t.Fatal("cannot sync machines")
} else {
fmt.Println(c.cluster.Machines)
}
}
package etcd
import (
"github.com/coreos/go-log/log"
"os"
)
var logger *log.Logger
func init() {
setLogger(log.PriErr)
}
func OpenDebug() {
setLogger(log.PriDebug)
}
func CloseDebug() {
setLogger(log.PriErr)
}
func setLogger(priority log.Priority) {
logger = log.NewSimple(
log.PriorityFilter(
priority,
log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields)))
}
package etcd
import (
"encoding/json"
"io/ioutil"
"net/http"
"path"
)
func (c *Client) Delete(key string) (*Response, error) {
resp, err := c.sendRequest("DELETE", path.Join("keys", key), "")
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, handleError(b)
}
var result Response
err = json.Unmarshal(b, &result)
if err != nil {
return nil, err
}
return &result, nil
}
package etcd
import (
"testing"
)
func TestDelete(t *testing.T) {
c := NewClient(nil)
c.Set("foo", "bar", 100)
result, err := c.Delete("foo")
if err != nil {
t.Fatal(err)
}
if result.PrevValue != "bar" || result.Value != "" {
t.Fatalf("Delete failed with %s %s", result.PrevValue,
result.Value)
}
}
package etcd
import (
"encoding/json"
"fmt"
)
type EtcdError struct {
ErrorCode int `json:"errorCode"`
Message string `json:"message"`
Cause string `json:"cause,omitempty"`
}
func (e EtcdError) Error() string {
return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause)
}
func handleError(b []byte) error {
var err EtcdError
json.Unmarshal(b, &err)
return err
}
package etcd
import (
"encoding/json"
"io/ioutil"
"net/http"
"path"
)
func (c *Client) Get(key string) ([]*Response, error) {
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
resp, err := c.sendRequest("GET", path.Join("keys", key), "")
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, handleError(b)
}
return convertGetResponse(b)
}
// GetTo gets the value of the key from a given machine address.
// If the given machine is not available it returns an error.
// Mainly use for testing purpose
func (c *Client) GetFrom(key string, addr string) ([]*Response, error) {
httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
resp, err := c.httpClient.Get(httpPath)
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, handleError(b)
}
return convertGetResponse(b)
}
// Convert byte stream to response.
func convertGetResponse(b []byte) ([]*Response, error) {
var results []*Response
var result *Response
err := json.Unmarshal(b, &result)
if err != nil {
err = json.Unmarshal(b, &results)
if err != nil {
return nil, err
}
} else {
results = make([]*Response, 1)
results[0] = result
}
return results, nil
}
package etcd
import (
"testing"
"time"
)
func TestGet(t *testing.T) {
c := NewClient(nil)
c.Set("foo", "bar", 100)
// wait for commit
time.Sleep(100 * time.Millisecond)
results, err := c.Get("foo")
if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
if err != nil {
t.Fatal(err)
}
t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
}
results, err = c.Get("goo")
if err == nil {
t.Fatalf("should not be able to get non-exist key")
}
results, err = c.GetFrom("foo", "0.0.0.0:4001")
if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
if err != nil {
t.Fatal(err)
}
t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
}
results, err = c.GetFrom("foo", "0.0.0.0:4009")
if err == nil {
t.Fatal("should not get from port 4009")
}
}
package etcd
import (
"testing"
"time"
)
func TestList(t *testing.T) {
c := NewClient(nil)
c.Set("foo_list/foo", "bar", 100)
c.Set("foo_list/fooo", "barbar", 100)
c.Set("foo_list/foooo/foo", "barbarbar", 100)
// wait for commit
time.Sleep(time.Second)
_, err := c.Get("foo_list")
if err != nil {
t.Fatal(err)
}
}
package etcd
import (
"time"
)
// The response object from the server.
type Response struct {
Action string `json:"action"`
Key string `json:"key"`
Dir bool `json:"dir,omitempty"`
PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value,omitempty"`
// If the key did not exist before the action,
// this field should be set to true
NewKey bool `json:"newKey,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
// Time to live in second
TTL int64 `json:"ttl,omitempty"`
// The command index of the raft machine when the command is executed
Index uint64 `json:"index"`
}
package etcd
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
)
func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
v := url.Values{}
v.Set("value", value)
if ttl > 0 {
v.Set("ttl", fmt.Sprintf("%v", ttl))
}
resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode())
if err != nil {
return nil, err
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, handleError(b)
}
return convertSetResponse(b)
}
// SetTo sets the value of the key to a given machine address.
// If the given machine is not available or is not leader it returns an error
// Mainly use for testing purpose.
func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*Response, error) {