diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go
index c4443669d519d4288303d3adf3b76810504da76f..580c36109a38c7133a677c81c3cdf7b60dba178c 100644
--- a/cmd/radiod/radiod.go
+++ b/cmd/radiod/radiod.go
@@ -1,30 +1,45 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"log"
+	"net"
 	"os"
 	"os/signal"
 	"strings"
 	"syscall"
+	"time"
 
-	"net/http"
 	_ "net/http/pprof"
 
 	"git.autistici.org/ale/autoradio"
-	"git.autistici.org/ale/autoradio/instrumentation"
 	"git.autistici.org/ale/autoradio/node"
+	"git.autistici.org/ale/autoradio/node/icecast"
 	"git.autistici.org/ale/autoradio/util"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/clientv3/concurrency"
 )
 
 var (
-	name        = flag.String("name", shortHostname(), "Name for this node")
-	publicIPs   = util.IPList("ip", "Public IP for this machine (may be specified more than once). If unset, the program will try to resolve the local hostname, or it will fall back to inspecting network devices.")
-	internalIPs = util.IPList("internal-ip", "Internal IP for this machine (within the cluster), if different from --ip")
-	netDev      = flag.String("interface", "", "Network interface to monitor for utilization. If unset, default to the interface associated with --ip.")
-	bwLimit     = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps)")
-	maxClients  = flag.Int("max-clients", 1000, "Maximum number of connected clients")
-	debugAddr   = flag.String("debug-addr", "", "Set to a host:port to enable a HTTP server with debugging information")
+	name          = flag.String("name", shortHostname(), "Name for this node")
+	publicIPs     = util.IPListFlag("public-ip", "Public IP for this machine (may be specified more than once). If unset, the program will try to resolve the local hostname, or it will fall back to inspecting network devices.")
+	peerIP        = util.IPFlag("peer-ip", "Internal IP for this machine (within the cluster), if different from --public-ip")
+	httpPort      = flag.Int("http-port", 80, "HTTP port")
+	dnsPort       = flag.Int("dns-port", 53, "DNS port")
+	gossipPort    = flag.Int("gossip-port", 2323, "Gossip GRPC port")
+	bwLimit       = flag.Int("bwlimit", 100, "Bandwidth usage limit (Mbps), for load-balancing")
+	maxClients    = flag.Int("max-clients", 1000, "Maximum number of connected clients, for load-balancing")
+	etcdEndpoints = flag.String("etcd", "http://localhost:2379", "Etcd endpoints (comma-separated list of URLs)")
+
+	domain      = flag.String("domain", "", "Public DNS domain")
+	lbSpec      = flag.String("lb-policy", "listeners_available,listeners_score,weighted", "Load balancing rules specification (see godoc documentation for details)")
+	nameservers = flag.String("nameservers", "", "Comma-separated list of name servers (not IPs) for the zone specified in --domain")
+
+	icecastConfigPath  = flag.String("icecast-config", "/etc/icecast/icecast.xml", "Icecast configuration file")
+	icecastAdminPwPath = flag.String("icecast-pwfile", "/etc/icecast/.admin_pw", "Path to file with Icecast admin password")
+
+	sessionTTL = 5
 )
 
 func shortHostname() string {
@@ -39,37 +54,81 @@ func main() {
 	log.SetFlags(0)
 	flag.Parse()
 
-	if err := util.DetectPublicNetworkParams(publicIPs, *internalIPs, netDev); err != nil {
+	// Sanity check the configuration.
+	if *name == "" {
+		log.Fatal("--name must be set")
+	}
+	if *domain == "" {
+		log.Fatal("--domain must be set")
+	}
+	if *nameservers == "" {
+		log.Fatal("--nameservers must be set")
+	}
+	var peerIPs []net.IP
+	if err := util.DetectPublicNetworkParams(publicIPs, peerIPs, nil); err != nil {
 		log.Fatal(err)
 	}
+	if len(*publicIPs) == 0 {
+		log.Fatal("--public-ip must be set")
+	}
+	if *peerIP == nil {
+		log.Fatal("--peer-ip must be set")
+	}
 
-	instrumentation.NewCounter("radiod.restarts").Incr()
+	// Create the etcd client and establish the Session that will
+	// control the lifecycle of the node.
+	etcd, err := clientv3.New(clientv3.Config{
+		Endpoints:   strings.Split(*etcdEndpoints, ","),
+		DialTimeout: 5 * time.Second,
+	})
+	if err != nil {
+		log.Fatalf("failed to connect to etcd: %v", err)
+	}
+	defer etcd.Close()
 
-	client := autoradio.NewEtcdClient(true)
-	bwLimitBytes := float64(*bwLimit * 1000000 / 8)
-	n := node.NewRadioNode(*name, *publicIPs, *internalIPs, *netDev, bwLimitBytes, *maxClients, client)
+	session, err := concurrency.NewSession(etcd, concurrency.WithTTL(sessionTTL))
+	if err != nil {
+		log.Fatalf("could not establish etcd session: %v", err)
+	}
+
+	// Create a top-level Context that can be canceled when the
+	// program must terminate. This Context controls the lifetime
+	// of the Node itself, and all the associated background
+	// goroutines: canceling it immediately terminates all
+	// outbound requests.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		// Oops, the session is gone, stop everything.
+		<-session.Done()
+		cancel()
+	}()
 
-	// Set up a clean shutdown function on SIGTERM.
-	stopch := make(chan os.Signal)
+	// Set up a clean shutdown function on SIGTERM that will
+	// cancel the controlling Context.
+	sigCh := make(chan os.Signal, 1)
 	go func() {
-		<-stopch
-		log.Printf("terminating...")
-		n.Stop()
+		<-sigCh
+		log.Printf("terminating due to signal...")
+		cancel()
 	}()
-	signal.Notify(stopch, syscall.SIGTERM, syscall.SIGINT)
-
-	if *debugAddr != "" {
-		http.Handle("/debug/node", n)
-		http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-			if r.URL.Path == "/" {
-				http.Redirect(w, r, "/debug/node", 302)
-			}
-			http.NotFound(w, r)
-		})
-		go func() {
-			http.ListenAndServe(*debugAddr, nil)
-		}()
+	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
+
+	// Create the Icecast controller.
+	ice, err := icecast.NewController(ctx, autoradio.IcecastPort, *icecastConfigPath, *icecastAdminPwPath)
+	if err != nil {
+		log.Fatalf("could not start Icecast controller: %v", err)
+	}
+
+	// Create and start the local Node.
+	n, err := node.New(ctx, session, ice, *name, *publicIPs, *peerIP, *gossipPort, *lbSpec)
+	if err != nil {
+		log.Fatalf("could not initialize node: %v", err)
 	}
 
-	n.Run()
+	// Start all the network services.
+	srv := node.NewServer(n, *domain, strings.Split(*nameservers, ","), *publicIPs, *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort)
+	defer srv.Stop()
+
+	// Just wait for it to complete.
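+	// (Wait returns once the controlling Context is canceled, either by a
+	// termination signal or because the etcd session expired.)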
+	n.Wait()
 }
diff --git a/node/dns.go b/node/dns.go
index c9bb8bbca540e9a9169d55a3fed618d1e5d12799..b1f543f2c76cf4820d5e232c79bf832b4fd9be93 100644
--- a/node/dns.go
+++ b/node/dns.go
@@ -1,6 +1,7 @@
 package node
 
 import (
+	"context"
 	"math/rand"
 	"net"
 	"strings"
@@ -17,18 +18,19 @@ const (
 	maxRecords = 5
 )
 
-func NewDNSServer(lb *loadBalancer, origin string, nameservers []string, addr string) dns.Handler {
+func newDNSHandler(n *Node, origin string, nameservers []string) dns.Handler {
 	if !strings.HasSuffix(origin, ".") {
 		origin += "."
 	}
 
-	dnssrv := newDNS(lb, origin, nameservers)
+	dnsz := newDNSZone(n.lb, origin, nameservers)
 	mux := dns.NewServeMux()
-	mux.Handle(origin, dnssrv)
+	mux.Handle(origin, dnsz)
 	return mux
 }
 
-type dnsServer struct {
+// Serve DNS records for our zone.
+type dnsZone struct {
 	lb             *loadBalancer
 	soa            *dns.SOA
 	nameservers    []string
@@ -36,7 +38,7 @@ type dnsServer struct {
 	originNumParts int
 }
 
-func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
+func newDNSZone(lb *loadBalancer, origin string, nameservers []string) *dnsZone {
 	// Create a SOA record for the zone. Some entries will be bogus.
 	soa := &dns.SOA{
 		Hdr: dns.RR_Header{
@@ -54,7 +56,7 @@ func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
 		Minttl: uint32(zoneTTL),
 	}
 
-	return &dnsServer{
+	return &dnsZone{
 		lb:          lb,
 		soa:         soa,
 		nameservers: nameservers,
@@ -63,7 +65,7 @@ func newDNS(lb *loadBalancer, origin string, nameservers []string) *dnsServer {
 	}
 }
 
-func (d *dnsServer) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
+func (d *dnsZone) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
 	m := new(dns.Msg)
 	ednsFromRequest(req, m)
 
@@ -124,7 +126,7 @@ nxDomain:
 	w.WriteMsg(m) //nolint
 }
 
-func (d *dnsServer) getNodeIPs(q dns.Question) []net.IP {
+func (d *dnsZone) getNodeIPs(q dns.Question) []net.IP {
 	// Pick all known endpoint IPs, filtering those that match the
 	// protocol in the DNS request.
 	var ips []net.IP
@@ -147,14 +149,14 @@
 }
 
 // Strip the origin from the query.
-func (d *dnsServer) getQuestionName(q dns.Question) string {
+func (d *dnsZone) getQuestionName(q dns.Question) string {
 	lx := dns.SplitDomainName(q.Name)
 	ql := lx[0 : len(lx)-d.originNumParts]
 	return strings.ToLower(strings.Join(ql, "."))
 }
 
 // Add the origin to a query.
-func (d *dnsServer) withOrigin(name string) string {
+func (d *dnsZone) withOrigin(name string) string {
 	if name == "" {
 		return d.origin
 	}
@@ -162,7 +164,7 @@
 }
 
 // Create an A resource record.
-func (d *dnsServer) newA(name string, ip net.IP) dns.RR {
+func (d *dnsZone) newA(name string, ip net.IP) dns.RR {
 	return &dns.A{
 		Hdr: dns.RR_Header{
 			Name: d.withOrigin(name),
@@ -175,7 +177,7 @@
 }
 
 // Create an AAAA resource record.
-func (d *dnsServer) newAAAA(name string, ip net.IP) dns.RR {
+func (d *dnsZone) newAAAA(name string, ip net.IP) dns.RR {
 	return &dns.AAAA{
 		Hdr: dns.RR_Header{
 			Name: d.withOrigin(name),
@@ -197,3 +199,26 @@ func ednsFromRequest(req, m *dns.Msg) {
 		}
 	}
 }
+
+// Wrapper to make the dns.Server match the genericServer interface.
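+// Stop gives in-flight queries a short grace period before shutting the
+// listener down.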
+type dnsServer struct {
+	*dns.Server
+}
+
+func newDNSServer(addr, proto string, h dns.Handler) *dnsServer {
+	return &dnsServer{&dns.Server{
+		Addr:    addr,
+		Net:     proto,
+		Handler: h,
+	}}
+}
+
+func (s *dnsServer) Serve() error {
+	return s.Server.ListenAndServe()
+}
+
+func (s *dnsServer) Stop() {
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	s.Server.ShutdownContext(ctx) //nolint
+	cancel()
+}
diff --git a/node/http.go b/node/http.go
index 95861ffdf2a576ec45901f440ba06b69ac0f9e5b..07bdc169d941bf50ecdf56d40c62f92732960906 100644
--- a/node/http.go
+++ b/node/http.go
@@ -4,13 +4,17 @@ package node
 import (
 	"bytes"
+	"context"
+	"flag"
 	"fmt"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
+	"time"
 
 	"git.autistici.org/ale/autoradio"
 	pb "git.autistici.org/ale/autoradio/proto"
@@ -18,7 +22,13 @@ import (
 	assetfs "github.com/elazarl/go-bindata-assetfs"
 )
 
-func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Handler {
+var (
+	disableDebugHandlers  = flag.Bool("http-disable-debug", false, "disable HTTP /debug handlers")
+	restrictDebugHandlers = flag.Bool("http-restrict-debug", false, "restrict access to /debug from localhost only")
+)
+
+// Build the HTTP handler for the public HTTP endpoint.
+func newHTTPHandler(n *Node, icecastPort int, domain string) http.Handler {
 	mux := http.NewServeMux()
 
 	tpl := mustParseEmbeddedTemplates()
@@ -30,6 +40,18 @@ func NewHTTP(n *Node, lb *loadBalancer, icecastPort int, domain string) http.Han
 		Prefix: "static",
 	})))
 
+	// Serve /debug/ pages using the default HTTP handler
+	// (packages will automatically register their debug handlers
+	// there). Using command-line flags it is possible to disable
+	// the default debug pages, or to restrict access to localhost.
+	if !*disableDebugHandlers {
+		var h http.Handler = http.DefaultServeMux
+		if *restrictDebugHandlers {
+			h = withLocalhost(h)
+		}
+		mux.Handle("/debug/", h)
+	}
+
 	// Requests for /_stream/ go straight to the local Icecast.
 	proxyHandler := http.StripPrefix(autoradio.IcecastMountPrefix,
 		withMount(n, func(m *pb.Mount, w http.ResponseWriter, r *http.Request) {
@@ -204,3 +226,49 @@ func mustParseEmbeddedTemplates() *template.Template {
 	}
 	return root
 }
+
+// Restrict access to localhost.
+func withLocalhost(h http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		host, _, err := net.SplitHostPort(r.RemoteAddr)
+		if err != nil {
+			http.Error(w, "Forbidden", http.StatusForbidden)
+			return
+		}
+		if ip := net.ParseIP(host); !ip.IsLoopback() {
+			http.Error(w, "Forbidden", http.StatusForbidden)
+			return
+		}
+		h.ServeHTTP(w, r)
+	})
+}
+
+// Wrapper to make an http.Server match the genericServer interface.
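+// Serve reports a clean shutdown (http.ErrServerClosed) as success, so that
+// stopping the Server is not mistaken for a failure.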
+type httpServer struct {
+	*http.Server
+}
+
+func newHTTPServer(addr string, h http.Handler) *httpServer {
+	return &httpServer{&http.Server{
+		Addr:              addr,
+		Handler:           h,
+		ReadTimeout:       10 * time.Second,
+		ReadHeaderTimeout: 3 * time.Second,
+		WriteTimeout:      10 * time.Second,
+		IdleTimeout:       30 * time.Second,
+	}}
+}
+
+func (s *httpServer) Serve() error {
+	err := s.Server.ListenAndServe()
+	if err == http.ErrServerClosed {
+		err = nil
+	}
+	return err
+}
+
+func (s *httpServer) Stop() {
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	s.Server.Shutdown(ctx) //nolint
+	cancel()
+}
diff --git a/node/node.go b/node/node.go
index 29260d81d75d35b26e291b5601300ab606f3fd07..7e03dae03ff15a756841170b21c2980c96dde772 100644
--- a/node/node.go
+++ b/node/node.go
@@ -3,23 +3,19 @@ package node
 import (
 	"context"
 	"log"
+	"net"
 	"time"
 
 	"git.autistici.org/ale/autoradio"
-	"git.autistici.org/ale/autoradio/util"
 	"git.autistici.org/ale/autoradio/client"
 	"git.autistici.org/ale/autoradio/coordination/election"
 	"git.autistici.org/ale/autoradio/coordination/presence"
 	pb "git.autistici.org/ale/autoradio/proto"
-	"go.etcd.io/etcd/clientv3"
+	"git.autistici.org/ale/autoradio/util"
 	"go.etcd.io/etcd/clientv3/concurrency"
 )
 
-var (
-	statusUpdateInterval = 1 * time.Second
-
-	sessionTTL = 5 // seconds
-)
+var statusUpdateInterval = 1 * time.Second
 
 // Icecast is the interface used to manage the local Icecast daemon.
 type Icecast interface {
@@ -37,76 +33,23 @@ type Node struct {
 	mounts  client.MountConfig
 	watcher *election.ElectionWatcher
 
+	statusMgr *statusManager
 	updateCh  chan struct{}
 	ice       Icecast
 
 	publicPeers *presence.EndpointSet
 	lb          *loadBalancer
 
-	ctx    context.Context
-	cancel context.CancelFunc
+	ctx context.Context
 }
 
-func (n *Node) updateIcecastThread() {
-	for range n.updateCh {
-		elState := n.watcher.State()
-		if elState.State == election.StateUnknown {
-			log.Printf("not reloading Icecast because leadership status is unknown")
-			continue
-		}
-
-		err := n.ice.Update(n.ctx, n.mounts.GetMounts(),
-			(elState.State == election.StateLeader),
-			elState.Leader.Addrs[0])
-		if err != nil {
-			log.Printf("error reloading Icecast: %v", err)
-		}
-		time.Sleep(3 * time.Second)
-	}
-}
-
-func (n *Node) updateIcecast() {
-	select {
-	case n.updateCh <- struct{}{}:
-	default:
-	}
-}
-
-func (n *Node) getStatus() *pb.Status {
-	iceMounts, iceOk := n.ice.GetStatus()
-	ns := pb.Status{
-		Name:          n.name,
-		Timestamp:     uint64(time.Now().UTC().UnixNano()),
-		IcecastOk:     iceOk,
-		IcecastMounts: iceMounts,
-		BwUsage:       0,
-		MaxListeners:  1000,
-	}
-	return &ns
-}
-
-func (n *Node) leaderAddr() string {
-	leader := n.watcher.State().Leader
-	if leader == nil || len(leader.Addrs) == 0 {
-		return ""
-	}
-	return leader.Addrs[0]
-}
-
-// New returns a new Node.
-func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast, lbSpec string) (*Node, error) {
-	// The top-level etcd Session controls the lifetime of the
-	// ephemeral etcd keys that store presence- and status-related
-	// data for this Node.
-	session, err := concurrency.NewSession(cli, concurrency.WithTTL(sessionTTL))
-	if err != nil {
-		return nil, err
-	}
-
-	// Create a top-level Context that can be canceled when Stop
-	// is called. This Context controls the lifetime of the Node
-	// itself, and all the associated background goroutines:
-	// canceling it immediately terminates all outbound requests.
-	ctx, cancel := context.WithCancel(context.Background())
+// New returns a new Node with a controlling Context, scoped to an etcd
+// Session.
+func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, nodeID string, publicAddrs []net.IP, peerAddr net.IP, gossipPort int, lbSpec string) (*Node, error) {
+	// Create a sub-Context that can be canceled when Stop is called. This
+	// Context controls the lifetime of the Node itself, and all the
+	// associated background goroutines: canceling it immediately
+	// terminates all outbound requests.
+	ctx, cancel := context.WithCancel(parentCtx)
 	go func() {
 		// Oops, the session is gone, stop everything.
 		<-session.Done()
 		cancel()
@@ -116,15 +59,14 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
 	n := &Node{
 		ice:      ice,
 		ctx:      ctx,
-		cancel:   cancel,
 		name:     nodeID,
 		updateCh: make(chan struct{}, 1),
 	}
 
-	// The runtime configuration is just a list of
-	// mounts. Synchronize it with etcd, wait until it is ready,
-	// and restart Icecast whenever it changes.
-	configReady, notify := client.WatchConfig(ctx, cli, &n.mounts)
+	// The runtime configuration is just a list of mounts. Synchronize it
+	// with etcd, wait until it is ready, and restart Icecast whenever it
+	// changes.
+	configReady, notify := client.WatchConfig(ctx, session.Client(), &n.mounts)
 	go func() {
 		for range notify {
 			n.updateIcecast()
@@ -132,12 +74,14 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
 	}()
 	<-configReady
 
-	// Register the Icecast endpoints.
-	_, err = presence.Register(
+	// Register the Icecast endpoints: first the gossip service (under
+	// StatusEndpointPrefix, on gossipPort), then the public Icecast
+	// service.
+	_, err := presence.Register(
 		ctx,
 		session,
 		nodeID,
-		presence.NewRegistration(autoradio.StatusEndpointPrefix, peerAddr, autoradio.IcecastPort),
+		presence.NewRegistration(autoradio.StatusEndpointPrefix, []net.IP{peerAddr}, gossipPort),
 		presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, autoradio.IcecastPort),
 	)
 	if err != nil {
@@ -146,28 +90,23 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
 	}
 
 	// Watch the authoritative list of peer nodes.
-	n.publicPeers = presence.WatchEndpoints(ctx, cli, autoradio.PublicEndpointPrefix)
-
-	// Run a leader election protocol advertising the peer address
-	// of our Icecast daemon.
-	icecastEP, err := pb.ParseEndpoint(nodeID, peerAddr, autoradio.IcecastPort)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	// We participate in the leader election, but then do nothing
-	// special once we're the leader, just wait for
-	// cancellation. All the state transitions are managed by the
-	// election Watcher, which triggers an Icecast update.
+	n.publicPeers = presence.WatchEndpoints(ctx, session.Client(), autoradio.PublicEndpointPrefix)
+
+	// Run a leader election protocol advertising the peer address of our
+	// Icecast daemon. We participate in the leader election, but then do
+	// nothing special once we're the leader, just wait for
+	// cancellation. All the state transitions are managed by the election
+	// Watcher, which triggers an Icecast update.
+	icecastEP := pb.NewEndpointWithIPAndPort(nodeID, []net.IP{peerAddr}, autoradio.IcecastPort)
 	el := election.New(session, autoradio.IcecastElectionPrefix, icecastEP)
 	go func() {
 		err := el.Run(ctx, election.WaitForever)
-		// If the election runner's outer retry loop
-		// terminates, we are unable to talk to etcd to
-		// re-acquire leadership. In this case we should abort
-		// the Node, to prevent ending up in a state where all
-		// nodes are running but the cluster has no leader.
+		// If the election runner's outer retry loop terminates, we
+		// are unable to talk to etcd to re-acquire leadership. In
+		// this case we should abort the Node, to prevent ending up in
+		// a state where all nodes are running but the cluster has no
+		// leader.
 		if err != nil && err != context.Canceled {
 			log.Printf("leader election aborted: %v", err)
 		}
@@ -182,19 +121,19 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
 		}
 	}()
 
-	// Start the background thread that updates Icecast whenever
-	// something changes.
+	// Start the background thread that updates Icecast whenever something
+	// changes.
 	go n.updateIcecastThread()
 
-	// Start the status reporter that periodically sends our
-	// status to the frontends.
-	statusMgr := newStatusManager(ctx, cli)
+	// Start the status reporter that periodically sends our status to the
+	// frontends.
+	n.statusMgr = newStatusManager(ctx, session.Client())
 	go util.RunCron(ctx, statusUpdateInterval, func(_ context.Context) {
-		statusMgr.update(n.getStatus())
+		n.statusMgr.update(n.getStatus())
 	})
 
 	// Create the loadBalancer that runs within the node.
-	n.lb, err = newLoadBalancer(ctx, n.publicPeers, statusMgr, lbSpec)
+	n.lb, err = newLoadBalancer(ctx, n.publicPeers, n.statusMgr, lbSpec)
 	if err != nil {
 		cancel()
 		return nil, err
@@ -203,10 +142,61 @@ func New(cli *clientv3.Client, nodeID, publicAddrs, peerAddr string, ice Icecast
 	return n, nil
 }
 
-func (n *Node) Stop() {
-	n.cancel()
-}
-
+// Wait for the node to terminate.
 func (n *Node) Wait() {
 	<-n.ctx.Done()
 }
+
+// This goroutine runs in the background and calls Update on the
+// icecast.Controller whenever the updateCh channel is triggered.
+func (n *Node) updateIcecastThread() {
+	for range n.updateCh {
+		elState := n.watcher.State()
+		if elState.State == election.StateUnknown {
+			log.Printf("not reloading Icecast because leadership status is unknown")
+			continue
+		}
+
+		err := n.ice.Update(n.ctx, n.mounts.GetMounts(),
+			(elState.State == election.StateLeader),
+			elState.Leader.Addrs[0])
+		if err != nil {
+			log.Printf("error reloading Icecast: %v", err)
+		}
+
+		// Sleeping here prevents us from reloading Icecast too
+		// quickly when there are consecutive state changes.
+		time.Sleep(1 * time.Second)
+	}
+}
+
+// Trigger the updateCh in order to reload Icecast.
+func (n *Node) updateIcecast() {
+	select {
+	case n.updateCh <- struct{}{}:
+	default:
+	}
+}
+
+// Return the Status protobuf for this node.
+func (n *Node) getStatus() *pb.Status {
+	iceMounts, iceOk := n.ice.GetStatus()
+	ns := pb.Status{
+		Name:          n.name,
+		Timestamp:     uint64(time.Now().UTC().UnixNano()),
+		IcecastOk:     iceOk,
+		IcecastMounts: iceMounts,
+		BwUsage:       0,
+		MaxListeners:  1000,
+	}
+	return &ns
+}
+
+// Return the current leader address (if any).
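+// Returns the empty string when no leader is currently known.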
+func (n *Node) leaderAddr() string {
+	leader := n.watcher.State().Leader
+	if leader == nil || len(leader.Addrs) == 0 {
+		return ""
+	}
+	return leader.Addrs[0]
+}
diff --git a/node/node_test.go b/node/node_test.go
index 3d7dca458ad2964f33cabdb27d28707f662d8700..133fbc87204d4a84f810e5838727f2b72c6e720e 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"net"
 	"os"
 	"testing"
 	"time"
 
 	pb "git.autistici.org/ale/autoradio/proto"
+	"go.etcd.io/etcd/clientv3/concurrency"
 	"go.etcd.io/etcd/embed"
 	"go.etcd.io/etcd/etcdserver/api/v3client"
 )
@@ -34,30 +36,35 @@ func TestNode(t *testing.T) {
 	<-e.Server.ReadyNotify()
 
 	cli := v3client.New(e.Server)
+	session, _ := concurrency.NewSession(cli, concurrency.WithTTL(2))
+
+	ctx, cancel := context.WithCancel(context.Background())
 
 	var nodes []*Node
 	for i := 0; i < 2; i++ {
 		n, err := New(
-			cli,
-			fmt.Sprintf("node%d", i+1),
-			fmt.Sprintf("127.0.0.1:%d", 3000+i),
-			fmt.Sprintf("127.0.0.1:%d", 4000+i),
+			ctx,
+			session,
 			&fakeIcecast{},
+			fmt.Sprintf("node%d", i+1),
+			[]net.IP{net.ParseIP("127.0.0.1")},
+			net.ParseIP("127.0.0.1"),
+			4014,
 			"random",
 		)
 		if err != nil {
 			t.Fatalf("NewNode: %v", err)
 		}
-		go func() {
-			time.Sleep(10 * time.Second)
-			log.Printf("stopping everything")
-			n.Stop()
-		}()
 		nodes = append(nodes, n)
 	}
 
+	go func() {
+		time.Sleep(10 * time.Second)
+		log.Printf("stopping everything")
+		cancel()
+	}()
+
 	for _, n := range nodes {
 		n.Wait()
 	}
diff --git a/node/server.go b/node/server.go
new file mode 100644
index 0000000000000000000000000000000000000000..f83e2eebfd1ed5067f0f717b3a91a17a4dfa0e66
--- /dev/null
+++ b/node/server.go
@@ -0,0 +1,88 @@
+package node
+
+import (
+	"net"
+	"strconv"
+	"sync"
+)
+
+// A genericServer is just something that can be started and stopped.
+type genericServer interface {
+	Serve() error
+	Stop()
+}
+
+// The Server runs all the request-based components of a Node. It
+// bundles together servers for all the supported protocols (HTTP,
+// DNS, GRPC). A failure of any of them will cause the entire Server
+// to fail.
+type Server struct {
+	wg     sync.WaitGroup
+	stopCh chan struct{}
+	errCh  chan error
+}
+
+func buildServer(servers ...genericServer) *Server {
+	ms := Server{
+		stopCh: make(chan struct{}),
+		errCh:  make(chan error, 1),
+	}
+
+	for _, s := range servers {
+		ms.wg.Add(1)
+		go func(s genericServer) {
+			defer ms.wg.Done()
+			err := s.Serve()
+			if err != nil {
+				select {
+				case ms.errCh <- err:
+				default:
+				}
+			}
+		}(s)
+		go func(s genericServer) {
+			<-ms.stopCh
+			s.Stop()
+		}(s)
+	}
+
+	return &ms
+}
+
+func (s *Server) Stop() error {
+	close(s.stopCh)
+	s.wg.Wait()
+
+	// There may or may not be an error stored in errCh.
+	var err error
+	select {
+	case err = <-s.errCh:
+	default:
+	}
+	close(s.errCh)
+
+	return err
+}
+
+// NewServer creates a new Server. It uses publicAddrs / peerAddr to
+// build all the necessary addr/port combinations.
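+// The gossip (GRPC) service listens only on the peer address, while the HTTP
+// and DNS frontends listen on every public address.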
+func NewServer(n *Node, domain string, nameservers []string, publicAddrs []net.IP, peerAddr net.IP, httpPort, dnsPort, gossipPort, icecastPort int) *Server {
+	httpHandler := newHTTPHandler(n, icecastPort, domain)
+	dnsHandler := newDNSHandler(n, domain, nameservers)
+
+	servers := []genericServer{
+		newStatusServer(mkaddr(peerAddr, gossipPort), n.statusMgr),
+	}
+	for _, ip := range publicAddrs {
+		servers = append(servers,
+			newHTTPServer(mkaddr(ip, httpPort), httpHandler),
+			newDNSServer(mkaddr(ip, dnsPort), "udp", dnsHandler),
+			newDNSServer(mkaddr(ip, dnsPort), "tcp", dnsHandler),
+		)
+	}
+	return buildServer(servers...)
+}
+
+func mkaddr(ip net.IP, port int) string {
+	return net.JoinHostPort(ip.String(), strconv.Itoa(port))
+}
diff --git a/node/status.go b/node/status.go
index 8366d8097d4818c8918f1b39bea09242eda63fc7..365456d87e1010505e579238e9654afef67ffbc3 100644
--- a/node/status.go
+++ b/node/status.go
@@ -4,13 +4,14 @@ import (
 	"context"
 	"errors"
 	"log"
+	"net"
 	"sync"
 	"time"
 
 	"git.autistici.org/ale/autoradio"
-	"git.autistici.org/ale/autoradio/util"
 	"git.autistici.org/ale/autoradio/coordination/presence"
 	pb "git.autistici.org/ale/autoradio/proto"
+	"git.autistici.org/ale/autoradio/util"
 	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/grpc"
 )
@@ -168,3 +169,32 @@ func (m *statusManager) Exchange(ctx context.Context, req *pb.ExchangeRequest) (
 		Nodes: m.statuses,
 	}, nil
 }
+
+// genericServer wrapper for a grpc.Server running the statusManager service.
+type statusServer struct {
+	*grpc.Server
+	addr string
+}
+
+func newStatusServer(addr string, statusMgr *statusManager) *statusServer {
+	grpcServer := grpc.NewServer()
+	pb.RegisterGossipServiceServer(grpcServer, statusMgr)
+	return &statusServer{
+		Server: grpcServer,
+		addr:   addr,
+	}
+}
+
+func (s *statusServer) Serve() error {
+	l, err := net.Listen("tcp", s.addr)
+	if err != nil {
+		return err
+	}
+	defer l.Close() //nolint
+
+	return s.Server.Serve(l)
+}
+
+func (s *statusServer) Stop() {
+	s.Server.Stop()
+}
diff --git a/util/flag.go b/util/flag.go
index d736d0e479cb567312e27a66ce8abe2882903c28..012644040b960ab0d428d4f28232a9ad556c22a5 100644
--- a/util/flag.go
+++ b/util/flag.go
@@ -26,14 +26,35 @@ func (l *ipList) Set(value string) error {
 	// Value is not an IP address, try to resolve it.
 	ips, err := net.LookupIP(value)
 	if err != nil {
-		return fmt.Errorf("Unable to parse IP address \"%s\": %v", value, err)
+		return fmt.Errorf("unable to parse IP address \"%s\": %v", value, err)
 	}
 	*l = append(*l, ips...)
 	return nil
 }
 
-func IPList(name, help string) *[]net.IP {
+func IPListFlag(name, help string) *[]net.IP {
 	var l ipList
 	flag.Var(&l, name, help)
 	return (*[]net.IP)(&l)
 }
+
+type ipFlag net.IP
+
+func (f *ipFlag) String() string {
+	return net.IP(*f).String()
+}
+
+func (f *ipFlag) Set(value string) error {
+	ip := net.ParseIP(value)
+	if ip == nil {
+		return fmt.Errorf("unable to parse IP address \"%s\"", value)
+	}
+	*f = ipFlag(ip)
+	return nil
+}
+
+func IPFlag(name, help string) *net.IP {
+	var f ipFlag
+	flag.Var(&f, name, help)
+	return (*net.IP)(&f)
+}