diff --git a/cmd/radiobench/radiobench.go b/cmd/radiobench/radiobench.go index 0da713130759d6bd5a5f8462feefbf1b44b96dc4..27a8f0468df4d3d4c6c3e161baba7fa6e05186ec 100644 --- a/cmd/radiobench/radiobench.go +++ b/cmd/radiobench/radiobench.go @@ -70,7 +70,18 @@ func randomDuration(max time.Duration) time.Duration { } func readstream(id int, streamUrl string) error { - resp, err := http.Get(streamUrl) + // Create a new client so we do not multiplex requests. + client := &http.Client{ + Transport: &http.Transport{}, + } + + req, err := http.NewRequest("GET", streamUrl, nil) + if err != nil { + return err + } + req.Header.Set("Connection", "close") + + resp, err := client.Do(req) if err != nil { stats.Error() return err @@ -103,7 +114,7 @@ func readstream(id int, streamUrl string) error { } } - log.Printf("worker(%d): connected to %s", id, streamUrl) + log.Printf("worker(%d): connected to %s", id, resp.Request.URL.String()) defer resp.Body.Close() diff --git a/node/http.go b/node/http.go index 75cd48e8b27c28018a72704850c5546924c0a1e4..823138587bf48cb325ff17c63830b3ae776f59c1 100644 --- a/node/http.go +++ b/node/http.go @@ -162,6 +162,13 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques } func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { + // The M3U file just sends to the plain redirect endpoint (as + // people might save the .m3u file). + if strings.HasSuffix(r.URL.Path, ".m3u") { + sendM3U(w, r) + return + } + // Redirect the user to a final target, depending on the load // balancing algorithm's decision. This enforces the // 1:1 mapping between Icecasts and frontends. @@ -181,12 +188,8 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r * return } - if strings.HasSuffix(r.URL.Path, ".m3u") { - sendM3U(w, r) - } else { - targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path)) - sendRedirect(w, r, targetURL) - } + targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path)) + sendRedirect(w, r, targetURL) } // Serve a M3U response. This simply points back at the stream diff --git a/node/instrumentation.go b/node/instrumentation.go index 1d4799f630b12d6afd9f8881191a84ea2ca20390..ce43213f61a5f0c9653a6542226085ddacce0d84 100644 --- a/node/instrumentation.go +++ b/node/instrumentation.go @@ -25,7 +25,7 @@ var ( ) streamListeners = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "listeners", + Name: "stream_listeners", Help: "Number of current listeners.", }, []string{"stream"}, diff --git a/node/node.go b/node/node.go index 7deefadf06cfeb365cb8036b826be965ef069df3..a0071a9b6c7265a32b94894bcdf96c812aaa4c89 100644 --- a/node/node.go +++ b/node/node.go @@ -106,7 +106,7 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n session, nodeID, presence.NewRegistration(autoradio.StatusEndpointPrefix, []net.IP{peerAddr}, gossipPort), - presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, autoradio.IcecastPort), + presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, 80), ) if err != nil { cancel() diff --git a/node/proxy.go b/node/proxy.go index 737a1a56b83b232405eed29de3a2180c3758d415..dc2e9187a334e9a45b78e5f9dbe57dee4c595ece 100644 --- a/node/proxy.go +++ b/node/proxy.go @@ -192,9 +192,10 @@ func isCloseError(err error) bool { // Simple two-way TCP proxy that copies data in both directions and // can shutdown each direction of the connection independently. func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) { + l := streamListeners.WithLabelValues(streamName) + l.Inc() var wg sync.WaitGroup wg.Add(2) - streamListeners.WithLabelValues(streamName).Inc() // Instrument both directions of the stream, but let the // bandwidth estimator count only the bytes sent to the user. @@ -208,7 +209,7 @@ func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) { }() wg.Wait() - streamListeners.WithLabelValues(streamName).Dec() + l.Dec() } // Implementation of a simple buffer cache, to minimize large