Skip to content
Snippets Groups Projects
Commit 32f6bfbe authored by ale's avatar ale
Browse files

Redirect clients to the proxied URLs

And various other minor fixes.
parent 37cd3dfa
No related branches found
No related tags found
1 merge request!1v2.0
...@@ -70,7 +70,18 @@ func randomDuration(max time.Duration) time.Duration { ...@@ -70,7 +70,18 @@ func randomDuration(max time.Duration) time.Duration {
} }
func readstream(id int, streamUrl string) error { func readstream(id int, streamUrl string) error {
resp, err := http.Get(streamUrl) // Create a new client so we do not multiplex requests.
client := &http.Client{
Transport: &http.Transport{},
}
req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil {
return err
}
req.Header.Set("Connection", "close")
resp, err := client.Do(req)
if err != nil { if err != nil {
stats.Error() stats.Error()
return err return err
...@@ -103,7 +114,7 @@ func readstream(id int, streamUrl string) error { ...@@ -103,7 +114,7 @@ func readstream(id int, streamUrl string) error {
} }
} }
log.Printf("worker(%d): connected to %s", id, streamUrl) log.Printf("worker(%d): connected to %s", id, resp.Request.URL.String())
defer resp.Body.Close() defer resp.Body.Close()
......
...@@ -162,6 +162,13 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques ...@@ -162,6 +162,13 @@ func serveSource(n *Node, mount *pb.Mount, w http.ResponseWriter, r *http.Reques
} }
func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) { func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *http.Request) {
// The M3U file just sends to the plain redirect endpoint (as
// people might save the .m3u file).
if strings.HasSuffix(r.URL.Path, ".m3u") {
sendM3U(w, r)
return
}
// Redirect the user to a final target, depending on the load // Redirect the user to a final target, depending on the load
// balancing algorithm's decision. This enforces the // balancing algorithm's decision. This enforces the
// 1:1 mapping between Icecasts and frontends. // 1:1 mapping between Icecasts and frontends.
...@@ -181,12 +188,8 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r * ...@@ -181,12 +188,8 @@ func serveRedirect(lb *loadBalancer, mount *pb.Mount, w http.ResponseWriter, r *
return return
} }
if strings.HasSuffix(r.URL.Path, ".m3u") { targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path))
sendM3U(w, r) sendRedirect(w, r, targetURL)
} else {
targetURL := fmt.Sprintf("http://%s%s", targetAddr, autoradio.MountPathToIcecastPath(mount.Path))
sendRedirect(w, r, targetURL)
}
} }
// Serve a M3U response. This simply points back at the stream // Serve a M3U response. This simply points back at the stream
......
...@@ -25,7 +25,7 @@ var ( ...@@ -25,7 +25,7 @@ var (
) )
streamListeners = prometheus.NewGaugeVec( streamListeners = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "listeners", Name: "stream_listeners",
Help: "Number of current listeners.", Help: "Number of current listeners.",
}, },
[]string{"stream"}, []string{"stream"},
......
...@@ -106,7 +106,7 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n ...@@ -106,7 +106,7 @@ func New(parentCtx context.Context, session *concurrency.Session, ice Icecast, n
session, session,
nodeID, nodeID,
presence.NewRegistration(autoradio.StatusEndpointPrefix, []net.IP{peerAddr}, gossipPort), presence.NewRegistration(autoradio.StatusEndpointPrefix, []net.IP{peerAddr}, gossipPort),
presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, autoradio.IcecastPort), presence.NewRegistration(autoradio.PublicEndpointPrefix, publicAddrs, 80),
) )
if err != nil { if err != nil {
cancel() cancel()
......
...@@ -192,9 +192,10 @@ func isCloseError(err error) bool { ...@@ -192,9 +192,10 @@ func isCloseError(err error) bool {
// Simple two-way TCP proxy that copies data in both directions and // Simple two-way TCP proxy that copies data in both directions and
// can shutdown each direction of the connection independently. // can shutdown each direction of the connection independently.
func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) { func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) {
l := streamListeners.WithLabelValues(streamName)
l.Inc()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
streamListeners.WithLabelValues(streamName).Inc()
// Instrument both directions of the stream, but let the // Instrument both directions of the stream, but let the
// bandwidth estimator count only the bytes sent to the user. // bandwidth estimator count only the bytes sent to the user.
...@@ -208,7 +209,7 @@ func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) { ...@@ -208,7 +209,7 @@ func handleProxy(conn *net.TCPConn, upstream *net.TCPConn, streamName string) {
}() }()
wg.Wait() wg.Wait()
streamListeners.WithLabelValues(streamName).Dec() l.Dec()
} }
// Implementation of a simple buffer cache, to minimize large // Implementation of a simple buffer cache, to minimize large
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment