Skip to content
Snippets Groups Projects
Commit 00cf11ed authored by ale's avatar ale
Browse files

Merge branch 'master' into debian

parents 0f8c3704 15e77d20
Branches
No related tags found
No related merge requests found
......@@ -7,6 +7,8 @@ import (
"encoding/json"
"errors"
"strings"
"sync"
"time"
"github.com/coreos/go-etcd/etcd"
)
......@@ -36,13 +38,50 @@ func mountPath(mountName string) string {
return mountPrefix + mountName[1:]
}
// Cache the list of active nodes.
type nodesCache struct {
ttl time.Duration
nodes []string
deadline time.Time
lock sync.Mutex
}
type getNodesFunc func() ([]string, error)
func newNodesCache() *nodesCache {
return &nodesCache{
ttl: 500 * time.Millisecond,
}
}
// Get returns the cached value of 'fn', if valid. If the value is
// expired and we get an error from 'fn', we will attempt to return
// the previously cached value anyway, along with the error: the
// caller can then pick the right failure behavior.
func (nc *nodesCache) Get(fn getNodesFunc) ([]string, error) {
nc.lock.Lock()
defer nc.lock.Unlock()
var err error
now := time.Now()
if now.After(nc.deadline) {
var nodes []string
if nodes, err = fn(); err == nil {
nc.nodes = nodes
nc.deadline = now.Add(nc.ttl)
}
}
return nc.nodes, err
}
// RadioAPI is the actual API to the streaming cluster's database.
type RadioAPI struct {
client *etcd.Client
client *etcd.Client
activeNodesCache *nodesCache
}
func NewRadioAPI(client *etcd.Client) *RadioAPI {
return &RadioAPI{client}
return &RadioAPI{client, newNodesCache()}
}
// GetMount returns data on a specific mountpoint (returns nil if not
......@@ -97,8 +136,20 @@ func (r *RadioAPI) ListMounts() ([]*Mount, error) {
return result, nil
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
}
return response[0].Value, nil
}
// GetNodes returns the list of active cluster nodes.
func (r *RadioAPI) GetNodes() ([]string, error) {
func (r *RadioAPI) doGetNodes() ([]string, error) {
response, err := r.client.Get(nodePrefix)
if err != nil {
return nil, err
......@@ -110,16 +161,8 @@ func (r *RadioAPI) GetNodes() ([]string, error) {
return result, nil
}
// GetMasterAddr returns the address of the current master server.
func (r *RadioAPI) GetMasterAddr() (string, error) {
response, err := r.client.Get(masterElectionPath)
if err != nil {
return "", err
}
if len(response) < 1 {
return "", errors.New("no active master")
}
return response[0].Value, nil
func (r *RadioAPI) GetNodes() ([]string, error) {
return r.activeNodesCache.Get(r.doGetNodes)
}
// GeneratePassword returns a new random password.
......
......@@ -40,6 +40,7 @@ func main() {
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Printf("starting HTTP server on %s/tcp", httpServer.Addr)
log.Fatal(httpServer.ListenAndServe())
}
......@@ -5,7 +5,6 @@ import (
"log"
"math/rand"
"net"
"strconv"
"strings"
"time"
......@@ -27,9 +26,7 @@ var (
)
type DnsRedirector struct {
client *RadioAPI
nodeCache *activeNodesCache
client *RadioAPI
origin string
originNumParts int
publicIp string
......@@ -42,17 +39,25 @@ func NewDnsRedirector(client *RadioAPI, origin, publicIp string, ttl int) *DnsRe
origin += "."
}
// Create a SOA record for the zone.
serialNo := strconv.FormatInt(time.Now().Unix(), 10)
soaRec := fmt.Sprintf("%s %d IN SOA localhost. hostmaster.%s %s 43200 3600 2419200 %d", origin, ttl, origin, serialNo, ttl)
soa, err := dns.NewRR(soaRec)
if err != nil {
log.Fatalf("Could not generate SOA record: %s", err)
// Create a SOA record for the zone. Some entries will be bogus.
soa := &dns.SOA{
Hdr: dns.RR_Header{
Name: origin,
Rrtype: dns.TypeSOA,
Class: dns.ClassINET,
Ttl: 43200,
},
Ns: "ns." + origin,
Mbox: "hostmaster." + origin,
Serial: uint32(time.Now().Unix()),
Refresh: 43200,
Retry: 3600,
Expire: uint32(ttl),
Minttl: uint32(ttl),
}
return &DnsRedirector{
client: client,
nodeCache: newActiveNodesCache(client),
origin: origin,
originNumParts: len(dns.SplitDomainName(origin)),
publicIp: publicIp,
......@@ -92,16 +97,22 @@ func ednsFromRequest(req, m *dns.Msg) {
}
// Create an A RR for a specific IP.
func recordForIp(name string, ttl int, ip string) *dns.A {
rec := new(dns.A)
rec.Hdr = dns.RR_Header{
Name: name,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: uint32(ttl),
func (d *DnsRedirector) recordForIp(name string, ip string) *dns.A {
var fqdn string
if name == "" {
fqdn = d.origin
} else {
fqdn = name + "." + d.origin
}
return &dns.A{
Hdr: dns.RR_Header{
Name: fqdn,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: uint32(d.ttl),
},
A: net.ParseIP(ip),
}
rec.A = net.ParseIP(ip)
return rec
}
func (d *DnsRedirector) getQuestionName(req *dns.Msg) string {
......@@ -122,7 +133,7 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
}
query := d.getQuestionName(req)
log.Printf("[zone %s] incoming %s %s %d from %s\n", d.origin, query, dns.TypeToString[req.Question[0].Qtype], req.MsgHdr.Id, w.RemoteAddr())
var responseMsg string
switch {
case query == "" && req.Question[0].Qtype == dns.TypeSOA:
......@@ -130,16 +141,18 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
m.Answer = append(m.Answer, d.soa)
responseMsg = "SOA"
case req.Question[0].Qtype == dns.TypeA:
// Return an NXDOMAIN for unknown queries.
if !isValidQuery(query) {
m.SetRcode(req, dns.RcodeNameError)
log.Printf("Query(%s): NXDOMAIN", query)
responseMsg = "NXDOMAIN"
break
}
// Serve all active nodes on every request.
ips := d.nodeCache.GetNodes()
ips, _ := d.client.GetNodes()
if ips == nil || len(ips) == 0 {
// In case of errors retrieving the list of
......@@ -159,12 +172,14 @@ func (d *DnsRedirector) serveDNS(w dns.ResponseWriter, req *dns.Msg) {
m.SetReply(req)
m.MsgHdr.Authoritative = true
for _, ip := range ips {
rec := recordForIp(query, d.ttl, ip)
rec := d.recordForIp(query, ip)
m.Answer = append(m.Answer, rec)
}
log.Printf("Query(%s): %v", query, ips)
responseMsg = fmt.Sprintf("%v", ips)
}
log.Printf("[%d] %s.%s %s (from %s) -> %s", req.MsgHdr.Id, query, d.origin, dns.TypeToString[req.Question[0].Qtype], w.RemoteAddr(), responseMsg)
ednsFromRequest(req, m)
w.WriteMsg(m)
}
......@@ -177,7 +192,7 @@ func (d *DnsRedirector) Run(addr string) {
for _, proto := range []string{"tcp", "udp"} {
go func(proto string) {
server := &dns.Server{Addr: addr, Net: proto}
log.Printf("Starting DNS server on %s/53", proto)
log.Printf("Starting DNS server on %s/%s", addr, proto)
log.Fatal(server.ListenAndServe())
}(proto)
}
......
......@@ -8,41 +8,9 @@ import (
"net/http/httputil"
"strconv"
"strings"
"sync"
"time"
)
// Cache the list of active nodes
type activeNodesCache struct {
client *RadioAPI
nodes []string
deadline time.Time
lock sync.Mutex
}
var activeNodesTtl = 500 * time.Millisecond
func newActiveNodesCache(client *RadioAPI) *activeNodesCache {
return &activeNodesCache{
client: client,
nodes: []string{},
}
}
func (anc *activeNodesCache) GetNodes() []string {
anc.lock.Lock()
defer anc.lock.Unlock()
now := time.Now()
if now.After(anc.deadline) {
if nodes, err := anc.client.GetNodes(); err == nil {
anc.nodes = nodes
anc.deadline = now.Add(activeNodesTtl)
}
}
return anc.nodes
}
// HTTP redirector.
//
// All user-facing traffic reaches the redirector first (this is
......@@ -54,19 +22,17 @@ func (anc *activeNodesCache) GetNodes() []string {
//
type HttpRedirector struct {
client *RadioAPI
nodeCache *activeNodesCache
}
func NewHttpRedirector(client *RadioAPI) *HttpRedirector {
return &HttpRedirector{
client: client,
nodeCache: newActiveNodesCache(client),
}
}
// Return an active node, chosen randomly.
func (h *HttpRedirector) pickActiveNode() string {
nodes := h.nodeCache.GetNodes()
nodes, _ := h.client.GetNodes()
if nodes != nil && len(nodes) > 0 {
return nodes[rand.Intn(len(nodes))]
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment