Skip to content
Snippets Groups Projects
Commit 15e77d20 authored by ale's avatar ale
Browse files

move the node cache into the API; various fixes to the DNS server

parent e2473df0
Branches
No related tags found
No related merge requests found
......@@ -7,6 +7,8 @@ import (
"encoding/json"
"errors"
"strings"
"sync"
"time"
"github.com/coreos/go-etcd/etcd"
)
......@@ -36,13 +38,50 @@ func mountPath(mountName string) string {
return mountPrefix + mountName[1:]
}
// Cache the list of active nodes.
type nodesCache struct {
ttl time.Duration
nodes []string
deadline time.Time
lock sync.Mutex
}
type getNodesFunc func() ([]string, error)
func newNodesCache() *nodesCache {
return &nodesCache{
ttl: 500 * time.Millisecond,
}
}
// Get returns the cached value of 'fn', if valid. If the value is
// expired and we get an error from 'fn', we will attempt to return
// the previously cached value anyway, along with the error: the
// caller can then pick the right failure behavior.
func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) {
nc.lock.Lock()
defer nc.lock.Unlock()
var err error
now := time.Now()
if now.After(nc.deadline) {
var nodes []string
if nodes, err = fn(); err == nil {
nc.nodes = nodes
nc.deadline = now.Add(nc.ttl)
}
}
return nc.nodes, err
}
// RadioAPI is the actual API to the streaming cluster's database.
type RadioAPI struct {
client *etcd.Client
client *etcd.Client
activeNodesCache *nodesCache
}
func NewRadioAPI(client *etcd.Client) *RadioAPI {
return &RadioAPI{client}
return &RadioAPI{client, newNodesCache()}
}
// GetMount returns data on a specific mountpoint (returns nil if not
......@@ -97,8 +136,20 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
return result, nil
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
}
return response[0].Value, nil
}
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) GetNodes() ([]string, error) {
func (r *RadioAPI) doGetNodes() ([]string, error) {
response, err := r.client.Get(nodePrefix)
if err != nil {
return nil, err
......@@ -110,16 +161,8 @@ func (r *RadioAPI) GetNodes() ([]string, error) {
return result, nil
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
}
return response[0].Value, nil
func (r *RadioAPI) GetNodes() ([]string, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
// GeneratePassword returns a new random password.
......
......@@ -40,6 +40,7 @@ func main() {
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("starting HTTP server on %s/tcp", httpServer.Addr)
log.Fatal(httpServer.ListenAndServe())
}
......@@ -5,7 +5,6 @@ import (
"log"
"math/rand"
"net"
"strconv"
"strings"
"time"
......@@ -27,9 +26,7 @@ var (
)
type DnsRedirector struct {
client *RadioAPI
nodeCache *activeNodesCache
client *RadioAPI
origin string
originNumParts int
publicIp string
......@@ -42,17 +39,25 @@ func NewDnsRedirector(client *RadioAPI, origin, publicIp string, ttl int) *DnsRe
origin += "."
}
// Create a SOA record for the zone.
serialNo := strconv.FormatInt(time.Now().Unix(), 10)
soaRec := fmt.Sprintf("%s %d IN SOA localhost. hostmaster.%s %s 43200 3600 2419200 %d", origin, ttl, origin, serialNo, ttl)
soa, err := dns.NewRR(soaRec)
if err != nil {
log.Fatalf("Could not generate SOA record: %s", err)
// Create a SOA record for the zone. Some entries will be bogus.
soa := &dns.SOA{
Hdr: dns.RR_Header{
Name: origin,
Rrtype: dns.TypeSOA,
Class: dns.ClassINET,
Ttl: 43200,
},
Ns: "ns." + origin,
Mbox: "hostmaster." + origin,
Serial: uint32(time.Now().Unix()),
Refresh: 43200,
Retry: 3600,
Expire: uint32(ttl),
Minttl: uint32(ttl),
}
return &DnsRedirector{
client: client,
nodeCache: newActiveNodesCache(client),
origin: origin,
originNumParts: len(dns.SplitDomainName(origin)),
publicIp: publicIp,
......@@ -92,16 +97,22 @@ func ednsFromRequest(req, m *dns.Msg) {
}
// Create an A RR for a specific IP.
func recordForIp(name string, ttl int, ip string) *dns.A {
rec := new(dns.A)
rec.Hdr = dns.RR_Header{
Name: name,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: uint32(ttl),
func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A {
var fqdn string
if name == "" {
fqdn = d.origin
} else {
fqdn = name + "." + d.origin
}
return &dns.A{
Hdr: dns.RR_Header{
Name: fqdn,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: uint32(d.ttl),
},
A: net.ParseIP(ip),
}
rec.A = net.ParseIP(ip)
return rec
}
func (d *DnsRedirector) getQuestionName(req *dns.Msg) string {
......@@ -122,7 +133,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
}
query := d.getQuestionName(req)
log.Printf("[zone %s] incoming %s %s %d from %s\n", d.origin, query, dns.TypeToString[req.Question[0].Qtype], req.MsgHdr.Id, w.RemoteAddr())
var responseMsg string
switch {
case query == "" && req.Question[0].Qtype == dns.TypeSOA:
......@@ -130,16 +141,18 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
m.Answer = append(m.Answer, d.soa)
responseMsg = "SOA"
case req.Question[0].Qtype == dns.TypeA:
// Return an NXDOMAIN for unknown queries.
if !isValidQuery(query) {
m.SetRcode(req, dns.RcodeNameError)
log.Printf("Query(%s): NXDOMAIN", query)
responseMsg = "NXDOMAIN"
break
}
// Serve all active nodes on every request.
ips := d.nodeCache.GetNodes()
ips, _ := d.client.GetNodes()
if ips == nil || len(ips) == 0 {
// In case of errors retrieving the list of
......@@ -159,12 +172,14 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
for _, ip := range ips {
rec := recordForIp(query, d.ttl, ip)
rec := d.recordForIp(query, ip)
m.Answer = append(m.Answer, rec)
}
log.Printf("Query(%s): %v", query, ips)
responseMsg = fmt.Sprintf("%v", ips)
}
log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg)
ednsFromRequest(req, m)
w.WriteMsg(m)
}
......@@ -177,7 +192,7 @@ func (d *DnsRedirector) Run(addr string) {
for _, proto := range []string{"tcp", "udp"} {
go func(proto string) {
server := &dns.Server{Addr: addr, Net: proto}
log.Printf("Starting DNS server on %s/53", proto)
log.Printf("Starting DNS server on %s/%s", addr, proto)
log.Fatal(server.ListenAndServe())
}(proto)
}
......
......@@ -8,41 +8,9 @@ import (
"net/http/httputil"
"strconv"
"strings"
"sync"
"time"
)
// Cache the list of active nodes
type activeNodesCache struct {
client *RadioAPI
nodes []string
deadline time.Time
lock sync.Mutex
}
var activeNodesTtl = 500 * time.Millisecond
func newActiveNodesCache(client *RadioAPI) *activeNodesCache {
return &activeNodesCache{
client: client,
nodes: []string{},
}
}
func (anc *activeNodesCache) GetNodes() []string {
anc.lock.Lock()
defer anc.lock.Unlock()
now := time.Now()
if now.After(anc.deadline) {
if nodes, err := anc.client.GetNodes(); err == nil {
anc.nodes = nodes
anc.deadline = now.Add(activeNodesTtl)
}
}
return anc.nodes
}
// HTTP redirector.
//
// All user-facing traffic reaches the redirector first (this is
......@@ -54,19 +22,17 @@ func (anc *activeNodesCache) GetNodes() []string {
//
type HttpRedirector struct {
client *RadioAPI
nodeCache *activeNodesCache
}
func NewHttpRedirector(client *RadioAPI) *HttpRedirector {
return &HttpRedirector{
client: client,
nodeCache: newActiveNodesCache(client),
}
}
// Return an active node, chosen randomly.
func (h *HttpRedirector) pickActiveNode() string {
nodes := h.nodeCache.GetNodes()
nodes, _ := h.client.GetNodes()
if nodes != nil && len(nodes) > 0 {
return nodes[rand.Intn(len(nodes))]
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment