...
 
Commits (2)
This diff is collapsed.
......@@ -56,6 +56,7 @@ type wrappedWriter interface {
// request. The additional streamName parameter is used for
// instrumentation.
func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) {
log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName)
outreq := new(http.Request)
*outreq = *req // includes shallow copies of maps, but okay
......
package node
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
)
var proxyStopFlag int32
func proxyTestStopped() bool {
return atomic.LoadInt32(&proxyStopFlag) != 0
}
func stopProxyTest() {
atomic.StoreInt32(&proxyStopFlag, 1)
}
const testProxyStreamServerBufSz = 512
func startTestStreamServer(path string, c byte) *httptest.Server {
// Build a buffer of 'c' just so we don't peg the cpu too much.
b := make([]byte, testProxyStreamServerBufSz)
for i := 0; i < len(b); i++ {
b[i] = c
}
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != path {
http.NotFound(w, r)
return
}
// Continuously serve 'c', at a certain rate.
for !proxyTestStopped() {
if _, err := w.Write(b); err != nil {
break
}
time.Sleep(10 * time.Millisecond)
}
})
return httptest.NewServer(h)
}
func testStreamClient(t testing.TB, streamURL string, c byte) {
resp, err := http.Get(streamURL)
if err != nil {
t.Fatalf("Get(%s) error: %v", streamURL, err)
}
if resp.StatusCode != 200 {
t.Fatalf("Get(%s) error: %s", streamURL, resp.Status)
}
n := 0
var b [1]byte
for {
_, err := resp.Body.Read(b[:])
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("error reading data from %s: %v", streamURL, err)
}
if b[0] != c {
t.Fatalf("bad response from %s at byte %d (%v instead of %v)", streamURL, n, b[0], c)
}
}
}
func TestProxy(t *testing.T) {
streams := 20
clients := 100
// Build a bunch of simulate backend streaming servers. Each
// of these servers will serve a stream of bytes of a specific
// value, so that clients can verify they're talking to the
// right backend through the proxy.
proxyStopFlag = 0
var servers []*httptest.Server
streamMap := make(map[string]*url.URL)
for i := 0; i < streams; i++ {
streamPath := fmt.Sprintf("/stream%d.ogg", i)
srv := startTestStreamServer(streamPath, byte(33+i))
servers = append(servers, srv)
u, _ := url.Parse(srv.URL + streamPath)
streamMap[streamPath] = u
}
// Create a proxy node that will forward streams to backends.
proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, ok := streamMap[r.URL.Path]
if !ok {
http.NotFound(w, r)
return
}
doIcecastProxy(w, r, u, r.URL.Path)
}))
// Start a large number of clients, talking to our proxy node.
var wg sync.WaitGroup
for i := 0; i < clients; i++ {
streamNum := i % streams
streamURL := fmt.Sprintf("%s/stream%d.ogg", proxySrv.URL, streamNum)
streamCh := byte(33 + streamNum)
wg.Add(1)
go func() {
defer wg.Done()
testStreamClient(t, streamURL, streamCh)
}()
}
// Now wait 10 seconds.
time.Sleep(10 * time.Second)
stopProxyTest()
for i := 0; i < streams; i++ {
servers[i].Close()
}
}