From f94312529de6eb1eab9115d9e36c7a70900b152e Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sat, 22 Feb 2020 10:20:00 +0000 Subject: [PATCH] Add a stress test for the proxy module --- node/proxy.go | 1 + node/proxy_test.go | 124 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 node/proxy_test.go diff --git a/node/proxy.go b/node/proxy.go index dc2e9187..126ed022 100644 --- a/node/proxy.go +++ b/node/proxy.go @@ -56,6 +56,7 @@ type wrappedWriter interface { // request. The additional streamName parameter is used for // instrumentation. func doIcecastProxy(rw http.ResponseWriter, req *http.Request, target *url.URL, streamName string) { + log.Printf("proxy: in=%s out=%s stream=%s", req.URL.String(), target.String(), streamName) outreq := new(http.Request) *outreq = *req // includes shallow copies of maps, but okay diff --git a/node/proxy_test.go b/node/proxy_test.go new file mode 100644 index 00000000..9f09f6f2 --- /dev/null +++ b/node/proxy_test.go @@ -0,0 +1,124 @@ +package node + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" +) + +var proxyStopFlag int32 + +func proxyTestStopped() bool { + return atomic.LoadInt32(&proxyStopFlag) != 0 +} + +func stopProxyTest() { + atomic.StoreInt32(&proxyStopFlag, 1) +} + +const testProxyStreamServerBufSz = 512 + +func startTestStreamServer(path string, c byte) *httptest.Server { + // Build a buffer of 'c' just so we don't peg the cpu too much. + b := make([]byte, testProxyStreamServerBufSz) + for i := 0; i < len(b); i++ { + b[i] = c + } + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != path { + http.NotFound(w, r) + return + } + // Continuously serve 'c', at a certain rate. + for !proxyTestStopped() { + if _, err := w.Write(b); err != nil { + break + } + time.Sleep(10 * time.Millisecond) + } + }) + return httptest.NewServer(h) +} + +func testStreamClient(t testing.TB, streamURL string, c byte) { + resp, err := http.Get(streamURL) + if err != nil { + t.Fatalf("Get(%s) error: %v", streamURL, err) + } + if resp.StatusCode != 200 { + t.Fatalf("Get(%s) error: %s", streamURL, resp.Status) + } + + n := 0 + var b [1]byte + for { + _, err := resp.Body.Read(b[:]) + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("error reading data from %s: %v", streamURL, err) + } + if b[0] != c { + t.Fatalf("bad response from %s at byte %d (%v instead of %v)", streamURL, n, b[0], c) + } + } +} + +func TestProxy(t *testing.T) { + streams := 20 + clients := 100 + + // Build a bunch of simulate backend streaming servers. Each + // of these servers will serve a stream of bytes of a specific + // value, so that clients can verify they're talking to the + // right backend through the proxy. + proxyStopFlag = 0 + var servers []*httptest.Server + streamMap := make(map[string]*url.URL) + for i := 0; i < streams; i++ { + streamPath := fmt.Sprintf("/stream%d.ogg", i) + srv := startTestStreamServer(streamPath, byte(33+i)) + servers = append(servers, srv) + + u, _ := url.Parse(srv.URL + streamPath) + streamMap[streamPath] = u + } + + // Create a proxy node that will forward streams to backends. + proxySrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + u, ok := streamMap[r.URL.Path] + if !ok { + http.NotFound(w, r) + return + } + doIcecastProxy(w, r, u, r.URL.Path) + })) + + // Start a large number of clients, talking to our proxy node. + var wg sync.WaitGroup + for i := 0; i < clients; i++ { + streamNum := i % streams + streamURL := fmt.Sprintf("%s/stream%d.ogg", proxySrv.URL, streamNum) + streamCh := byte(33 + streamNum) + wg.Add(1) + go func() { + defer wg.Done() + testStreamClient(t, streamURL, streamCh) + }() + } + + // Now wait 10 seconds. + time.Sleep(10 * time.Second) + + stopProxyTest() + for i := 0; i < streams; i++ { + servers[i].Close() + } +} -- GitLab