Commit fa38d911 authored by ale

Add servers for network services

parent 37eb65e6
package main
import (
"context"
"flag"
"log"
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"net/http"
_ "net/http/pprof"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/instrumentation"
"git.autistici.org/ale/autoradio/node"
"git.autistici.org/ale/autoradio/node/icecast"
"git.autistici.org/ale/autoradio/util"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
// Command-line flags and tunables for the node daemon.
//
// NOTE(review): name, publicIPs, bwLimit and maxClients are each declared
// twice in this block (two generations of the flag definitions appear side by
// side); only one declaration of each can remain for the file to compile.
var (
name = flag.String("name", shortHostname(), "Name for this node")
publicIPs = util.IPList("ip", "Public IP for this machine (may be specified more than once). If unset, the program will try to resolve the local hostname, or it will fall back to inspecting network devices.")
internalIPs = util.IPList("internal-ip", "Internal IP for this machine (within the cluster), if different from --ip")
netDev = flag.String("interface", "", "Network interface to monitor for utilization. If unset, default to the interface associated with --ip.")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients")
debugAddr = flag.String("debug-addr", "", "Set to a host:port to enable a HTTP server with debugging information")
name = flag.String("name", shortHostname(), "Name for this node")
publicIPs = util.IPListFlag("public-ip", "Public IP for this machine (may be specified more than once). If unset, the program will try to resolve the local hostname, or it will fall back to inspecting network devices.")
peerIP = util.IPFlag("peer-ip", "Internal IP for this machine (within the cluster), if different from --ip")
// Listening ports for the network services started by the node.
httpPort = flag.Int("http-port", 80, "HTTP port")
dnsPort = flag.Int("dns-port", 53, "DNS port")
gossipPort = flag.Int("gossip-port", 2323, "Gossip GRPC port")
bwLimit = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps), for load-balancing")
maxClients = flag.Int("max-clients", 1000, "Maximum number of connected clients, for load-balancing")
// Comma-separated; split on "," before being handed to the etcd client.
etcdEndpoints = flag.String("etcd", "http://localhost:2379", "Etcd endpoints (comma-separated list of URLs)")
domain = flag.String("domain", "", "public DNS domain")
lbSpec = flag.String("lb-policy", "listeners_available,listeners_score,weighted", "Load balancing rules specification (see godoc documentation for details)")
nameservers = flag.String("nameservers", "", "Comma-separated list of name servers (not IPs) for the zone specified in --domain")
icecastConfigPath = flag.String("icecast-config", "/etc/icecast/icecast.xml", "Icecast configuration file")
icecastAdminPwPath = flag.String("icecast-pwfile", "/etc/icecast/.admin_pw", "Path to file with Icecast admin password")
// TTL passed to concurrency.WithTTL when the etcd session is created.
// Presumably expressed in seconds — confirm against the etcd API.
sessionTTL = 5
)
func shortHostname() string {
......@@ -39,37 +54,81 @@ func main() {
log.SetFlags(0)
flag.Parse()
if err := util.DetectPublicNetworkParams(publicIPs, *internalIPs, netDev); err != nil {
// Sanity check the configuration.
if *name == "" {
log.Fatal("--name must be set")
}
if *domain == "" {
log.Fatal("--domain must be set")
}
if *nameservers == "" {
log.Fatal("--nameservers must be set")
}
var peerIPs []net.IP
if err := util.DetectPublicNetworkParams(publicIPs, peerIPs, nil); err != nil {
log.Fatal(err)
}
if len(*publicIPs) == 0 {
log.Fatal("--public-ip must be set")
}
if *peerIP == nil {
log.Fatal("--peer-ip must be set")
}
instrumentation.NewCounter("radiod.restarts").Incr()
// Create the etcd client and establish the Session that will
// control the lifecycle of the node.
etcd, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(*etcdEndpoints, ","),
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalf("failed to connect to etcd: %v", err)
}
defer etcd.Close()
client := autoradio.NewEtcdClient(true)
bwLimitBytes := float64(*bwLimit * 1000000 / 8)
n := node.NewRadioNode(*name, *publicIPs, *internalIPs, *netDev, bwLimitBytes, *maxClients, client)
session, err := concurrency.NewSession(etcd, concurrency.WithTTL(sessionTTL))
if err != nil {
log.Fatalf("could not establish etcd session: %v", err)
}
// Create a top-level Context that can be canceled when the
// program must terminate. This Context controls the lifetime
// of the Node itself, and all the associated background
// goroutines: canceling it immediately terminates all
// outbound requests.
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Oops, the session is gone, stop everything.
<-session.Done()
cancel()
}()
// Set up a clean shutdown function on SIGTERM.
stopch := make(chan os.Signal)
// Set up a clean shutdown function on SIGTERM that will
// cancel the controlling Context.
sigCh := make(chan os.Signal)
go func() {
<-stopch
log.Printf("terminating...")
n.Stop()
<-sigCh
log.Printf("terminating due to signal...")
cancel()
}()
signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT)
if *debugAddr != "" {
http.Handle("/debug/node", n)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
http.Redirect(w, r, "/debug/node", 302)
}
http.NotFound(w, r)
})
go func() {
http.ListenAndServe(*debugAddr, nil)
}()
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
// Create the Icecast controller.
ice, err := icecast.NewController(ctx, autoradio.IcecastPort, *icecastConfigPath, *icecastAdminPwPath)
if err != nil {
log.Fatalf("could not start Icecast controller: %v", err)
}
// Create and start the local Node.
n, err := node.New(ctx, session, ice, *name, *publicIPs, *peerIP, *gossipPort, *lbSpec)
if err != nil {
log.Fatalf("could not initialize node: %v", err)
}
n.Run()
// Start all the network services.
srv := node.NewServer(n, *domain, strings.Split(*nameservers, ","), *publicIPs, *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort)
defer srv.Stop()
// Just wait for it to complete.
n.Wait()
}
package node
import (
"context"
"math/rand"
"net"
"strings"
......@@ -17,18 +18,19 @@ const (
maxRecords = 5
)
func NewDNSServer(lb *loadBalancer, origin string, nameservers []string, addr string) dns.Handler {
func newDNSHandler(n *Node, origin string, nameservers []string) dns.Handler {
if !strings.HasSuffix(origin, ".") {
origin += "."
}
dnssrv := newDNS(lb, origin, nameservers)
dnsz := newDNSZone(n.lb, origin, nameservers)
mux := dns.NewServeMux()
mux.Handle(origin, dnssrv)
mux.Handle(origin, dnsz)
return mux
}
type dnsServer struct {
// Serve DNS records for our zone.
type dnsZone struct {
lb *loadBalancer
soa *dns.SOA
nameservers []string
......@@ -36,7 +38,7 @@ type dnsServer struct {
originNumParts int
}
func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
func newDNSZone(lb *loadBalancer, origin string, nameservers []string) *dnsZone {
// Create a SOA record for the zone. Some entries will be bogus.
soa := &dns.SOA{
Hdr: dns.RR_Header{
......@@ -54,7 +56,7 @@ func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
Minttl: uint32(zoneTTL),
}
return &dnsServer{
return &dnsZone{
lb: lb,
soa: soa,
nameservers: nameservers,
......@@ -63,7 +65,7 @@ func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
}
}
func (d *dnsServer) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
func (d *dnsZone) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
m := new(dns.Msg)
ednsFromRequest(req, m)
......@@ -124,7 +126,7 @@ nxDomain:
w.WriteMsg(m) //nolint
}
func (d *dnsServer) getNodeIPs(q dns.Question) []net.IP {
func (d *dnsZone) getNodeIPs(q dns.Question) []net.IP {
// Pick all known endpoint IPs, filtering those that match the
// protocol in the DNS request.
var ips []net.IP
......@@ -147,14 +149,14 @@ func (d *dnsServer) getNodeIPs(q dns.Question) []net.IP {
}
// Strip the origin from the query.
func (d *dnsServer) getQuestionName(q dns.Question) string {
// getQuestionName returns the lower-cased name of the question relative to
// the zone: the trailing d.originNumParts labels (the origin) are dropped and
// the remaining labels are joined with dots. A query for the origin itself
// yields the empty string.
func (d *dnsZone) getQuestionName(q dns.Question) string {
	labels := dns.SplitDomainName(q.Name)
	keep := len(labels) - d.originNumParts
	relative := strings.Join(labels[:keep], ".")
	return strings.ToLower(relative)
}
// Add the origin to a query.
func (d *dnsServer) withOrigin(name string) string {
func (d *dnsZone) withOrigin(name string) string {
if name == "" {
return d.origin
}
......@@ -162,7 +164,7 @@ func (d *dnsServer) withOrigin(name string) string {
}
// Create an A resource record.
func (d *dnsServer) newA(name string, ip net.IP) dns.RR {
func (d *dnsZone) newA(name string, ip net.IP) dns.RR {
return &dns.A{
Hdr: dns.RR_Header{
Name: d.withOrigin(name),
......@@ -175,7 +177,7 @@ func (d *dnsServer) newA(name string, ip net.IP) dns.RR {
}
// Create an AAAA resource record.
func (d *dnsServer) newAAAA(name string, ip net.IP) dns.RR {
func (d *dnsZone) newAAAA(name string, ip net.IP) dns.RR {
return &dns.AAAA{
Hdr: dns.RR_Header{
Name: d.withOrigin(name),
......@@ -197,3 +199,26 @@ func ednsFromRequest(req, m *dns.Msg) {
}
}
}
// dnsServer wraps a *dns.Server so that it matches the genericServer
// interface (Serve / Stop).
type dnsServer struct {
	*dns.Server
}

// newDNSServer returns a dnsServer configured to listen on addr with the
// given network protocol and to dispatch requests to h. The server is not
// started; call Serve for that.
func newDNSServer(addr, proto string, h dns.Handler) *dnsServer {
	srv := &dns.Server{
		Addr:    addr,
		Net:     proto,
		Handler: h,
	}
	return &dnsServer{Server: srv}
}

// Serve listens on the configured address and serves requests, returning
// whatever error the underlying ListenAndServe reports.
func (s *dnsServer) Serve() error {
	return s.ListenAndServe()
}

// Stop shuts the server down, waiting at most one second. The shutdown
// error is deliberately ignored.
func (s *dnsServer) Stop() {
	deadline, release := context.WithTimeout(context.Background(), 1*time.Second)
	defer release()
	s.Server.ShutdownContext(deadline) //nolint
}
......@@ -4,13 +4,17 @@ package node
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"git.autistici.org/ale/autoradio"
pb "git.autistici.org/ale/autoradio/proto"
......@@ -18,7 +22,13 @@ import (
assetfs "github.com/elazarl/go-bindata-assetfs"
)
func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Handler {
// Flags controlling how the /debug/ handlers are exposed on the public HTTP
// endpoint.
var (
// When true, the /debug/ handlers are not mounted on the public mux.
disableDebugHandlers = flag.Bool("http-disable-debug", false, "disable HTTP /debug handlers")
// When true, the /debug/ handlers are wrapped with withLocalhost.
restrictDebugHandlers = flag.Bool("http-restrict-debug", false, "restrict access to /debug from localhost only")
)
// Build the HTTP handler for the public HTTP endpoint.
func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler {
mux := http.NewServeMux()
tpl := mustParseEmbeddedTemplates()
......@@ -30,6 +40,18 @@ func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Han
Prefix: "static",
})))
// Serve /debug/ pages using the default HTTP handler
// (packages will automatically register their debug handlers
// there). Using command-line flags it is possible to disable
// the default debug pages, or to restrict access to localhost.
if !*disableDebugHandlers {
var h http.Handler = http.DefaultServeMux
if *restrictDebugHandlers {
h = withLocalhost(h)
}
mux.Handle("/debug/", h)
}
// Requests for /_stream/ go straight to the local Icecast.
proxyHandler := http.StripPrefix(autoradio.IcecastMountPrefix,
withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
......@@ -204,3 +226,49 @@ func mustParseEmbeddedTemplates() *template.Template {
}
return root
}
// withLocalhost wraps h so that only requests whose RemoteAddr is a loopback
// IP address reach it. Anything else — including a RemoteAddr that cannot be
// split into host and port, or whose host is not a literal IP — is answered
// with 403 Forbidden.
func withLocalhost(h http.Handler) http.Handler {
	deny := func(w http.ResponseWriter) {
		http.Error(w, "Forbidden", http.StatusForbidden)
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		host, _, err := net.SplitHostPort(r.RemoteAddr)
		// A nil IP (unparseable host) reports false from IsLoopback, so
		// both failure modes collapse into the same rejection.
		if err != nil || !net.ParseIP(host).IsLoopback() {
			deny(w)
			return
		}
		h.ServeHTTP(w, r)
	})
}
// httpServer wraps an *http.Server so that it matches the genericServer
// interface (Serve / Stop).
type httpServer struct {
	*http.Server
}

// newHTTPServer returns an httpServer bound to addr that dispatches to h,
// with fixed read, read-header, write and idle timeouts. The server is not
// started; call Serve for that.
func newHTTPServer(addr string, h http.Handler) *httpServer {
	srv := &http.Server{
		Addr:              addr,
		Handler:           h,
		ReadTimeout:       10 * time.Second,
		ReadHeaderTimeout: 3 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       30 * time.Second,
	}
	return &httpServer{Server: srv}
}

// Serve listens and serves until the server stops. A stop caused by
// Shutdown (http.ErrServerClosed) is reported as success; any other error is
// returned unchanged.
func (s *httpServer) Serve() error {
	if err := s.ListenAndServe(); err != http.ErrServerClosed {
		return err
	}
	return nil
}

// Stop shuts the server down gracefully, waiting at most two seconds. The
// shutdown error is deliberately ignored.
func (s *httpServer) Stop() {
	deadline, release := context.WithTimeout(context.Background(), 2*time.Second)
	defer release()
	s.Server.Shutdown(deadline) //nolint
}
......@@ -3,23 +3,19 @@ package node
import (
"context"
"log"
"net"
"time"
"git.autistici.org/ale/autoradio"
"git.autistici.org/ale/autoradio/util"
"git.autistici.org/ale/autoradio/client"
"git.autistici.org/ale/autoradio/coordination/election"
"git.autistici.org/ale/autoradio/coordination/presence"
pb "git.autistici.org/ale/autoradio/proto"
"go.etcd.io/etcd/clientv3"
"git.autistici.org/ale/autoradio/util"
"go.etcd.io/etcd/clientv3/concurrency"
)
var (
statusUpdateInterval = 1 * time.Second
sessionTTL = 5 // seconds
)
// statusUpdateInterval is the period handed to util.RunCron for refreshing
// the status manager with this node's current status.
var statusUpdateInterval = 1 * time.Second
// Icecast is the interface used to manage the local Icecast daemon.
type Icecast interface {
......@@ -37,76 +33,23 @@ type Node struct {
mounts client.MountConfig
watcher *election.ElectionWatcher
statusMgr *statusManager
updateCh chan struct{}
ice Icecast
publicPeers *presence.EndpointSet
lb *loadBalancer
ctx context.Context
cancel context.CancelFunc
ctx context.Context
}
// updateIcecastThread consumes n.updateCh and, for every signal received,
// pushes the current mount list and leadership state to Icecast through
// n.ice.Update. Signals are ignored while the election state is unknown.
//
// NOTE(review): elState.Leader.Addrs[0] is dereferenced without the
// nil/empty checks that leaderAddr performs — confirm the watcher guarantees
// a leader with at least one address whenever the state is known.
func (n *Node) updateIcecastThread() {
for range n.updateCh {
elState := n.watcher.State()
if elState.State == election.StateUnknown {
log.Printf("not reloading Icecast because leadership status is unknown")
continue
}
err := n.ice.Update(n.ctx, n.mounts.GetMounts(),
(elState.State == election.StateLeader),
elState.Leader.Addrs[0])
if err != nil {
log.Printf("error reloading Icecast: %v", err)
}
// Pause before handling the next signal.
time.Sleep(3 * time.Second)
}
}
// updateIcecast signals n.updateCh without blocking: if the send cannot
// proceed immediately the signal is dropped.
func (n *Node) updateIcecast() {
select {
case n.updateCh <- struct{}{}:
default:
}
}
// getStatus builds the Status message for this node from the values reported
// by n.ice.GetStatus, stamped with the current UTC time in nanoseconds.
func (n *Node) getStatus() *pb.Status {
iceMounts, iceOk := n.ice.GetStatus()
ns := pb.Status{
Name: n.name,
Timestamp: uint64(time.Now().UTC().UnixNano()),
IcecastOk: iceOk,
IcecastMounts: iceMounts,
// NOTE(review): BwUsage and MaxListeners are fixed literals here.
BwUsage: 0,
MaxListeners: 1000,
}
return &ns
}
// leaderAddr returns the first address of the leader reported by the
// election watcher, or the empty string when there is no leader or the
// leader has no addresses.
func (n *Node) leaderAddr() string {
leader := n.watcher.State().Leader
if leader == nil || len(leader.Addrs) == 0 {
return ""
}
return leader.Addrs[0]
}
// New returns a new Node.
func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast, lbSpec string) (*Node, error) {
// The top-level etcd Session controls the lifetime of the
// ephemeral etcd keys that store presence- and status-related
// data for this Node.
session, err := concurrency.NewSession(cli, concurrency.WithTTL(sessionTTL))
if err != nil {
return nil, err
}
// Create a top-level Context that can be canceled when Stop
// is called. This Context controls the lifetime of the Node
// itself, and all the associated background goroutines:
// canceling it immediately terminates all outbound requests.
ctx, cancel := context.WithCancel(context.Background())
// New returns a new Node with a controlling Context, scoped to an etcd
// Session.
func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, nodeID string, publicAddrs []net.IP, peerAddr net.IP, gossipPort int, lbSpec string) (*Node, error) {
// Create a sub-Context that can be canceled when Stop is called. This
// Context controls the lifetime of the Node itself, and all the
// associated background goroutines: canceling it immediately
// terminates all outbound requests.
ctx, cancel := context.WithCancel(parentCtx)
go func() {
// Oops, the session is gone, stop everything.
<-session.Done()
......@@ -116,15 +59,14 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
n := &Node{
ice: ice,
ctx: ctx,
cancel: cancel,
name: nodeID,
updateCh: make(chan struct{}, 1),
}
// The runtime configuration is just a list of
// mounts. Synchronize it with etcd, wait until it is ready,
// and restart Icecast whenever it changes.
configReady, notify := client.WatchConfig(ctx, cli, &n.mounts)
// The runtime configuration is just a list of mounts. Synchronize it
// with etcd, wait until it is ready, and restart Icecast whenever it
// changes.
configReady, notify := client.WatchConfig(ctx, session.Client(), &n.mounts)
go func() {
for range notify {
n.updateIcecast()
......@@ -132,12 +74,14 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
}()
<-configReady
// Register the Icecast endpoints.
_, err = presence.Register(
// Register the Icecast endpoints. First the gossip service, below
// StatusEndpointPrefix with gossipPort, then the public Icecast
// service.
_, err := presence.Register(
ctx,
session,
nodeID,
presence.NewRegistration(autoradio.StatusEndpointPrefix, peerAddr, autoradio.IcecastPort),
presence.NewRegistration(autoradio.StatusEndpointPrefix, []net.IP{peerAddr}, gossipPort),
presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, autoradio.IcecastPort),
)
if err != nil {
......@@ -146,28 +90,23 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
}
// Watch the authoritative list of peer nodes.
n.publicPeers = presence.WatchEndpoints(ctx, cli, autoradio.PublicEndpointPrefix)
// Run a leader election protocol advertising the peer address
// of our Icecast daemon.
icecastEP, err := pb.ParseEndpoint(nodeID, peerAddr, autoradio.IcecastPort)
if err != nil {
cancel()
return nil, err
}
// We participate in the leader election, but then do nothing
// special once we're the leader, just wait for
// cancellation. All the state transitions are managed by the
// election Watcher, which triggers an Icecast update.
n.publicPeers = presence.WatchEndpoints(ctx, session.Client(), autoradio.PublicEndpointPrefix)
// Run a leader election protocol advertising the peer address of our
// Icecast daemon. We participate in the leader election, but then do
// nothing special once we're the leader, just wait for
// cancellation. All the state transitions are managed by the election
// Watcher, which triggers an Icecast update.
icecastEP := pb.NewEndpointWithIPAndPort(nodeID, []net.IP{peerAddr}, autoradio.IcecastPort)
el := election.New(session, autoradio.IcecastElectionPrefix, icecastEP)
go func() {
err := el.Run(ctx, election.WaitForever)
// If the election runner's outer retry loop
// terminates, we are unable to talk to etcd to
// re-acquire leadership. In this case we should abort
// the Node, to prevent ending up in a state where all
// nodes are running but the cluster has no leader.
// If the election runner's outer retry loop terminates, we
// are unable to talk to etcd to re-acquire leadership. In
// this case we should abort the Node, to prevent ending up in
// a state where all nodes are running but the cluster has no
// leader.
if err != nil && err != context.Canceled {
log.Printf("leader election aborted: %v", err)
}
......@@ -182,19 +121,19 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
}
}()
// Start the background thread that updates Icecast whenever
// something changes.
// Start the background thread that updates Icecast whenever something
// changes.
go n.updateIcecastThread()
// Start the status reporter that periodically sends our
// status to the frontends.
statusMgr := newStatusManager(ctx, cli)
// Start the status reporter that periodically sends our status to the
// frontends.
n.statusMgr = newStatusManager(ctx, session.Client())
go util.RunCron(ctx, statusUpdateInterval, func(_ context.Context) {
statusMgr.update(n.getStatus())
n.statusMgr.update(n.getStatus())
})
// Create the loadBalancer that runs within the node.
n.lb, err = newLoadBalancer(ctx, n.publicPeers, statusMgr, lbSpec)
n.lb, err = newLoadBalancer(ctx, n.publicPeers, n.statusMgr, lbSpec)
if err != nil {
cancel()
return nil, err
......@@ -203,10 +142,61 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
return n, nil
}
// Stop invokes the Node's stored cancel function.
func (n *Node) Stop() {
n.cancel()
}
// Wait blocks until the Node's controlling Context is done.
func (n *Node) Wait() {
	done := n.ctx.Done()
	<-done
}
// updateIcecastThread runs in the background and calls Update on the
// Icecast controller whenever the updateCh channel is triggered.
//
// It returns when the Node's Context is canceled (or updateCh is closed), so
// the goroutine does not outlive the Node. An update is skipped when the
// leadership status is unknown, or when this node is not the leader and the
// leader's address is not available.
func (n *Node) updateIcecastThread() {
	for {
		select {
		case <-n.ctx.Done():
			return
		case _, ok := <-n.updateCh:
			if !ok {
				return
			}
		}

		elState := n.watcher.State()
		if elState.State == election.StateUnknown {
			log.Printf("not reloading Icecast because leadership status is unknown")
			continue
		}

		// The watcher may report a known state without a usable leader
		// endpoint; guard the lookup the same way leaderAddr does rather
		// than indexing blindly and panicking.
		isLeader := elState.State == election.StateLeader
		var addr string
		if l := elState.Leader; l != nil && len(l.Addrs) > 0 {
			addr = l.Addrs[0]
		} else if !isLeader {
			log.Printf("not reloading Icecast because the leader address is unknown")
			continue
		}

		if err := n.ice.Update(n.ctx, n.mounts.GetMounts(), isLeader, addr); err != nil {
			log.Printf("error reloading Icecast: %v", err)
		}

		// Pausing here prevents us from reloading Icecast too quickly
		// when there are consecutive state changes, while still reacting
		// promptly to cancellation.
		select {
		case <-n.ctx.Done():
			return
		case <-time.After(1 * time.Second):
		}
	}
}
// updateIcecast requests an Icecast reload by signalling updateCh. The send
// never blocks: when it cannot proceed immediately the request is dropped.
func (n *Node) updateIcecast() {
	var signal struct{}
	select {
	case n.updateCh <- signal:
		// Reload request queued.
	default:
		// Channel not ready to accept the signal; nothing to do.
	}
}
// getStatus assembles the Status protobuf for this node: its name, the
// current UTC time in nanoseconds, and the mount list and health flag
// reported by the Icecast controller. BwUsage and MaxListeners are fixed
// values.
func (n *Node) getStatus() *pb.Status {
	mounts, healthy := n.ice.GetStatus()
	now := time.Now().UTC().UnixNano()
	return &pb.Status{
		Name:          n.name,
		Timestamp:     uint64(now),
		IcecastOk:     healthy,
		IcecastMounts: mounts,
		BwUsage:       0,
		MaxListeners:  1000,
	}
}
// leaderAddr returns the first address of the current leader as seen by the
// election watcher, or "" when no leader (or no address) is known.
func (n *Node) leaderAddr() string {
	if l := n.watcher.State().Leader; l != nil && len(l.Addrs) > 0 {
		return l.Addrs[0]
	}
	return ""
}
......@@ -4,11 +4,13 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"testing"
"time"
pb "git.autistici.org/ale/autoradio/proto"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/api/v3client"
)
......@@ -34,30 +36,35 @@ func TestNode(t *testing.T) {
<-e.Server.ReadyNotify()
cli := v3client.New(e.Server)
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(2))
ctx, cancel := context.WithCancel(context.Background())
var nodes []*Node
for i := 0; i < 2; i++ {
n, err := New(
cli,
fmt.Sprintf("node%d", i+1),