diff --git a/README.rst b/README.rst index bedb9eda484e4539c5d6730211b7573fcb24acb3..198973af6c12dbca6c4c43634002c1d88c85d646 100644 --- a/README.rst +++ b/README.rst @@ -40,6 +40,46 @@ variable to what you've assigned to the cluster. The jobs will automatically start as soon as the configuration is saved. +Full cluster install procedure +++++++++++++++++++++++++++++++ + +This assumes that you have an existing domain name (here +*example.com*) that you control, and that you will run the cluster +under a sub-domain (*radio.example.com*). Follow these steps to +bootstrap a new streaming cluster: + +#. Make sure that, on each of your servers, the output of ``hostname + -f`` is the fully-qualified hostname of the machine, and that it + resolves to its public IP (possibly using ``/etc/hosts``). + +#. On every server, run the above-mentioned steps to set up the APT + repository and install (do not configure) the ``etcd`` and + ``radioai`` packages. + +#. Pick one of your servers and add a delegation for + *radio.example.com* to it. For instance, with ``bind``:: + + radio IN NS 3600 machine1.example.com. + +#. On *machine1*, edit ``/etc/default/etcd`` and set + ``BOOTSTRAP=1``. Once you save the file, the ``etcd`` daemon will + start with an empty database. + +#. On *machine1*, edit ``/etc/default/radioai`` and set + ``DOMAIN=radio.example.com``. This will start the ``radiod`` and + ``redirectord`` daemons, and you will be able to serve DNS records + for the *radio.example.com* zone. Check with:: + + $ ping -c1 radio.example.com + + This should send a ping to *machine1*. + +#. Set up all other machines, setting + ``ETCD_SERVER=etcd.radio.example.com`` in ``/etc/default/etcd`` and + ``DOMAIN=radio.example.com`` in ``/etc/default/radioai``. + + + Securing etcd +++++++++++++ diff --git a/api.go b/api.go index ce3e184927e2d35e985dd8d1d38f5081b4f987aa..5d8e9311488cced157ca3e1ac031ade7bb57dfdc 100644 --- a/api.go +++ b/api.go @@ -10,13 +10,18 @@ import ( "sync" "time" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) var ( MasterElectionPath = "/icecast/cluster/master" MountPrefix = "/icecast/mounts/" NodePrefix = "/icecast/nodes/" + + IcecastPort = 8000 + + ErrIsDirectory = errors.New("key is a directory") + ErrIsFile = errors.New("key is a file") ) // A mountpoint for a stream. @@ -38,7 +43,8 @@ func mountPath(mountName string) string { return MountPrefix + mountName[1:] } -// Cache the list of active nodes. +// Cache the list of active nodes (the front-ends that need to +// retrieve this information continuously, so we limit them to 2qps). type nodesCache struct { ttl time.Duration nodes []string @@ -87,16 +93,16 @@ func NewRadioAPI(client *etcd.Client) *RadioAPI { // GetMount returns data on a specific mountpoint (returns nil if not // found). func (r *RadioAPI) GetMount(mountName string) (*Mount, error) { - response, err := r.client.Get(mountPath(mountName)) + response, err := r.client.Get(mountPath(mountName), false) if err != nil { return nil, err } - if len(response) != 1 { - return nil, nil + if response.Dir { + return nil, ErrIsDirectory } var m Mount - if err := json.NewDecoder(strings.NewReader(response[0].Value)).Decode(&m); err != nil { + if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { return nil, err } return &m, nil @@ -121,14 +127,21 @@ func (r *RadioAPI) DelMount(mountName string) error { // ListMounts returns a list of all the configured mountpoints. func (r *RadioAPI) ListMounts() ([]*Mount, error) { - response, err := r.client.Get(MountPrefix) + response, err := r.client.Get(MountPrefix, false) if err != nil { return nil, err } - result := make([]*Mount, 0, len(response)) - for _, entry := range response { + if !response.Dir { + return nil, ErrIsFile + } + + result := make([]*Mount, 0, len(response.Kvs)) + for _, kv := range response.Kvs { + if kv.Dir { + continue + } var m Mount - if err := json.NewDecoder(strings.NewReader(entry.Value)).Decode(&m); err != nil { + if err := json.NewDecoder(strings.NewReader(kv.Value)).Decode(&m); err != nil { continue } result = append(result, &m) @@ -138,24 +151,27 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) { // GetMasterAddr returns the address of the current master server. func (r *RadioAPI) GetMasterAddr() (string, error) { - response, err := r.client.Get(MasterElectionPath) + response, err := r.client.Get(MasterElectionPath, false) if err != nil { return "", err } - if len(response) < 1 { - return "", errors.New("no active master") + if response.Dir { + return "", ErrIsDirectory } - return response[0].Value, nil + return response.Value, nil } // GetNodes returns the list of active cluster nodes. func (r *RadioAPI) doGetNodes() ([]string, error) { - response, err := r.client.Get(NodePrefix) + response, err := r.client.Get(NodePrefix, false) if err != nil { return nil, err } - result := make([]string, 0, len(response)) - for _, entry := range response { + if !response.Dir { + return nil, ErrIsFile + } + result := make([]string, 0, len(response.Kvs)) + for _, entry := range response.Kvs { result = append(result, entry.Value) } return result, nil diff --git a/cmd/radioctl/radioctl.go b/cmd/radioctl/radioctl.go index e827fba89960f3fd8c8a1c03870578324ed7dab1..3b3f45e005f9d43c4fd92e7efc4ec23cb961d582 100644 --- a/cmd/radioctl/radioctl.go +++ b/cmd/radioctl/radioctl.go @@ -104,9 +104,10 @@ func generateUsername(path string) string { func createMount(args []string) { path := args[0] - if !strings.HasPrefix(path, "/") { - log.Fatal("Mount points should specify a full path") + if strings.Contains(path, "/") { + log.Fatal("Mount points should not contain a slash ('/').") } + path = "/" + path // Check if the mount already exists. client := getClient() diff --git a/debian/changelog b/debian/changelog index a707f98eacf8c90d4034fbbafd2cb9b3ded7e744..e179ded086cbc2129b6ca203c673ef4ccb62f0df 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +radioai (0.2) unstable; urgency=low + + * Update to the etcd 0.2 API. + + -- ale <ale@incal.net> Fri, 15 Nov 2013 20:40:06 +0000 + radioai (0.1) unstable; urgency=low * Initial Release. diff --git a/debian/postinst b/debian/postinst index b09c0f196f8a01591e31c25e629c8042c50a0e74..90ffe8ec336b6804b16f5fb75654b7328be37b42 100644 --- a/debian/postinst +++ b/debian/postinst @@ -25,7 +25,7 @@ create_users() { } create_log_dir() { - logdir=/var/log/radioai + logdir=/var/log for d in radiod redirectord ; do mkdir -p ${logdir}/${d} chown log:log ${logdir}/${d} @@ -39,11 +39,16 @@ activate_services() { done } +set_capabilities() { + setcap cap_net_bind_service=+ep /usr/bin/redirectord +} + case "$1" in configure) create_users create_log_dir + set_capabilities activate_services radiod redirectord ;; diff --git a/debian/prerm b/debian/prerm new file mode 100644 index 0000000000000000000000000000000000000000..987cca8d1f35231ae8091ac606ccbb205888737d --- /dev/null +++ b/debian/prerm @@ -0,0 +1,43 @@ +#!/bin/sh +# prerm script for midas +# +# see: dh_installdeb(1) + +set -e + +. /usr/share/debconf/confmodule + +# summary of how this script can be called: +# * <prerm> `remove' +# * <old-prerm> `upgrade' <new-version> +# * <new-prerm> `failed-upgrade' <old-version> +# * <conflictor's-prerm> `remove' `in-favour' <package> <new-version> +# * <deconfigured's-prerm> `deconfigure' `in-favour' +# <package-being-installed> <version> `removing' +# <conflicting-package> <version> +# for details, see http://www.debian.org/doc/debian-policy/ or +# the debian-policy package + +case "$1" in + remove|upgrade|deconfigure) + # Disable runit services. + /bin/rm -f /etc/service/radiod 2>/dev/null || true + /bin/rm -f /etc/service/redirectord 2>/dev/null || true + ;; + + failed-upgrade) + ;; + + *) + echo "prerm called with unknown argument \`$1'" >&2 + exit 1 + ;; +esac + +# dh_installdeb will replace this with shell code automatically +# generated by other debhelper scripts. + +#DEBHELPER# + +exit 0 + diff --git a/debian/rules b/debian/rules index e85512692b958216ed77decca78205c8fc571ea3..b0031679d8129a2125293b02c1ab2938df463591 100755 --- a/debian/rules +++ b/debian/rules @@ -58,7 +58,7 @@ override_dh_install: install -d -o root -g root $(SVCDIR)/$$f/log ; \ install -m 755 -o root -g root $(CURDIR)/debian/services/$$f \ $(SVCDIR)/$$f/run ; \ - echo '#!/bin/sh\nexec chpst -u log svlogd -tt /var/log/radioai/'$$f > \ + echo '#!/bin/sh\nexec chpst -u log svlogd -tt /var/log/'$$f > \ $(SVCDIR)/$$f/log/run ; \ chmod 0755 $(SVCDIR)/$$f/log/run ; \ done) diff --git a/debian/services/lib.sh b/debian/services/lib.sh index 31a86ca3f3020279e1d5a40c6145724ae3434a7d..32a93e0857846e5dad3c809bbdfb86849e03bac8 100644 --- a/debian/services/lib.sh +++ b/debian/services/lib.sh @@ -10,6 +10,20 @@ if [ -z "${DOMAIN}" ]; then exit 1 fi +resolveall() { + local name="$1" + local port="$2" + getent hosts ${name} | awk "{print \$1 \":${port}\"}" +} + +commaseplist() { + local out= + for arg in "$@" ; do + out="${out}${out:+,}${arg}" + done + echo "${out}" +} + set_public_ip() { # Try to guess the public IP of this host if unset, resolving the # fully-qualified host name. @@ -24,5 +38,6 @@ set_public_ip() { set_etcd_params() { local default_etcd_server="etcd.${DOMAIN}" ETCD_SERVER="${ETCD_SERVER:-${default_etcd_server}}" - ETCD_OPTIONS="${ETCD_OPTIONS} --etcd-server=localhost:4001,${ETCD_SERVER}" + local server_ips=$(commaseplist localhost:4001 $(resolveall ${ETCD_SERVER} 4001)) + ETCD_OPTIONS="${ETCD_OPTIONS} --etcd-servers=${server_ips}" } diff --git a/debian/services/radiod b/debian/services/radiod index 7f171f2ec509059ce0f6f667272f61af4164287c..9c316e5abbcdfd9c54831170b1d5806dd36c52f1 100755 --- a/debian/services/radiod +++ b/debian/services/radiod @@ -6,4 +6,5 @@ set_public_ip set_etcd_params exec chpst -u icecast2 \ - radiod --ip=${PUBLIC_IP} ${ETCD_OPTIONS} ${DAEMON_OPTIONS} + radiod --ip=${PUBLIC_IP} ${ETCD_OPTIONS} ${DAEMON_OPTIONS} \ + 2>&1 diff --git a/debian/services/redirectord b/debian/services/redirectord index 02ed7e747257f7574112e10d562a6f3eae6a050c..795a0b5751a97111c14d09965dfd050e6e57b919 100755 --- a/debian/services/redirectord +++ b/debian/services/redirectord @@ -5,7 +5,7 @@ set_public_ip set_etcd_params -exec setcap cap_net_bind_service=+ep \ - chpst -u nobody \ +exec chpst -u nobody \ redirectord --ip=${PUBLIC_IP} --domain=${DOMAIN} \ - ${ETCD_OPTIONS} ${DAEMON_OPTIONS} + ${ETCD_OPTIONS} ${DAEMON_OPTIONS} \ + 2>&1 diff --git a/etcd_client.go b/etcd_client.go index a6251e7c8edec1def38f9ad4d75f87c350bc8380..2bbc2179cd231b8539e1b363fe17b885f5a137bd 100644 --- a/etcd_client.go +++ b/etcd_client.go @@ -7,7 +7,7 @@ import ( "net" "strings" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) var ( @@ -59,9 +59,10 @@ func NewEtcdClient() *etcd.Client { c := etcd.NewClient(machines) if proto == "https" { c.SetScheme(etcd.HTTPS) - if _, err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil { + if err := c.SetCertAndKey(loadFile(*etcdCertFile), loadFile(*etcdKeyFile)); err != nil { log.Fatal("Error setting up SSL for etcd client: %s", err) } } + c.SetConsistency(etcd.WEAK_CONSISTENCY) return c } diff --git a/fe/dns.go b/fe/dns.go index e52fa49c74564fe8be013ab7b27daf5bfbd9479d..09ea0b84a96f865ba565632474c9d346094e1a97 100644 --- a/fe/dns.go +++ b/fe/dns.go @@ -26,6 +26,7 @@ var ( } ) +// DNS server. type DnsRedirector struct { client *radioai.RadioAPI origin string @@ -35,6 +36,8 @@ type DnsRedirector struct { soa dns.RR } +// NewDnsRedirector returns a DNS server for the given origin and +// publicIp. The A records served will have the specified ttl. func NewDnsRedirector(client *radioai.RadioAPI, origin, publicIp string, ttl int) *DnsRedirector { if !strings.HasSuffix(origin, ".") { origin += "." @@ -116,6 +119,7 @@ func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A { } } +// Strip the origin from the query. func (d *DnsRedirector) getQuestionName(req *dns.Msg) string { lx := dns.SplitDomainName(req.Question[0].Name) ql := lx[0 : len(lx)-d.originNumParts] @@ -177,6 +181,11 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m.Answer = append(m.Answer, rec) } responseMsg = fmt.Sprintf("%v", ips) + + default: + // Return an error for anything else. + m.SetRcode(req, dns.RcodeNameError) + responseMsg = "NXDOMAIN" } log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg) @@ -185,6 +194,8 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { w.WriteMsg(m) } +// Run starts the DNS servers on the given address (both tcp and udp). +// It creates new goroutines and returns immediately. func (d *DnsRedirector) Run(addr string) { dns.HandleFunc(d.origin, func(w dns.ResponseWriter, r *dns.Msg) { d.serveDNS(w, r) diff --git a/fe/doc.go b/fe/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..6be3e4083d51d1ebf4bad1b70a3617710fd4e468 --- /dev/null +++ b/fe/doc.go @@ -0,0 +1,37 @@ +// The front-end ('fe') code has the purpose of directing user traffic +// where we want it: that is, on a node that is alive and (possibly) +// not overloaded. We do this at two different levels, DNS and HTTP, +// with two slightly different targets: the former is focused on +// availability, while the latter attempts to evenly distribute +// resource usage. +// +// DNS is used to provide at least one address of an active server: +// the capability to return multiple results, and the high-ttl nature +// of the service mean that we can simply return all the active nodes +// on every request, maximizing the chances that at least one of them +// will be active over a longer period of time. +// +// HTTP requests are istantaneous, and we can't rely on the client +// doing retries, so we must point the user at a single, active node +// on every request. There are two different policies, depending on +// the type of the request: +// +// - SOURCE requests must always reach the current master node. Since +// redirects tend to confuse streaming sources, which might have very +// simple HTTP implementations, we simply proxy the stream to the +// master node. +// +// - listener requests must be routed to an available relay, taking +// utilization into account (be it in terms of bandwidth, cpu usage, +// or more). The fact that the DNS layer returns multiple addresses +// provides already a very rough form of load balancing, but for +// accurate bandwidth planning we can't just rely on clients' +// cooperation. So when a client requests a stream using its public +// URL we need to serve a redirect to the desired node, computed +// according to the load balancing policy. This is currently done by +// serving a M3U file pointing directly at the target node's icecast +// daemon (but this may lock clients to that specific target node on +// failure... client reconnection policies still need some +// investigation). +// +package fe diff --git a/fe/gzip.go b/fe/gzip.go deleted file mode 100644 index 9513dbb50d05a92cc5d8be72a9a4d3a03e4f7b4a..0000000000000000000000000000000000000000 --- a/fe/gzip.go +++ /dev/null @@ -1,205 +0,0 @@ -package fe - -import ( - "compress/gzip" - "io" - "net/http" - "strings" -) - -// Slightly modified by ale@incal.net, based on: -// https://github.com/PuerkitoBio/ghost - -// Thanks to Andrew Gerrand for inspiration: -// https://groups.google.com/d/msg/golang-nuts/eVnTcMwNVjM/4vYU8id9Q2UJ -// -// Also, node's Connect library implementation of the compress middleware: -// https://github.com/senchalabs/connect/blob/master/lib/middleware/compress.js -// -// And StackOverflow's explanation of Vary: Accept-Encoding header: -// http://stackoverflow.com/questions/7848796/what-does-varyaccept-encoding-mean - -// Internal gzipped writer that satisfies both the (body) writer in gzipped format, -// and maintains the rest of the ResponseWriter interface for header manipulation. -type gzipResponseWriter struct { - io.Writer - http.ResponseWriter - r *http.Request // Keep a hold of the Request, for the filter function - filtered bool // Has the request been run through the filter function? - dogzip bool // Should we do GZIP compression for this request? - filterFn func(http.ResponseWriter, *http.Request) bool -} - -// Make sure the filter function is applied. -func (w *gzipResponseWriter) applyFilter() { - if !w.filtered { - if w.dogzip = w.filterFn(w, w.r); w.dogzip { - setGzipHeaders(w.Header()) - } - w.filtered = true - } -} - -// Unambiguous Write() implementation (otherwise both ResponseWriter and Writer -// want to claim this method). -func (w *gzipResponseWriter) Write(b []byte) (int, error) { - w.applyFilter() - if w.dogzip { - // Write compressed - return w.Writer.Write(b) - } - // Write uncompressed - return w.ResponseWriter.Write(b) -} - -// Intercept the WriteHeader call to correctly set the GZIP headers. -func (w *gzipResponseWriter) WriteHeader(code int) { - w.applyFilter() - w.ResponseWriter.WriteHeader(code) -} - -// Implement WrapWriter interface -func (w *gzipResponseWriter) WrappedWriter() http.ResponseWriter { - return w.ResponseWriter -} - -var ( - defaultFilterTypes = [...]string{ - "text/", - "javascript", - "json", - } -) - -// Default filter to check if the response should be GZIPped. -// By default, all text (html, css, xml, ...), javascript and json -// content types are candidates for GZIP. -func defaultFilter(w http.ResponseWriter, r *http.Request) bool { - hdr := w.Header() - for _, tp := range defaultFilterTypes { - ok := headerMatch(hdr, "Content-Type", tp) - if ok { - return true - } - } - return false -} - -// GZIPHandlerFunc is the same as GZIPHandler, it is just a convenience -// signature that accepts a func(http.ResponseWriter, *http.Request) instead of -// a http.Handler interface. It saves the boilerplate http.HandlerFunc() cast. -func GZIPHandlerFunc(h http.HandlerFunc, filterFn func(http.ResponseWriter, *http.Request) bool) http.HandlerFunc { - return GZIPHandler(h, filterFn) -} - -// Gzip compression HTTP handler. If the client supports it, it compresses the response -// written by the wrapped handler. The filter function is called when the response is about -// to be written to determine if compression should be applied. If this argument is nil, -// the default filter will GZIP only content types containing /json|text|javascript/. -func GZIPHandler(h http.Handler, filterFn func(http.ResponseWriter, *http.Request) bool) http.HandlerFunc { - if filterFn == nil { - filterFn = defaultFilter - } - return func(w http.ResponseWriter, r *http.Request) { - if _, ok := getGzipWriter(w); ok { - // Self-awareness, gzip handler is already set up - h.ServeHTTP(w, r) - return - } - hdr := w.Header() - setVaryHeader(hdr) - - // Do nothing on a HEAD request - if r.Method == "HEAD" { - h.ServeHTTP(w, r) - return - } - if !acceptsGzip(r.Header) { - // No gzip support from the client, return uncompressed - h.ServeHTTP(w, r) - return - } - - // Prepare a gzip response container - gz := gzip.NewWriter(w) - gzw := &gzipResponseWriter{ - Writer: gz, - ResponseWriter: w, - r: r, - filterFn: filterFn, - } - h.ServeHTTP(gzw, r) - // Iff the handler completed successfully (no panic) and GZIP was indeed used, close the gzip writer, - // which seems to generate a Write to the underlying writer. - if gzw.dogzip { - gz.Close() - } - } -} - -// Add the vary by "accept-encoding" header if it is not already set. -func setVaryHeader(hdr http.Header) { - if !headerMatch(hdr, "Vary", "accept-encoding") { - hdr.Add("Vary", "Accept-Encoding") - } -} - -// Checks if the client accepts GZIP-encoded responses. -func acceptsGzip(hdr http.Header) bool { - ok := headerMatch(hdr, "Accept-Encoding", "gzip") - if !ok { - ok = headerEquals(hdr, "Accept-Encoding", "*") - } - return ok -} - -func setGzipHeaders(hdr http.Header) { - // The content-type will be explicitly set somewhere down the path of handlers - hdr.Set("Content-Encoding", "gzip") - hdr.Del("Content-Length") -} - -// Helper function to retrieve the gzip writer. -func getGzipWriter(w http.ResponseWriter) (*gzipResponseWriter, bool) { - gz, ok := GetResponseWriter(w, func(tst http.ResponseWriter) bool { - _, ok := tst.(*gzipResponseWriter) - return ok - }) - if ok { - return gz.(*gzipResponseWriter), true - } - return nil, false -} - -func headerMatch(hdr http.Header, name, s string) bool { - return strings.Contains(hdr.Get(name), s) -} - -func headerEquals(hdr http.Header, name, s string) bool { - return hdr.Get(name) == s -} - -// This interface can be implemented by an augmented ResponseWriter, so that -// it doesn't hide other augmented writers in the chain. -type WrapWriter interface { - http.ResponseWriter - WrappedWriter() http.ResponseWriter -} - -// Helper function to retrieve a specific ResponseWriter. -func GetResponseWriter(w http.ResponseWriter, - predicate func(http.ResponseWriter) bool) (http.ResponseWriter, bool) { - - for { - // Check if this writer is the one we're looking for - if w != nil && predicate(w) { - return w, true - } - // If it is a WrapWriter, move back the chain of wrapped writers - ww, ok := w.(WrapWriter) - if !ok { - return nil, false - } - w = ww.WrappedWriter() - } -} diff --git a/fe/http.go b/fe/http.go index 6fef2209001060b321c098c7a0f68379f4bf93ad..ff0333bbfc98b6ab9e9a0432266351d9b32f2743 100644 --- a/fe/http.go +++ b/fe/http.go @@ -7,25 +7,21 @@ import ( "io" "log" "math/rand" + "net" "net/http" - "net/http/httputil" "path/filepath" "strconv" "strings" + "sync" "time" + _ "net/http/pprof" + "git.autistici.org/ale/radioai" + "github.com/PuerkitoBio/ghost/handlers" ) // HTTP redirector. -// -// All user-facing traffic reaches the redirector first (this is -// where the first-level, high-ttl redirection points to). -// -// The purpose of the HTTP redirector is two-fold: sources will be -// proxied to the master icecast server, while clients will be served -// a .m3u file directly pointing at the relays. -// type HttpRedirector struct { domain string client *radioai.RadioAPI @@ -39,7 +35,9 @@ func NewHttpRedirector(client *radioai.RadioAPI, domain string) *HttpRedirector } } -// Return an active node, chosen randomly. +// Return an active node, chosen randomly (this is currently our load +// balancing policy, since there is no status information about the +// nodes yet). func (h *HttpRedirector) pickActiveNode() string { nodes, _ := h.client.GetNodes() if nodes != nil && len(nodes) > 0 { @@ -57,6 +55,10 @@ func (h *HttpRedirector) getMount(r *http.Request) (*radioai.Mount, error) { return h.client.GetMount(path) } +func makeIcecastUrl(server string) string { + return net.JoinHostPort(server, strconv.Itoa(radioai.IcecastPort)) +} + // Serve a response for a client connection to a relay. func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { mount, err := h.getMount(r) @@ -73,7 +75,7 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { } // Create the m3u response. - m3u := fmt.Sprintf("http://%s%s\n", relayAddr, mount.Name) + m3u := fmt.Sprintf("http://%s%s\n", makeIcecastUrl(relayAddr), mount.Name) w.Header().Set("Content-Length", strconv.Itoa(len(m3u))) w.Header().Set("Content-Type", "audio/x-mpegurl") w.Header().Set("Expires", "-1") @@ -82,8 +84,9 @@ func (h *HttpRedirector) serveRelay(w http.ResponseWriter, r *http.Request) { } func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { - mount, err := h.getMount(r) + _, err := h.getMount(r) if err != nil { + log.Printf("source: error retrieving mount for %+v: %s", r, err) http.Error(w, "Not Found", http.StatusNotFound) return } @@ -91,19 +94,57 @@ func (h *HttpRedirector) serveSource(w http.ResponseWriter, r *http.Request) { // Find the current master node. masterAddr, err := h.client.GetMasterAddr() if err != nil { + log.Printf("source: no master: %s", err) + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + + // Hijack the incoming connection. This is just so that we can + // reset the timeout on the underlying network connection + // (which we have no use for once the stream has been + // established), but then we get to run the two-way proxy... + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + log.Printf("source: hijack failed: %v", err) http.Error(w, err.Error(), http.StatusServiceUnavailable) return } + defer conn.Close() + if err := conn.SetDeadline(time.Time{}); err != nil { + log.Printf("source: could not reset deadline: %v", err) + } - // Proxy the resulting connection. - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL.Host = masterAddr - req.URL.Path = mount.Name - }, - FlushInterval: time.Second, + // Create the upstream connection, and write the original + // request to it as-is (the URL path on the backend is the + // same, and the headers do not need to change). + upstream, err := net.Dial("tcp", makeIcecastUrl(masterAddr)) + if err != nil { + log.Printf("source: dial upstream: %v", err) + return } - proxy.ServeHTTP(w, r) + defer upstream.Close() + if err := r.Write(upstream); err != nil { + log.Printf("source: write upstream request: %v", err) + return + } + + // Start two copiers, one for the source data, one for the + // replies. Wait until both are done. + var wg sync.WaitGroup + wg.Add(2) + go func() { + if _, err := io.Copy(conn, upstream); err != nil { + log.Printf("upstream -> source: Copy: %v", err) + } + wg.Done() + }() + go func() { + if _, err := io.Copy(upstream, conn); err != nil { + log.Printf("source -> upstream: Copy: %v", err) + } + wg.Done() + }() + wg.Wait() } func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) { @@ -120,12 +161,20 @@ func (h *HttpRedirector) serveStatusPage(w http.ResponseWriter, r *http.Request) http.Error(w, err.Error(), http.StatusInternalServerError) return } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + w.Header().Set("Content-Length", strconv.Itoa(buf.Len())) w.Write(buf.Bytes()) } func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "" || r.URL.Path == "/" { - h.serveStatusPage(w, r) + // Serve the status page through a GZIPHandler. Binds + // to h using function closure. + handler := handlers.GZIPHandler( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h.serveStatusPage(w, r) + }), nil) + handler.ServeHTTP(w, r) } else if r.Method == "SOURCE" { h.serveSource(w, r) } else { @@ -133,21 +182,31 @@ func (h *HttpRedirector) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// Run starts the HTTP server on the given addr. Does not return. func (h *HttpRedirector) Run(addr, staticDir, templateDir string) { h.template = template.Must( template.ParseGlob( filepath.Join(templateDir, "*.html"))) + // The purpose of the odd usage of GZIPHandler is to bypass it + // on SOURCE and m3u requests. May not be necessary though. mux := http.NewServeMux() mux.HandleFunc( "/static/", - GZIPHandler( + handlers.GZIPHandler( http.StripPrefix( "/static/", http.FileServer(http.Dir(staticDir))), nil)) mux.Handle("/", h) + // It would be nice to add a logging handler on top of + // everything, but we would become unable to hijack the + // connection on SOURCE requests... + // + //logopts := handlers.NewLogOptions(nil, handlers.Lshort) + //logger := handlers.LogHandler(mux, logopts) + httpServer := &http.Server{ Addr: addr, Handler: mux, diff --git a/masterelection/masterelection.go b/masterelection/masterelection.go index c73a67f1e0e8704c7140f30440d2671024f722b9..eb89748cd7163f43ace6eca550f57c4ad32ec868 100644 --- a/masterelection/masterelection.go +++ b/masterelection/masterelection.go @@ -4,7 +4,7 @@ import ( "log" "time" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) const ( @@ -23,12 +23,11 @@ func stateToString(state int) string { } type MasterElection struct { - client *etcd.Client - stop chan bool + client *etcd.Client + stop chan bool stopped bool Addr string - MasterAddr string Path string TTL uint64 @@ -55,11 +54,21 @@ func (m *MasterElection) IsMaster() bool { return m.State == STATE_MASTER } +func (m *MasterElection) GetMasterAddr() string { + response, err := m.client.Get(m.Path, false) + if err != nil { + return "" + } + return response.Value +} + func (m *MasterElection) setState(state int) { if m.State == state { return } - log.Printf("masterelection: status=%s", stateToString(state)) + log.Printf("masterelection: %s -> %s", + stateToString(m.State), + stateToString(state)) if m.StateChange != nil { m.StateChange <- state } @@ -78,50 +87,100 @@ func (m *MasterElection) stopper() { // Remove the lock file if we are the master. if m.State == STATE_MASTER { + log.Printf("releasing masterelection lock") m.client.Delete(m.Path) } } +func (m *MasterElection) runMaster(index uint64) { + m.setState(STATE_MASTER) + + // If we renew the lease every TTL / N, we allow N renewal + // errors before we stop believing being the master. + ttl := time.Second * time.Duration(m.TTL) + tick := time.NewTicker(ttl / 3) + lastUpdate := time.Now() + + for { + select { + case t := <-tick.C: + // To verify that we actually are still the + // master (not just we believe we are), try + // yet another compare-and-swap to check that + // the stored master address is still our own, + // and no-one stole our lock. If not, the TTL + // will be updated (and the lock renewed). + response, err := m.client.CompareAndSwap(m.Path, m.Addr, m.TTL, m.Addr, index) + if err != nil { + log.Printf("error updating lock: %s", err) + + // If we can't renew the lock for a + // TTL, we must assume we lost it. + if t.Sub(lastUpdate) > ttl { + log.Printf("too many errors, lost lock") + return + } + } + index = response.ModifiedIndex + lastUpdate = t + case <-m.stop: + return + } + } +} + +func (m *MasterElection) runSlave(index uint64) { + m.setState(STATE_SLAVE) + + for { + // Start a watch on the lock, waiting for its removal. + response, err := m.client.Watch(m.Path, index+1, nil, m.stop) + if err != nil { + log.Printf("slave Watch() error: %+v", err) + return + } + + if response.Action == "delete" || response.Action == "expire" { + return + } + + index = response.ModifiedIndex + } + +} + func (m *MasterElection) Run() { go m.stopper() // Start as a slave. m.setState(STATE_SLAVE) - halfttl := time.Second * time.Duration(m.TTL / 2) + var watchIndex uint64 for !m.stopped { + + // Since a failed Create does not return the + // RAFT index, let's optimistically query the lock + // before starting just to set a baseline for the + // index. + if iresponse, err := m.client.Get(m.Path, false); err == nil { + log.Printf("lock already exists: %+v", iresponse) + watchIndex = iresponse.ModifiedIndex + } + // Try to acquire the lock. If we are currently the // master, the previous value should be our own // address, otherwise it should be unset. - prevValue := "" - if m.State == STATE_MASTER { - prevValue = m.Addr - } - resp, ok, err := m.client.TestAndSet(m.Path, prevValue, m.Addr, m.TTL) - if err != nil { - log.Printf("%s: error from etcd: %s", m.Path, err) - time.Sleep(20 * time.Millisecond) - continue - } + response, err := m.client.Create(m.Path, m.Addr, m.TTL) - if ok { + if err == nil { // Howdy, we're the master now. Wait a while // and renew our TTL. - m.setState(STATE_MASTER) - m.MasterAddr = m.Addr - time.Sleep(halfttl) + m.runMaster(response.ModifiedIndex) } else { - // We're not the master. Watch for a DELETE - // (in theory, but we're not actually - // verifying the action type, just waiting for - // the first event...) - m.setState(STATE_SLAVE) - m.MasterAddr = resp.PrevValue - _, err := m.client.Watch(m.Path, resp.Index, nil, nil) - if err != nil { - log.Printf("%s: watch error: %s", m.Path, err) - } + // We're not the master. Wait until the lock + // is deleted or expires. + m.runSlave(watchIndex) } } } diff --git a/node/doc.go b/node/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..17645cdd4e10c472798392fa584a0fdaa306145c --- /dev/null +++ b/node/doc.go @@ -0,0 +1,18 @@ +// The 'node' supervisor is the daemon that controls the Icecast +// server, updating its configuration according to what is stored in +// the distributed database (etcd). +// +// A configuration change will regenerate the Icecast configuration +// file and trigger a reload of the Icecast daemon, which implies that +// the node takes ownership of the entire Icecast config (manual edits +// will be overwritten). +// +// Nodes run a presence protocol (using the database) to discover each +// other and ensure that the full list of active nodes is available to +// clients at any given time. +// +// Nodes also run a master election protocol: a single 'master' server +// is elected among them, which will act as the receiver for the +// source streams; the other nodes will be configured as relays. +// +package node diff --git a/node/icecast.go b/node/icecast.go index 4a1c28ae2b0c8a6d521f308c95baab706769fd98..87c7df8ae4a6dc7484b3a6c9690ab285c96b1ea3 100644 --- a/node/icecast.go +++ b/node/icecast.go @@ -4,6 +4,7 @@ import ( "encoding/xml" "io" "net/http" + "errors" "os" "os/exec" "time" @@ -38,7 +39,7 @@ type IcecastController struct { func NewIcecastController(publicIp string) *IcecastController { return &IcecastController{ PublicIp: publicIp, - ConfigFile: "/etc/icecast2/icecast.conf", + ConfigFile: "/etc/icecast2/icecast.xml", InitScript: "/etc/init.d/icecast2", config: newIcecastConfig(publicIp), stop: make(chan bool, 1), @@ -53,7 +54,12 @@ func (ic *IcecastController) reload() error { return err } +// Update reloads the Icecast daemon with a new configuration. func (ic *IcecastController) Update(conf *ClusterConfig, isMaster bool, masterAddr string) error { + if !isMaster && masterAddr == "" { + return errors.New("unknown system state") + } + tmpf := ic.ConfigFile + ".tmp" defer os.Remove(tmpf) diff --git a/node/icecast_config.go b/node/icecast_config.go index b01b4aafba6448c48c6c5d06974665ae58123d00..ca1958276121540819df5e229e1a4e02c4c1174c 100644 --- a/node/icecast_config.go +++ b/node/icecast_config.go @@ -10,8 +10,7 @@ import ( ) var ( - baseHttpPort = 8000 - shoutHttpPort = 8001 + //shoutHttpPort = 8001 maxClients = 10000 ) @@ -81,6 +80,8 @@ type iceMountConfig struct { OnDisconnect string `xml:"on-disconnect,omitempty"` } +// Configuration of the local Icecast daemon (meant for serialization +// to XML). type icecastConfig struct { XMLName xml.Name Limits iceLimitsConfig `xml:"limits"` @@ -95,6 +96,20 @@ type icecastConfig struct { Mounts []iceMountConfig `xml:"mount"` } +// Create an Icecast configuration suitable for a Debian-based system +// install of the 'icecast2' package. Things to note about the +// generated config: +// +// - It binds to the IcecastPort (defined in api.go) on all +// interfaces. +// +// - Local administration is practically disabled. A random admin +// password is created every time the daemon starts. Same goes for the +// global source password. +// +// Some of the parameters should probably be command-line flags, so +// that it is possible to set them on a per-host basis. +// func defaultDebianConfig(publicIp string) *icecastConfig { // Pick some random passwords on startup. We don't use them, // but icecast is happier if they're set. @@ -107,10 +122,10 @@ func defaultDebianConfig(publicIp string) *icecastConfig { Clients: maxClients, Sources: maxClients / 2, Threadpool: 16, - QueueSize: 2 << 10, + QueueSize: 1 << 20, ClientTimeout: 30, HeaderTimeout: 15, - SourceTimeout: 5, + SourceTimeout: 60, BurstOnConnect: 1, BurstSize: 65535, }, @@ -135,8 +150,8 @@ func defaultDebianConfig(publicIp string) *icecastConfig { }, Security: iceSecurityConfig{0}, Listen: []iceListenConfig{ - {"0.0.0.0", baseHttpPort, 0}, - {"0.0.0.0", shoutHttpPort, 1}, + {"0.0.0.0", radioai.IcecastPort, 0}, + //{"0.0.0.0", shoutHttpPort, 1}, }, } } @@ -145,6 +160,7 @@ func newIcecastConfig(publicIp string) *icecastConfig { return defaultDebianConfig(publicIp) } +// Encode the configuration to XML. func (c *icecastConfig) Encode() ([]byte, error) { var buf bytes.Buffer @@ -159,6 +175,7 @@ func (c *icecastConfig) Encode() ([]byte, error) { return buf.Bytes(), nil } +// EncodeToFile writes the configuration to a file. func (c *icecastConfig) EncodeToFile(path string) error { file, err := os.Create(path) if err != nil { @@ -195,7 +212,7 @@ func mountToRelay(masterAddr string, m *radioai.Mount) iceRelayConfig { Mount: m.Name, LocalMount: m.Name, Server: masterAddr, - Port: baseHttpPort, + Port: radioai.IcecastPort, Username: m.Username, Password: m.Password, OnDemand: 1, @@ -203,6 +220,9 @@ func mountToRelay(masterAddr string, m *radioai.Mount) iceRelayConfig { } } +// Update the configuration with the current list of mounts and +// masterelection state. This will clear the Mounts and Relays fields +// and set them to new values. func (ic *icecastConfig) Update(config *ClusterConfig, isMaster bool, masterAddr string) { ic.Mounts = nil ic.Relays = nil diff --git a/node/node.go b/node/node.go index 30ec6458e0060515ce697ab2af89e2a34200e716..e4cd94886941030b04f757b362db782bedf864fc 100644 --- a/node/node.go +++ b/node/node.go @@ -9,7 +9,7 @@ import ( "git.autistici.org/ale/radioai" "git.autistici.org/ale/radioai/masterelection" - "github.com/coreos/go-etcd/etcd" + "git.autistici.org/ale/radioai/third_party/github.com/coreos/go-etcd/etcd" ) func trigger(c chan bool) { @@ -19,6 +19,12 @@ func trigger(c chan bool) { } } +// Remove mountPrefix from the beginning of the path, but keep the +// leading slash. +func keyToMount(key string) string { + return key[len(radioai.MountPrefix)-1:] +} + // In-memory representation of the overall configuration (basically // just a list of the known mounts). type ClusterConfig struct { @@ -80,34 +86,25 @@ func NewConfigSyncer(client *etcd.Client, config *ClusterConfig, upch chan bool, return &ConfigSyncer{ client: client, config: config, - rch: make(chan *etcd.Response, 100), + rch: make(chan *etcd.Response), upch: upch, stop: stop, } } func (w *ConfigSyncer) syncer() { + log.Printf("starting syncer") for { select { case response := <-w.rch: - // Remove mountPrefix from the beginning of - // the path, but keep the leading slash. - mountName := response.Key[len(radioai.MountPrefix)-1:] - switch response.Action { - case "DELETE": + if response.Action == "delete" { + mountName := keyToMount(response.Key) log.Printf("deleted mount %s", mountName) w.config.delMount(mountName) - case "SET": - log.Printf("update to mount %s: %+v", mountName, response) - var m radioai.Mount - if err := json.NewDecoder(strings.NewReader(response.Value)).Decode(&m); err != nil { - log.Printf("corrupted data: %s", err) - continue - } else { - w.config.setMount(&m) - } - default: + } else if response.Action == "set" || response.Action == "create" || response.Action == "update" { + w.updateConfigWithResponse(response.Key, response.Value) + } else { continue } @@ -115,7 +112,7 @@ func (w *ConfigSyncer) syncer() { // the Watcher dies, it knows where to start // from and we do not have to download the // full configuration again. - w.index = response.Index + w.index = response.ModifiedIndex // Trigger an update. trigger(w.upch) @@ -126,21 +123,31 @@ func (w *ConfigSyncer) syncer() { } } +func (w *ConfigSyncer) updateConfigWithResponse(key, value string) { + mountName := keyToMount(key) + log.Printf("updating mount %s: %s", mountName, value) + var m radioai.Mount + if err := json.NewDecoder(strings.NewReader(value)).Decode(&m); err != nil { + log.Printf("corrupted data: %s: %s", value, err) + } else { + w.config.setMount(&m) + } +} + // Run the ConfigSyncer in the background. It will wait for // initialization to complete, so that when this function returns, the // in-memory configuration has already been fully synchronized. func (w *ConfigSyncer) Run() { - go w.syncer() - // Run until the first successful Get(). + log.Printf("attempting to retrieve initial config...") for { - responses, err := w.client.Get(radioai.MountPrefix) - if err == nil { - // Inject all the replies into the channel. - for _, r := range responses { - w.index = r.Index - w.rch <- r + response, err := w.client.Get(radioai.MountPrefix, false) + if err == nil && response.Dir { + // Directly update the configuration. + for _, r := range response.Kvs { + w.updateConfigWithResponse(r.Key, r.Value) } + w.index = response.ModifiedIndex break } log.Printf("Get error: %s", err) @@ -154,14 +161,16 @@ func (w *ConfigSyncer) Run() { } // Update the icecast daemon now that we have a full config. + log.Printf("triggering initial reload") trigger(w.upch) - // Now start the watcher. + // Now start the watcher, and the syncer. + go w.syncer() go func() { for { curIndex := w.index + 1 log.Printf("starting watcher at index %d", curIndex) - _, err := w.client.Watch(radioai.MountPrefix, curIndex, w.rch, w.stop) + _, err := w.client.WatchAll(radioai.MountPrefix, curIndex, w.rch, w.stop) if err == etcd.ErrWatchStoppedByUser { return } else if err != nil { @@ -173,7 +182,7 @@ func (w *ConfigSyncer) Run() { // An active streaming node, managing the local icecast server. type RadioNode struct { - Config *ClusterConfig + Config *ClusterConfig ip string client *etcd.Client @@ -196,8 +205,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { // MasterElection changes trigger an update. mech := make(chan int) go func() { - for state := range mech { - log.Printf("master election status changed: %d", state) + for _ = range mech { trigger(upch) } }() @@ -206,7 +214,7 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { stopch := make(chan bool) return &RadioNode{ - Config: config, + Config: config, ip: ip, client: client, me: masterelection.NewMasterElection( @@ -227,12 +235,12 @@ func NewRadioNode(ip string, client *etcd.Client) *RadioNode { // The presence goroutine continuously updates our entry in the list // of nodes. func (rc *RadioNode) presence() { - ticker := time.NewTicker(time.Duration(rc.livenessTtl / 2) * time.Second) + ticker := time.NewTicker(time.Duration(rc.livenessTtl/2) * time.Second) for { select { case <-ticker.C: - if _, err := rc.client.Set(radioai.NodePrefix + rc.ip, rc.ip, rc.livenessTtl); err != nil { + if _, err := rc.client.Set(radioai.NodePrefix+rc.ip, rc.ip, rc.livenessTtl); err != nil { log.Printf("presence: Set(): %s", err) } case <-rc.stop: @@ -251,11 +259,19 @@ func (rc *RadioNode) Run() { // Start the presence heartbeat. go rc.presence() + // Start the masterelection runner. + go rc.me.Run() + + // Wait an instant to give a chance to the other services to + // initialize. + time.Sleep(200 * time.Millisecond) + log.Printf("starting icecast updater") for { select { case <-rc.upch: - if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.MasterAddr); err != nil { + log.Printf("reloading icecast config") + if err := rc.icecast.Update(rc.Config, rc.me.IsMaster(), rc.me.GetMasterAddr()); err != nil { log.Printf("Update(): %s", err) } @@ -272,4 +288,7 @@ func (rc *RadioNode) Run() { // Stop everything. func (rc *RadioNode) Stop() { close(rc.stop) + + // We should use WaitGroups here. Instead, wait 2 seconds. + time.Sleep(2 * time.Second) } diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child.go b/third_party/github.com/coreos/go-etcd/etcd/add_child.go new file mode 100644 index 0000000000000000000000000000000000000000..f275599c5a41ce1e3510a673ca7c627b96e9ee7b --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child.go @@ -0,0 +1,11 @@ +package etcd + +// Add a new directory with a random etcd-generated key under the given path. +func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { + return c.post(key, "", ttl) +} + +// Add a new file with a random etcd-generated key under the given path. +func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) { + return c.post(key, value, ttl) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go new file mode 100644 index 0000000000000000000000000000000000000000..efe155467aeec9a3ca904edf351e7fb207260d48 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/add_child_test.go @@ -0,0 +1,73 @@ +package etcd + +import "testing" + +func TestAddChild(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChild("fooDir", "v0", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChild("fooDir", "v1", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (resp.Kvs[0].Value == "v0" && resp.Kvs[1].Value == "v1")) { + t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChild("nonexistentDir", "foo", 5) + if err != nil { + t.Fatal(err) + } +} + +func TestAddChildDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + c.DeleteAll("nonexistentDir") + }() + + c.SetDir("fooDir", 5) + + _, err := c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Kvs) == 2 && (len(resp.Kvs[0].KVPairs) == 0 && len(resp.Kvs[1].KVPairs) == 0)) { + t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChildDir("nonexistentDir", 5) + if err != nil { + t.Fatal(err) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go new file mode 100644 index 0000000000000000000000000000000000000000..63ce6ab7fa67f315462933b33c999f8c05e8f409 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -0,0 +1,349 @@ +package etcd + +import ( + "crypto/tls" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "os" + "path" + "reflect" + "strings" + "time" +) + +const ( + HTTP = iota + HTTPS +) + +// See SetConsistency for how to use these constants. +const ( + // Using strings rather than iota because the consistency level + // could be persisted to disk, so it'd be better to use + // human-readable values. + STRONG_CONSISTENCY = "STRONG" + WEAK_CONSISTENCY = "WEAK" +) + +type Cluster struct { + Leader string `json:"leader"` + Machines []string `json:"machines"` +} + +type Config struct { + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` + Scheme string `json:"scheme"` + Timeout time.Duration `json:"timeout"` + Consistency string `json: "consistency"` +} + +type Client struct { + cluster Cluster `json:"cluster"` + config Config `json:"config"` + httpClient *http.Client + persistence io.Writer +} + +type options map[string]interface{} + +// An internally-used data structure that represents a mapping +// between valid options and their kinds +type validOptions map[string]reflect.Kind + +// NewClient create a basic client that is configured to be used +// with the given machine list. +func NewClient(machines []string) *Client { + // if an empty slice was sent in then just assume localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } + + // default leader and machines + cluster := Cluster{ + Leader: machines[0], + Machines: machines, + } + + config := Config{ + // default use http + Scheme: "http", + // default timeout is one second + Timeout: time.Second, + // default consistency level is STRONG + Consistency: STRONG_CONSISTENCY, + } + + client := &Client{ + cluster: cluster, + config: config, + } + + err := setupHttpClient(client) + if err != nil { + panic(err) + } + + return client +} + +// NewClientFile creates a client from a given file path. +// The given file is expected to use the JSON format. +func NewClientFile(fpath string) (*Client, error) { + fi, err := os.Open(fpath) + if err != nil { + return nil, err + } + defer func() { + if err := fi.Close(); err != nil { + panic(err) + } + }() + + return NewClientReader(fi) +} + +// NewClientReader creates a Client configured from a given reader. +// The config is expected to use the JSON format. +func NewClientReader(reader io.Reader) (*Client, error) { + var client Client + + b, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + err = json.Unmarshal(b, &client) + if err != nil { + return nil, err + } + + err = setupHttpClient(&client) + if err != nil { + return nil, err + } + + return &client, nil +} + +func setupHttpClient(client *Client) error { + if client.config.CertFile != "" && client.config.KeyFile != "" { + err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile) + if err != nil { + return err + } + } else { + client.config.CertFile = "" + client.config.KeyFile = "" + tr := &http.Transport{ + Dial: dialTimeout, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + client.httpClient = &http.Client{Transport: tr} + } + + return nil +} + +// SetPersistence sets a writer to which the config will be +// written every time it's changed. +func (c *Client) SetPersistence(writer io.Writer) { + c.persistence = writer +} + +// SetConsistency changes the consistency level of the client. +// +// When consistency is set to STRONG_CONSISTENCY, all requests, +// including GET, are sent to the leader. This means that, assuming +// the absence of leader failures, GET requests are guranteed to see +// the changes made by previous requests. +// +// When consistency is set to WEAK_CONSISTENCY, other requests +// are still sent to the leader, but GET requests are sent to a +// random server from the server pool. This reduces the read +// load on the leader, but it's not guranteed that the GET requests +// will see changes made by previous requests (they might have not +// yet been commited on non-leader servers). +func (c *Client) SetConsistency(consistency string) error { + if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) { + return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.") + } + c.config.Consistency = consistency + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err + } + + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json: "config"` + Cluster Cluster `json: "cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err + } + + c.cluster = temp.Cluster + c.config = temp.Config + return nil +} + +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } + } + + return nil +} + +func (c *Client) SetCertAndKey(cert string, key string) error { + if cert != "" && key != "" { + tlsCert, err := tls.LoadX509KeyPair(cert, key) + + if err != nil { + return err + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + }, + Dial: dialTimeout, + } + + c.httpClient = &http.Client{Transport: tr} + c.saveConfig() + return nil + } + return errors.New("Require both cert and key path") +} + +func (c *Client) SetScheme(scheme int) error { + if scheme == HTTP { + c.config.Scheme = "http" + c.saveConfig() + return nil + } + if scheme == HTTPS { + c.config.Scheme = "https" + c.saveConfig() + return nil + } + return errors.New("Unknown Scheme") +} + +// SetCluster updates config using the given machine list. +func (c *Client) SetCluster(machines []string) bool { + success := c.internalSyncCluster(machines) + return success +} + +func (c *Client) GetCluster() []string { + return c.cluster.Machines +} + +// SyncCluster updates config using the internal machine list. +func (c *Client) SyncCluster() bool { + success := c.internalSyncCluster(c.cluster.Machines) + return success +} + +// internalSyncCluster syncs cluster information using the given machine list. +func (c *Client) internalSyncCluster(machines []string) bool { + for _, machine := range machines { + httpPath := c.createHttpPath(machine, version+"/machines") + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } else { + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } + + // update Machines List + c.cluster.Machines = strings.Split(string(b), ", ") + + // update leader + // the first one in the machine list is the leader + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) + c.cluster.Leader = c.cluster.Machines[0] + + logger.Debug("sync.machines ", c.cluster.Machines) + c.saveConfig() + return true + } + } + return false +} + +// createHttpPath creates a complete HTTP URL. +// serverName should contain both the host name and a port number, if any. +func (c *Client) createHttpPath(serverName string, _path string) string { + u, _ := url.Parse(serverName) + u.Path = path.Join(u.Path, "/", _path) + + if u.Scheme == "" { + u.Scheme = "http" + } + return u.String() +} + +// Dial with timeout. +func dialTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Second) +} + +func (c *Client) updateLeader(httpPath string) { + u, _ := url.Parse(httpPath) + + var leader string + if u.Scheme == "" { + leader = "http://" + u.Host + } else { + leader = u.Scheme + "://" + u.Host + } + + logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader) + c.cluster.Leader = leader + c.saveConfig() +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client_test.go b/third_party/github.com/coreos/go-etcd/etcd/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b25611b22fdc76d63528ea8fe1d9ce33c72acc94 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/client_test.go @@ -0,0 +1,94 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "net" + "net/url" + "os" + "testing" +) + +// To pass this test, we need to create a cluster of 3 machines +// The server should be listening on 127.0.0.1:4001, 4002, 4003 +func TestSync(t *testing.T) { + fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") + + c := NewClient(nil) + + success := c.SyncCluster() + if !success { + t.Fatal("cannot sync machines") + } + + for _, m := range c.GetCluster() { + u, err := url.Parse(m) + if err != nil { + t.Fatal(err) + } + if u.Scheme != "http" { + t.Fatal("scheme must be http") + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + t.Fatal(err) + } + if host != "127.0.0.1" { + t.Fatal("Host must be 127.0.0.1") + } + } + + badMachines := []string{"abc", "edef"} + + success = c.SetCluster(badMachines) + + if success { + t.Fatal("should not sync on bad machines") + } + + goodMachines := []string{"127.0.0.1:4002"} + + success = c.SetCluster(goodMachines) + + if !success { + t.Fatal("cannot sync machines") + } else { + fmt.Println(c.cluster.Machines) + } + +} + +func TestPersistence(t *testing.T) { + c := NewClient(nil) + c.SyncCluster() + + fo, err := os.Create("config.json") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := fo.Close(); err != nil { + panic(err) + } + }() + + c.SetPersistence(fo) + err = c.saveConfig() + if err != nil { + t.Fatal(err) + } + + c2, err := NewClientFile("config.json") + if err != nil { + t.Fatal(err) + } + + // Verify that the two clients have the same config + b1, _ := json.Marshal(c) + b2, _ := json.Marshal(c2) + + if string(b1) != string(b2) { + t.Fatalf("The two configs should be equal!") + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go new file mode 100644 index 0000000000000000000000000000000000000000..565a03ef1ac9f362a85c5b034fddf2c91268f4e8 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -0,0 +1,18 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*Response, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + return c.put(key, value, ttl, options) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bc452a910cd28e047c954e59e6727b5bf7313321 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go @@ -0,0 +1,51 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndSwap(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + + // This should succeed + resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevValue + resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } + + resp, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.ModifiedIndex) + if err != nil { + t.Fatal(err) + } + if !(resp.Value == "bar2" && resp.PrevValue == "bar" && + resp.Key == "/foo" && resp.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/debug.go b/third_party/github.com/coreos/go-etcd/etcd/debug.go new file mode 100644 index 0000000000000000000000000000000000000000..bd67398813a226905869a6b759b666678d7df070 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/debug.go @@ -0,0 +1,29 @@ +package etcd + +import ( + "github.com/coreos/go-log/log" + "os" +) + +var logger *log.Logger + +func init() { + setLogger(log.PriErr) + // Uncomment the following line if you want to see lots of logs + // OpenDebug() +} + +func OpenDebug() { + setLogger(log.PriDebug) +} + +func CloseDebug() { + setLogger(log.PriErr) +} + +func setLogger(priority log.Priority) { + logger = log.NewSimple( + log.PriorityFilter( + priority, + log.WriterSink(os.Stdout, log.BasicFormat, log.BasicFields))) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete.go b/third_party/github.com/coreos/go-etcd/etcd/delete.go new file mode 100644 index 0000000000000000000000000000000000000000..00348f6ba8858fca1658ec610d7190b9011762c8 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete.go @@ -0,0 +1,17 @@ +package etcd + +// DeleteAll deletes everything under the given key. If the key +// points to a file, the file will be deleted. If the key points +// to a directory, then everything under the directory, include +// all child directories, will be deleted. +func (c *Client) DeleteAll(key string) (*Response, error) { + return c.delete(key, options{ + "recursive": true, + }) +} + +// Delete deletes the given key. If the key points to a +// directory, the method will fail. +func (c *Client) Delete(key string) (*Response, error) { + return c.delete(key, nil) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/delete_test.go b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0f8475a235c7bd2b56141c36f2d56bc05a006b80 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/delete_test.go @@ -0,0 +1,64 @@ +package etcd + +import ( + "testing" +) + +func TestDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + resp, err := c.Delete("foo") + if err != nil { + t.Fatal(err) + } + + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("Delete failed with %s %s", resp.PrevValue, + resp.Value) + } + + resp, err = c.Delete("foo") + if err == nil { + t.Fatalf("Delete should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} + +func TestDeleteAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + c.Set("foo", "bar", 5) + resp, err := c.DeleteAll("foo") + if err != nil { + t.Fatal(err) + } + + if !(resp.PrevValue == "bar" && resp.Value == "") { + t.Fatalf("DeleteAll 1 failed: %#v", resp) + } + + c.SetDir("fooDir", 5) + c.Set("fooDir/foo", "bar", 5) + resp, err = c.DeleteAll("fooDir") + if err != nil { + t.Fatal(err) + } + + if !(resp.PrevValue == "" && resp.Value == "") { + t.Fatalf("DeleteAll 2 failed: %#v", resp) + } + + resp, err = c.DeleteAll("foo") + if err == nil { + t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/error.go b/third_party/github.com/coreos/go-etcd/etcd/error.go new file mode 100644 index 0000000000000000000000000000000000000000..9a3268d607cea8924ca671a6d58f3756da7a4b84 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/error.go @@ -0,0 +1,24 @@ +package etcd + +import ( + "encoding/json" + "fmt" +) + +type EtcdError struct { + ErrorCode int `json:"errorCode"` + Message string `json:"message"` + Cause string `json:"cause,omitempty"` +} + +func (e EtcdError) Error() string { + return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause) +} + +func handleError(b []byte) error { + var err EtcdError + + json.Unmarshal(b, &err) + + return err +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get.go b/third_party/github.com/coreos/go-etcd/etcd/get.go new file mode 100644 index 0000000000000000000000000000000000000000..d42a83c7d93d3a478dc82ac1d038c3916f682414 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get.go @@ -0,0 +1,23 @@ +package etcd + +// GetDir gets the all contents under the given key. +// If the key points to a file, the file is returned. +// If the key points to a directory, everything under it is returnd, +// including all contents under all child directories. +func (c *Client) GetAll(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "recursive": true, + "sorted": sort, + }) +} + +// Get gets the file or directory associated with the given key. +// If the key points to a directory, files and directories under +// it will be returned in sorted or unsorted order, depending on +// the sort flag. Note that contents under child directories +// will not be returned. To get those contents, use GetAll. +func (c *Client) Get(key string, sort bool) (*Response, error) { + return c.get(key, options{ + "sorted": sort, + }) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/get_test.go b/third_party/github.com/coreos/go-etcd/etcd/get_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a34946c7eddb10ca2caedfa5268eef8641e54340 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/get_test.go @@ -0,0 +1,99 @@ +package etcd + +import ( + "reflect" + "testing" +) + +func TestGet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + c.Set("foo", "bar", 5) + + result, err := c.Get("foo", false) + + if err != nil { + t.Fatal(err) + } + + if result.Key != "/foo" || result.Value != "bar" { + t.Fatalf("Get failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + result, err = c.Get("goo", false) + if err == nil { + t.Fatalf("should not be able to get non-exist key") + } +} + +func TestGetAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + c.SetDir("fooDir", 5) + c.Set("fooDir/k0", "v0", 5) + c.Set("fooDir/k1", "v1", 5) + + // Return kv-pairs in sorted order + result, err := c.Get("fooDir", true) + + if err != nil { + t.Fatal(err) + } + + expected := kvPairs{ + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } + + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs, expected) + } + + // Test the `recursive` option + c.SetDir("fooDir/childDir", 5) + c.Set("fooDir/childDir/k2", "v2", 5) + + // Return kv-pairs in sorted order + result, err = c.GetAll("fooDir", true) + + if err != nil { + t.Fatal(err) + } + + expected = kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir", + Dir: true, + KVPairs: kvPairs{ + KeyValuePair{ + Key: "/fooDir/childDir/k2", + Value: "v2", + }, + }, + }, + KeyValuePair{ + Key: "/fooDir/k0", + Value: "v0", + }, + KeyValuePair{ + Key: "/fooDir/k1", + Value: "v1", + }, + } + + if !reflect.DeepEqual(result.Kvs, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Kvs) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/requests.go b/third_party/github.com/coreos/go-etcd/etcd/requests.go new file mode 100644 index 0000000000000000000000000000000000000000..83e3b519ef799b4f3b9974dd4f5263f36b78966f --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/requests.go @@ -0,0 +1,290 @@ +package etcd + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "path" + "reflect" + "strings" + "time" +) + +// Valid options for GET, PUT, POST, DELETE +// Using CAPITALIZED_UNDERSCORE to emphasize that these +// values are meant to be used as constants. +var ( + VALID_GET_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "consistent": reflect.Bool, + "sorted": reflect.Bool, + "wait": reflect.Bool, + "waitIndex": reflect.Uint64, + } + + VALID_PUT_OPTIONS = validOptions{ + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + "prevExist": reflect.Bool, + } + + VALID_POST_OPTIONS = validOptions{} + + VALID_DELETE_OPTIONS = validOptions{ + "recursive": reflect.Bool, + } + + curlChan chan string +) + +// SetCurlChan sets a channel to which cURL commands which can be used to +// re-produce requests are sent. This is useful for debugging. +func SetCurlChan(c chan string) { + curlChan = c +} + +// get issues a GET request +func (c *Client) get(key string, options options) (*Response, error) { + logger.Debugf("get %s [%s]", key, c.cluster.Leader) + + p := path.Join("keys", key) + // If consistency level is set to STRONG, append + // the `consistent` query string. + if c.config.Consistency == STRONG_CONSISTENCY { + options["consistent"] = true + } + if options != nil { + str, err := optionsToString(options, VALID_GET_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("GET", p, url.Values{}) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// put issues a PUT request +func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) { + logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_PUT_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("PUT", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// post issues a POST request +func (c *Client) post(key string, value string, ttl uint64) (*Response, error) { + logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader) + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + resp, err := c.sendRequest("POST", path.Join("keys", key), v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// delete issues a DELETE request +func (c *Client) delete(key string, options options) (*Response, error) { + logger.Debugf("delete %s [%s]", key, c.cluster.Leader) + v := url.Values{} + + p := path.Join("keys", key) + if options != nil { + str, err := optionsToString(options, VALID_DELETE_OPTIONS) + if err != nil { + return nil, err + } + p += str + } + + resp, err := c.sendRequest("DELETE", p, v) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// sendRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) { + var body string = values.Encode() + var resp *http.Response + var req *http.Request + + retry := 0 + // if we connect to a follower, we will retry until we found a leader + for { + var httpPath string + + // If _path has schema already, then it's assumed to be + // a complete URL and therefore needs no further processing. + u, err := url.Parse(_path) + if err != nil { + return nil, err + } + + if u.Scheme != "" { + httpPath = _path + } else { + if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY { + // If it's a GET and consistency level is set to WEAK, + // then use a random machine. + httpPath = c.getHttpPath(true, _path) + } else { + // Else use the leader. + httpPath = c.getHttpPath(false, _path) + } + } + + // Return a cURL command if curlChan is set + if curlChan != nil { + command := fmt.Sprintf("curl -X %s %s", method, httpPath) + for key, value := range values { + command += fmt.Sprintf(" -d %s=%s", key, value[0]) + } + curlChan <- command + } + + logger.Debug("send.request.to ", httpPath, " | method ", method) + if body == "" { + + req, _ = http.NewRequest(method, httpPath, nil) + + } else { + req, _ = http.NewRequest(method, httpPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value") + } + + resp, err = c.httpClient.Do(req) + + logger.Debug("recv.response.from ", httpPath) + // network error, change a machine! + if err != nil { + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + num := retry % len(c.cluster.Machines) + logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]") + c.cluster.Leader = c.cluster.Machines[num] + time.Sleep(time.Millisecond * 200) + continue + } + + if resp != nil { + if resp.StatusCode == http.StatusTemporaryRedirect { + httpPath := resp.Header.Get("Location") + + resp.Body.Close() + + if httpPath == "" { + return nil, errors.New("Cannot get redirection location") + } + + c.updateLeader(httpPath) + logger.Debug("send.redirect") + // try to connect the leader + continue + } else if resp.StatusCode == http.StatusInternalServerError { + resp.Body.Close() + + retry++ + if retry > 2*len(c.cluster.Machines) { + return nil, errors.New("Cannot reach servers") + } + continue + } else { + logger.Debug("send.return.response ", httpPath) + break + } + + } + logger.Debug("error.from ", httpPath, " ", err.Error()) + return nil, err + } + + // Convert HTTP response to etcd response + b, err := ioutil.ReadAll(resp.Body) + + resp.Body.Close() + + if err != nil { + return nil, err + } + + if !(resp.StatusCode == http.StatusOK || + resp.StatusCode == http.StatusCreated) { + return nil, handleError(b) + } + + var result Response + + err = json.Unmarshal(b, &result) + + if err != nil { + return nil, err + } + + return &result, nil +} + +func (c *Client) getHttpPath(random bool, s ...string) string { + var machine string + if random { + machine = c.cluster.Machines[rand.Intn(len(c.cluster.Machines))] + } else { + machine = c.cluster.Leader + } + + fullPath := machine + "/" + version + for _, seg := range s { + fullPath = fullPath + "/" + seg + } + + return fullPath +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/response.go b/third_party/github.com/coreos/go-etcd/etcd/response.go new file mode 100644 index 0000000000000000000000000000000000000000..d05b8f45d1302235a5a7b8844c078da87816ca82 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/response.go @@ -0,0 +1,50 @@ +package etcd + +import ( + "time" +) + +// The response object from the server. +type Response struct { + Action string `json:"action"` + Key string `json:"key"` + Dir bool `json:"dir,omitempty"` + PrevValue string `json:"prevValue,omitempty"` + Value string `json:"value,omitempty"` + Kvs kvPairs `json:"kvs,omitempty"` + + // If the key did not exist before the action, + // this field should be set to true + NewKey bool `json:"newKey,omitempty"` + + Expiration *time.Time `json:"expiration,omitempty"` + + // Time to live in second + TTL int64 `json:"ttl,omitempty"` + + // The command index of the raft machine when the command is executed + ModifiedIndex uint64 `json:"modifiedIndex"` +} + +// When user list a directory, we add all the node into key-value pair slice +type KeyValuePair struct { + Key string `json:"key, omitempty"` + Value string `json:"value,omitempty"` + Dir bool `json:"dir,omitempty"` + KVPairs kvPairs `json:"kvs,omitempty"` +} + +type kvPairs []KeyValuePair + +// interfaces for sorting +func (kvs kvPairs) Len() int { + return len(kvs) +} + +func (kvs kvPairs) Less(i, j int) bool { + return kvs[i].Key < kvs[j].Key +} + +func (kvs kvPairs) Swap(i, j int) { + kvs[i], kvs[j] = kvs[j], kvs[i] +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6d06331095bceef59ee80a44074d623c2e5d31ac --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go @@ -0,0 +1,43 @@ +package etcd + +import ( + "fmt" + "testing" +) + +func TestSetCurlChan(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + curlChan := make(chan string, 1) + SetCurlChan(curlChan) + + _, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5", + c.cluster.Leader) + actual := <-curlChan + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } + + c.SetConsistency(STRONG_CONSISTENCY) + _, err = c.Get("foo", false) + if err != nil { + t.Fatal(err) + } + + expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?consistent=true&sorted=false", + c.cluster.Leader) + actual = <-curlChan + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go new file mode 100644 index 0000000000000000000000000000000000000000..281cd577cf65a993151fb1ed305914c3985630b7 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -0,0 +1,43 @@ +package etcd + +// SetDir sets the given key to a directory. +func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, nil) +} + +// UpdateDir updates the given key to a directory. It succeeds only if the +// given key already exists. +func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": true, + }) +} + +// UpdateDir creates a directory under the given key. It succeeds only if +// the given key does not yet exist. +func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { + return c.put(key, "", ttl, options{ + "prevExist": false, + }) +} + +// Set sets the given key to the given value. +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, nil) +} + +// Update updates the given key to the given value. It succeeds only if the +// given key already exists. +func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": true, + }) +} + +// Create creates a file with the given value under the given key. It succeeds +// only if the given key does not yet exist. +func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { + return c.put(key, value, ttl, options{ + "prevExist": false, + }) +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6f27fdfa6e1c41d27c4ae4862579875c0a13e2ff --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -0,0 +1,183 @@ +package etcd + +import ( + "testing" +) + +func TestSet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + }() + + resp, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + if resp.Key != "/foo" || resp.Value != "bar" || resp.TTL != 5 { + t.Fatalf("Set 1 failed: %#v", resp) + } + + resp, err = c.Set("foo", "bar2", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "bar2" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Set 2 failed: %#v", resp) + } +} + +func TestUpdate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("nonexistent") + }() + + resp, err := c.Set("foo", "bar", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.Update("foo", "wakawaka", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/foo" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("Update 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.Update("nonexistent", "whatever", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("newKey") + }() + + newKey := "/newKey" + newValue := "/newValue" + + // This should succeed + resp, err := c.Create(newKey, newValue, 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == newKey && + resp.Value == newValue && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.Create(newKey, newValue, 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestSetDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("foo") + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/fooDir" && resp.Value == "" && resp.TTL == 5) { + t.Fatalf("SetDir 1 failed: %#v", resp) + } + + // This should fail because /fooDir already points to a directory + resp, err = c.SetDir("/fooDir", 5) + if err == nil { + t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+ + "The response was: %#v", resp) + } + + _, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.SetDir("foo", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/foo" && resp.Value == "" && + resp.PrevValue == "bar" && resp.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } +} + +func TestUpdateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + resp, err := c.SetDir("fooDir", 5) + t.Logf("%#v", resp) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.UpdateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("UpdateDir 1 failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.UpdateDir("nonexistentDir", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} + +func TestCreateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("fooDir") + }() + + // This should succeed + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Key == "/fooDir" && + resp.Value == "" && resp.PrevValue == "" && resp.TTL == 5) { + t.Fatalf("CreateDir 1 failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.CreateDir("fooDir", 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Key, resp) + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/utils.go b/third_party/github.com/coreos/go-etcd/etcd/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..eb2f6046fc53c33c4829d8057a9d96994600fee6 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/utils.go @@ -0,0 +1,33 @@ +// Utility functions + +package etcd + +import ( + "fmt" + "net/url" + "reflect" +) + +// Convert options to a string of HTML parameters +func optionsToString(options options, vops validOptions) (string, error) { + p := "?" + v := url.Values{} + for opKey, opVal := range options { + // Check if the given option is valid (that it exists) + kind := vops[opKey] + if kind == reflect.Invalid { + return "", fmt.Errorf("Invalid option: %v", opKey) + } + + // Check if the given option is of the valid type + t := reflect.TypeOf(opVal) + if kind != t.Kind() { + return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", + opKey, kind, t.Kind()) + } + + v.Set(opKey, fmt.Sprintf("%v", opVal)) + } + p += v.Encode() + return p, nil +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/version.go b/third_party/github.com/coreos/go-etcd/etcd/version.go new file mode 100644 index 0000000000000000000000000000000000000000..b3d05df70bc24bd4388c3ee405dc6977f243bf5a --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/version.go @@ -0,0 +1,3 @@ +package etcd + +const version = "v2" diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch.go b/third_party/github.com/coreos/go-etcd/etcd/watch.go new file mode 100644 index 0000000000000000000000000000000000000000..bbce2039ba97bc82fda0500698e479cdf3f56aa2 --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch.go @@ -0,0 +1,90 @@ +package etcd + +import ( + "errors" +) + +// Errors introduced by the Watch command. +var ( + ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") +) + +// WatchAll returns the first change under the given prefix since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If the prefix points to a directory, any change under it, including all child directories, +// will be returned. +// +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that prefix. +// If a stop channel is given, client can close long-term watch using the stop channel +func (c *Client) WatchAll(prefix string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(prefix, waitIndex, true, receiver, stop) +} + +// Watch returns the first change to the given key since the given index. To +// watch for the latest change, set waitIndex = 0. +// +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +// channel. And after someone receive the channel, it will go on to watch that +// prefix. If a stop channel is given, client can close long-term watch using +// the stop channel +func (c *Client) Watch(key string, waitIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) { + return c.watch(key, waitIndex, false, receiver, stop) +} + +func (c *Client) watch(prefix string, waitIndex uint64, recursive bool, receiver chan *Response, stop chan bool) (*Response, error) { + logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + return c.watchOnce(prefix, waitIndex, recursive, stop) + } else { + for { + resp, err := c.watchOnce(prefix, waitIndex, recursive, stop) + if resp != nil { + waitIndex = resp.ModifiedIndex + 1 + receiver <- resp + } else { + return nil, err + } + } + } + + return nil, nil +} + +// helper func +// return when there is change under the given prefix +func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*Response, error) { + + respChan := make(chan *Response) + errChan := make(chan error) + + go func() { + options := options{ + "wait": true, + } + if waitIndex > 0 { + options["waitIndex"] = waitIndex + } + if recursive { + options["recursive"] = true + } + + resp, err := c.get(key, options) + + if err != nil { + errChan <- err + } + + respChan <- resp + }() + + select { + case resp := <-respChan: + return resp, nil + case err := <-errChan: + return nil, err + case <-stop: + return nil, ErrWatchStoppedByUser + } +} diff --git a/third_party/github.com/coreos/go-etcd/etcd/watch_test.go b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go new file mode 100644 index 0000000000000000000000000000000000000000..10fc2b6b5b230ffe540d830715bcdd9901eb78cb --- /dev/null +++ b/third_party/github.com/coreos/go-etcd/etcd/watch_test.go @@ -0,0 +1,106 @@ +package etcd + +import ( + "fmt" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() + + go setHelper("watch_foo", "bar", c) + + resp, err := c.Watch("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 1 failed: %#v", resp) + } + + go setHelper("watch_foo", "bar", c) + + resp, err = c.Watch("watch_foo", resp.ModifiedIndex, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo" && resp.Value == "bar") { + t.Fatalf("Watch 2 failed: %#v", resp) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("watch_foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.Watch("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } +} + +func TestWatchAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.DeleteAll("watch_foo") + }() + + go setHelper("watch_foo/foo", "bar", c) + + resp, err := c.WatchAll("watch_foo", 0, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 1 failed: %#v", resp) + } + + go setHelper("watch_foo/foo", "bar", c) + + resp, err = c.WatchAll("watch_foo", resp.ModifiedIndex, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Key == "/watch_foo/foo" && resp.Value == "bar") { + t.Fatalf("WatchAll 2 failed: %#v", resp) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("watch_foo/foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.WatchAll("watch_foo", 0, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } +} + +func setHelper(key, value string, c *Client) { + time.Sleep(time.Second) + c.Set(key, value, 100) +} + +func setLoop(key, value string, c *Client) { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + newValue := fmt.Sprintf("%s_%v", value, i) + c.Set(key, newValue, 100) + time.Sleep(time.Second / 10) + } +} + +func receiver(c chan *Response, stop chan bool) { + for i := 0; i < 10; i++ { + <-c + } + stop <- true +}