diff --git a/api.go b/api.go index 305d7c21306fbdcddbec0194e777bb132bf2f1ae..c5661075e1ee7c2ec05259c3ac2c967309c963ff 100644 --- a/api.go +++ b/api.go @@ -7,6 +7,8 @@ import ( "encoding/json" "errors" "strings" + "sync" + "time" "github.com/coreos/go-etcd/etcd" ) @@ -36,13 +38,50 @@ func mountPath(mountName string) string { return mountPrefix + mountName[1:] } +// Cache the list of active nodes. +type nodesCache struct { + ttl time.Duration + nodes []string + deadline time.Time + lock sync.Mutex +} + +type getNodesFunc func() ([]string, error) + +func newNodesCache() *nodesCache { + return &nodesCache{ + ttl: 500 * time.Millisecond, + } +} + +// Get returns the cached value of 'fn', if valid. If the value is +// expired and we get an error from 'fn', we will attempt to return +// the previously cached value anyway, along with the error: the +// caller can then pick the right failure behavior. +func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) { + nc.lock.Lock() + defer nc.lock.Unlock() + + var err error + now := time.Now() + if now.After(nc.deadline) { + var nodes []string + if nodes, err = fn(); err == nil { + nc.nodes = nodes + nc.deadline = now.Add(nc.ttl) + } + } + return nc.nodes, err +} + // RadioAPI is the actual API to the streaming cluster's database. type RadioAPI struct { - client *etcd.Client + client *etcd.Client + activeNodesCache *nodesCache } func NewRadioAPI(client *etcd.Client) *RadioAPI { - return &RadioAPI{client} + return &RadioAPI{client, newNodesCache()} } // GetMount returns data on a specific mountpoint (returns nil if not @@ -97,8 +136,20 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) { return result, nil } +// GetMasterAddr returns the address of the current master server. +func (r *RadioAPI) GetMasterAddr() (string, error) { + response, err := r.client.Get(masterElectionPath) + if err != nil { + return "", err + } + if len(response) < 1 { + return "", errors.New("no active master") + } + return response[0].Value, nil +} + // GetNodes returns the list of active cluster nodes. -func (r *RadioAPI) GetNodes() ([]string, error) { +func (r *RadioAPI) doGetNodes() ([]string, error) { response, err := r.client.Get(nodePrefix) if err != nil { return nil, err @@ -110,16 +161,8 @@ func (r *RadioAPI) GetNodes() ([]string, error) { return result, nil } -// GetMasterAddr returns the address of the current master server. -func (r *RadioAPI) GetMasterAddr() (string, error) { - response, err := r.client.Get(masterElectionPath) - if err != nil { - return "", err - } - if len(response) < 1 { - return "", errors.New("no active master") - } - return response[0].Value, nil +func (r *RadioAPI) GetNodes() ([]string, error) { + return r.activeNodesCache.Get(r.doGetNodes) } // GeneratePassword returns a new random password. diff --git a/cmd/redirectord/redirectord.go b/cmd/redirectord/redirectord.go index bba08a672cc885e93589008b4c5307a996dc5e55..f910ff89a9668a3e07cfff2cedf888660baf3829 100644 --- a/cmd/redirectord/redirectord.go +++ b/cmd/redirectord/redirectord.go @@ -40,6 +40,7 @@ func main() { ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } - + + log.Printf("starting HTTP server on %s/tcp", httpServer.Addr) log.Fatal(httpServer.ListenAndServe()) } diff --git a/dns.go b/dns.go index 2bd5ba1544169cd54886e7ab2d11065b01cb1d10..84c655adddd78f7c4c2cac06b855a114a6e20e1d 100644 --- a/dns.go +++ b/dns.go @@ -5,7 +5,6 @@ import ( "log" "math/rand" "net" - "strconv" "strings" "time" @@ -27,9 +26,7 @@ var ( ) type DnsRedirector struct { - client *RadioAPI - nodeCache *activeNodesCache - + client *RadioAPI origin string originNumParts int publicIp string @@ -42,17 +39,25 @@ func NewDnsRedirector(client *RadioAPI, origin, publicIp string, ttl int) *DnsRe origin += "." } - // Create a SOA record for the zone. - serialNo := strconv.FormatInt(time.Now().Unix(), 10) - soaRec := fmt.Sprintf("%s %d IN SOA localhost. hostmaster.%s %s 43200 3600 2419200 %d", origin, ttl, origin, serialNo, ttl) - soa, err := dns.NewRR(soaRec) - if err != nil { - log.Fatalf("Could not generate SOA record: %s", err) + // Create a SOA record for the zone. Some entries will be bogus. + soa := &dns.SOA{ + Hdr: dns.RR_Header{ + Name: origin, + Rrtype: dns.TypeSOA, + Class: dns.ClassINET, + Ttl: 43200, + }, + Ns: "ns." + origin, + Mbox: "hostmaster." + origin, + Serial: uint32(time.Now().Unix()), + Refresh: 43200, + Retry: 3600, + Expire: uint32(ttl), + Minttl: uint32(ttl), } return &DnsRedirector{ client: client, - nodeCache: newActiveNodesCache(client), origin: origin, originNumParts: len(dns.SplitDomainName(origin)), publicIp: publicIp, @@ -92,16 +97,22 @@ func ednsFromRequest(req, m *dns.Msg) { } // Create an A RR for a specific IP. -func recordForIp(name string, ttl int, ip string) *dns.A { - rec := new(dns.A) - rec.Hdr = dns.RR_Header{ - Name: name, - Rrtype: dns.TypeA, - Class: dns.ClassINET, - Ttl: uint32(ttl), +func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A { + var fqdn string + if name == "" { + fqdn = d.origin + } else { + fqdn = name + "." + d.origin + } + return &dns.A{ + Hdr: dns.RR_Header{ + Name: fqdn, + Rrtype: dns.TypeA, + Class: dns.ClassINET, + Ttl: uint32(d.ttl), + }, + A: net.ParseIP(ip), } - rec.A = net.ParseIP(ip) - return rec } func (d *DnsRedirector) getQuestionName(req *dns.Msg) string { @@ -122,7 +133,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { } query := d.getQuestionName(req) - log.Printf("[zone %s] incoming %s %s %d from %s\n", d.origin, query, dns.TypeToString[req.Question[0].Qtype], req.MsgHdr.Id, w.RemoteAddr()) + var responseMsg string switch { case query == "" && req.Question[0].Qtype == dns.TypeSOA: @@ -130,16 +141,18 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m.SetReply(req) m.MsgHdr.Authoritative = true m.Answer = append(m.Answer, d.soa) + responseMsg = "SOA" case req.Question[0].Qtype == dns.TypeA: // Return an NXDOMAIN for unknown queries. if !isValidQuery(query) { m.SetRcode(req, dns.RcodeNameError) - log.Printf("Query(%s): NXDOMAIN", query) + responseMsg = "NXDOMAIN" + break } // Serve all active nodes on every request. - ips := d.nodeCache.GetNodes() + ips, _ := d.client.GetNodes() if ips == nil || len(ips) == 0 { // In case of errors retrieving the list of @@ -159,12 +172,14 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) { m.SetReply(req) m.MsgHdr.Authoritative = true for _, ip := range ips { - rec := recordForIp(query, d.ttl, ip) + rec := d.recordForIp(query, ip) m.Answer = append(m.Answer, rec) } - log.Printf("Query(%s): %v", query, ips) + responseMsg = fmt.Sprintf("%v", ips) } + log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg) + ednsFromRequest(req, m) w.WriteMsg(m) } @@ -177,7 +192,7 @@ func (d *DnsRedirector) Run(addr string) { for _, proto := range []string{"tcp", "udp"} { go func(proto string) { server := &dns.Server{Addr: addr, Net: proto} - log.Printf("Starting DNS server on %s/53", proto) + log.Printf("Starting DNS server on %s/%s", addr, proto) log.Fatal(server.ListenAndServe()) }(proto) } diff --git a/http.go b/http.go index 9fe3a01cb3813fc0576bf4ba2a76019ed2daedcd..a2986e7e065a330414acb6f8c590f33e46ceb3f9 100644 --- a/http.go +++ b/http.go @@ -8,41 +8,9 @@ import ( "net/http/httputil" "strconv" "strings" - "sync" "time" ) -// Cache the list of active nodes -type activeNodesCache struct { - client *RadioAPI - nodes []string - deadline time.Time - lock sync.Mutex -} - -var activeNodesTtl = 500 * time.Millisecond - -func newActiveNodesCache(client *RadioAPI) *activeNodesCache { - return &activeNodesCache{ - client: client, - nodes: []string{}, - } -} - -func (anc *activeNodesCache) GetNodes() []string { - anc.lock.Lock() - defer anc.lock.Unlock() - - now := time.Now() - if now.After(anc.deadline) { - if nodes, err := anc.client.GetNodes(); err == nil { - anc.nodes = nodes - anc.deadline = now.Add(activeNodesTtl) - } - } - return anc.nodes -} - // HTTP redirector. // // All user-facing traffic reaches the redirector first (this is @@ -54,19 +22,17 @@ func (anc *activeNodesCache) GetNodes() []string { // type HttpRedirector struct { client *RadioAPI - nodeCache *activeNodesCache } func NewHttpRedirector(client *RadioAPI) *HttpRedirector { return &HttpRedirector{ client: client, - nodeCache: newActiveNodesCache(client), } } // Return an active node, chosen randomly. func (h *HttpRedirector) pickActiveNode() string { - nodes := h.nodeCache.GetNodes() + nodes, _ := h.client.GetNodes() if nodes != nil && len(nodes) > 0 { return nodes[rand.Intn(len(nodes))] }