From dd0d0be0e83de3253ad965a35d8d9ac5da9f0c37 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Tue, 7 Jan 2020 22:06:58 +0000 Subject: [PATCH] Use golang.org/x/sync/errgroup to coordinate all servers Remove a lot of boilerplate by using the errgroup package instead of our own custom cumbersome solutions. Control server lifetime with an outer Context. --- cmd/radiod/radiod.go | 36 ++++------ node/dns.go | 18 +++-- node/http.go | 22 +++--- node/server.go | 72 ++++++++----------- node/status.go | 12 ++-- vendor/golang.org/x/sync/LICENSE | 27 +++++++ vendor/golang.org/x/sync/PATENTS | 22 ++++++ vendor/golang.org/x/sync/errgroup/errgroup.go | 66 +++++++++++++++++ vendor/vendor.json | 6 ++ 9 files changed, 200 insertions(+), 81 deletions(-) create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go diff --git a/cmd/radiod/radiod.go b/cmd/radiod/radiod.go index 39b41233..f93484a0 100644 --- a/cmd/radiod/radiod.go +++ b/cmd/radiod/radiod.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "strings" - "sync" "syscall" "time" @@ -20,6 +19,7 @@ import ( "git.autistici.org/ale/autoradio/util" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "golang.org/x/sync/errgroup" ) var ( @@ -127,6 +127,9 @@ func main() { // goroutines: canceling it immediately terminates all // outbound requests. ctx, cancel := context.WithCancel(context.Background()) + var g *errgroup.Group + g, ctx = errgroup.WithContext(ctx) + go func() { // Stop everything if the etcd session goes away. <-session.Done() @@ -160,28 +163,19 @@ func main() { // Start all the network services. DNS will listen on all // non-loopback addresses on all interfaces, to let people run // a loopback cache if necessary. - srv := node.NewServer(n, *domain, strings.Split(*nameservers, ","), nonLocalAddrs(), *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort, *metricsPort) + srv := node.NewServer(ctx, n, *domain, strings.Split(*nameservers, ","), nonLocalAddrs(), *peerIP, *httpPort, *dnsPort, *gossipPort, autoradio.IcecastPort, *metricsPort) // Wait until the Node and the Server terminate. A failure in // either the network services or the Node itself should cause - // the other to terminate. Since the Server does not have a - // controlling Context, we just call its Stop method. - var wg sync.WaitGroup - wg.Add(2) - go func() { - err := srv.Wait() - if err != nil { - log.Printf("server error: %v", err) - cancel() - } - wg.Done() - }() - - go func() { + // the other to terminate. + g.Go(func() error { + return srv.Wait() + }) + g.Go(func() error { n.Wait() - srv.Stop() - wg.Done() - }() - - wg.Wait() + return nil + }) + if err := g.Wait(); err != nil && err != context.Canceled { + log.Printf("fatal error: %v", err) + } } diff --git a/node/dns.go b/node/dns.go index 74e4fbcd..0864f826 100644 --- a/node/dns.go +++ b/node/dns.go @@ -219,12 +219,18 @@ func newDNSServer(name, addr, proto string, h dns.Handler) *dnsServer { func (s *dnsServer) Name() string { return s.name } -func (s *dnsServer) Serve() error { - return s.Server.ListenAndServe() +func (s *dnsServer) Start(ctx context.Context) error { + return runDNSServerWithContext(ctx, s.Server) } -func (s *dnsServer) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - s.Server.ShutdownContext(ctx) //nolint - cancel() +func runDNSServerWithContext(ctx context.Context, srv *dns.Server) error { + go func() { + <-ctx.Done() + + // Create an standalone context with a short timeout. + sctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + srv.ShutdownContext(sctx) // nolint + cancel() + }() + return srv.ListenAndServe() } diff --git a/node/http.go b/node/http.go index 82313858..85435d45 100644 --- a/node/http.go +++ b/node/http.go @@ -279,16 +279,22 @@ func newHTTPServer(name, addr string, h http.Handler) *httpServer { func (s *httpServer) Name() string { return s.name } -func (s *httpServer) Serve() error { - err := s.Server.ListenAndServe() +func (s *httpServer) Start(ctx context.Context) error { + return runHTTPServerWithContext(ctx, s.Server) +} + +func runHTTPServerWithContext(ctx context.Context, srv *http.Server) error { + go func() { + <-ctx.Done() + + // Create an standalone context with a short timeout. + sctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + srv.Shutdown(sctx) // nolint + cancel() + }() + err := srv.ListenAndServe() if err == http.ErrServerClosed { err = nil } return err } - -func (s *httpServer) Stop() { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - s.Server.Shutdown(ctx) //nolint - cancel() -} diff --git a/node/server.go b/node/server.go index 95cf7bde..5c3b7530 100644 --- a/node/server.go +++ b/node/server.go @@ -1,18 +1,22 @@ package node import ( + "context" "fmt" "log" "net" "strconv" - "sync" + "time" + + "golang.org/x/sync/errgroup" ) +var shutdownTimeout = 2 * time.Second + // A genericServer is just something that can be started and stopped. type genericServer interface { Name() string - Serve() error - Stop() + Start(context.Context) error } // The Server runs all the request-based components of a Node. It @@ -20,61 +24,45 @@ type genericServer interface { // DNS, GRPC). A failure of any of them will cause the entire Server // to fail. type Server struct { - wg sync.WaitGroup - stopCh chan struct{} - errCh chan error + rootCtx context.Context + cancel context.CancelFunc + eg *errgroup.Group } -func buildServer(servers ...genericServer) *Server { - ms := Server{ - stopCh: make(chan struct{}), - errCh: make(chan error, 1), - } +func buildServer(ctx context.Context, servers ...genericServer) *Server { + // Even if the outer context may be canceled, we want our own + // cancel function so that we can implement Stop(). + rootCtx, cancel := context.WithCancel(ctx) + g, innerCtx := errgroup.WithContext(rootCtx) for _, s := range servers { - ms.wg.Add(1) - go func(s genericServer) { - defer ms.wg.Done() + s := s // To lock it onto the closure for the next function. + g.Go(func() error { log.Printf("starting network service: %s", s.Name()) - err := s.Serve() + err := s.Start(innerCtx) if err != nil { log.Printf("%s: error: %v", s.Name(), err) - werr := fmt.Errorf("%s: %v", s.Name(), err) - select { - case ms.errCh <- werr: - // First error, stop all other services. - ms.Stop() - default: - } + return fmt.Errorf("%s: %v", s.Name(), err) } - }(s) - go func(s genericServer) { - <-ms.stopCh - s.Stop() - }(s) + return nil + }) } - return &ms + return &Server{ + rootCtx: rootCtx, + cancel: cancel, + eg: g, + } } // Stop all network services. func (s *Server) Stop() { - close(s.stopCh) + s.cancel() } // Wait for the services to terminate. func (s *Server) Wait() error { - s.wg.Wait() - - // There may or may not be an error stored in errCh. - var err error - select { - case err = <-s.errCh: - default: - } - close(s.errCh) - - return err + return s.eg.Wait() } // NewServer creates a new Server. Will use publicAddrs / peerAddr to @@ -84,7 +72,7 @@ func (s *Server) Wait() error { // DNS servers will bind only to the dnsAddrs (both TCP and // UDP). The metrics and the status services, which are internal, will // bind on peerAddr. -func NewServer(n *Node, domain string, nameservers []string, dnsAddrs []net.IP, peerAddr net.IP, httpPort, dnsPort, gossipPort, icecastPort, metricsPort int) *Server { +func NewServer(ctx context.Context, n *Node, domain string, nameservers []string, dnsAddrs []net.IP, peerAddr net.IP, httpPort, dnsPort, gossipPort, icecastPort, metricsPort int) *Server { httpHandler := newHTTPHandler(n, icecastPort, domain) dnsHandler := newDNSHandler(n, domain, nameservers) @@ -99,7 +87,7 @@ func NewServer(n *Node, domain string, nameservers []string, dnsAddrs []net.IP, newDNSServer("dns(tcp)", mkaddr(ip, dnsPort), "tcp", dnsHandler), ) } - return buildServer(servers...) + return buildServer(ctx, servers...) } func mkaddr(ip net.IP, port int) string { diff --git a/node/status.go b/node/status.go index f9e0cdd9..a8a8924f 100644 --- a/node/status.go +++ b/node/status.go @@ -209,16 +209,20 @@ func newStatusServer(addr string, statusMgr *statusManager) *statusServer { func (s *statusServer) Name() string { return "status" } -func (s *statusServer) Serve() error { +func (s *statusServer) Start(ctx context.Context) error { l, err := net.Listen("tcp", s.addr) if err != nil { return err } defer l.Close() //nolint - return s.Server.Serve(l) + return runGRPCServerWithContext(ctx, s.Server, l) } -func (s *statusServer) Stop() { - s.Server.Stop() +func runGRPCServerWithContext(ctx context.Context, srv *grpc.Server, l net.Listener) error { + go func() { + <-ctx.Done() + srv.Stop() + }() + return srv.Serve(l) } diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000..73309904 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000..9857fe53 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,66 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "sync" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error cancels the group; its error will be +// returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 043d4533..094c4ba3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -936,6 +936,12 @@ "revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146", "revisionTime": "2019-04-03T01:06:53Z" }, + { + "checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=", + "path": "golang.org/x/sync/errgroup", + "revision": "cd5d95a43a6e21273425c7ae415d3df9ea832eeb", + "revisionTime": "2019-08-30T23:09:31Z" + }, { "checksumSHA1": "h7T+YPjgH20kHOKUibYbCxdXHxs=", "path": "golang.org/x/sys/unix", -- GitLab