From 671d5ad13d16a0163f45d92744ffb536a8b82972 Mon Sep 17 00:00:00 2001
From: ale <ale@incal.net>
Date: Sat, 2 Oct 2021 23:31:21 +0100
Subject: [PATCH] Use requests instead of httplib.HTTPConnection

---
 src/mod_sso/test/httpd_integration_test.py | 136 +++++++++------------
 1 file changed, 61 insertions(+), 75 deletions(-)

diff --git a/src/mod_sso/test/httpd_integration_test.py b/src/mod_sso/test/httpd_integration_test.py
index 973eb1e..996b667 100755
--- a/src/mod_sso/test/httpd_integration_test.py
+++ b/src/mod_sso/test/httpd_integration_test.py
@@ -2,12 +2,11 @@
 
 import os
 import re
+import requests
 import subprocess
 import sys
 import time
 import unittest
-from http.client import HTTPConnection
-from http.cookies import SimpleCookie
 from urllib.parse import urlencode, urlsplit, parse_qsl
 
 sys.path.append("../../python")
@@ -94,20 +93,20 @@ def _stop_httpd(httpd, dump_log=False):
 
 def _query(url, host=None, cookie=None):
     """Make a simple request to url using httplib."""
-    conn = HTTPConnection("127.0.0.1", APACHE_PORT)
     headers = {"Host": host or "localhost"}
     if cookie:
-        headers["Cookie"] = cookie
-    conn.request("GET", url, headers=headers)
-    resp = conn.getresponse()
+        headers['Cookie'] = cookie
+    resp = requests.get(
+        f'http://127.0.0.1:{APACHE_PORT}{url}',
+        headers=headers,
+        allow_redirects=False)
     location = None
     body = None
-    if resp.status in (301, 302):
-        location = resp.getheader("Location")
-    elif resp.status == 200:
-        body = resp.read()
-    conn.close()
-    return (resp.status, body, location)
+    if resp.status_code in (301, 302):
+        location = resp.headers["Location"]
+    elif resp.status_code == 200:
+        body = resp.text
+    return (resp.status_code, body, location)
 
 
 def mkcookie(tkt):
@@ -253,7 +252,7 @@ class HttpdIntegrationTestBase(unittest.TestCase):
                         name, check["url"], status, check["status"]))
                 if "body" in check:
                     self.assertTrue(
-                        check["body"].encode('utf-8') in body,
+                        check["body"] in body,
                         "test: '%s'\nbody mismatch for %s (exp '%s')" % (
                             name, check["url"], check["body"]))
                 if "location" in check:
@@ -270,50 +269,41 @@ class HttpdIntegrationTestBase(unittest.TestCase):
         if not sso_login_url:
             sso_login_url = '/%s/sso_login' % sso_service.split('/', 1)[1]
 
-        cookies = SimpleCookie()
+        sess = requests.Session()
 
         # Make a request to a protected URL.
-        conn = HTTPConnection("127.0.0.1", APACHE_PORT)
-        conn.request("GET", url, headers={'Host': host_header})
-        resp = conn.getresponse()
-        self.assertEquals(302, resp.status)
-        set_cookie_hdr = resp.getheader("Set-Cookie")
-        self.assertTrue(set_cookie_hdr)
-        cookies.load(set_cookie_hdr)
-        location = resp.getheader("Location")
+        resp = sess.get(
+            f'http://127.0.0.1:{APACHE_PORT}{url}',
+            headers={'Host': host_header},
+            allow_redirects=False)
+        self.assertEquals(302, resp.status_code)
+        location = resp.headers["Location"]
         location_u = urlsplit(location)
         location_args = dict(parse_qsl(location_u.query))
         nonce = location_args.get('n')
         self.assertTrue(nonce)
-        conn.close()
+        session_cookie = resp.cookies['_sso_local_session']
 
         # Now call the /sso_login endpoint.
-        conn = HTTPConnection("127.0.0.1", APACHE_PORT)
         tkt = self._ticket(nonce=nonce, service=sso_service)
-        conn.request("GET", sso_login_url + "?" + urlencode(
-            {"t": tkt, "d": "https://" + host_header + url}), headers={
-                "Cookie": cookies.output(attrs=[], header='', sep='; '),
-                "Host": host_header,
-            })
-        resp = conn.getresponse()
-        self.assertEquals(302, resp.status)
-        set_cookie_hdr = resp.getheader("Set-Cookie")
-        self.assertTrue(set_cookie_hdr)
-        cookies.load(set_cookie_hdr)
-        self.assertTrue('SSO_test' in cookies,
-                        'missing cookie: %s' % cookies.output())
-        self.assertEquals(tkt, cookies['SSO_test'].value)
-        conn.close()
+        query_args = urlencode({"t": tkt, "d": "https://" + host_header + url})
+        resp = sess.get(
+            f'http://127.0.0.1:{APACHE_PORT}{sso_login_url}?{query_args}',
+            headers={'Host': host_header},
+            cookies={'_sso_local_session': session_cookie},
+            allow_redirects=False)
+        self.assertEquals(302, resp.status_code)
+        self.assertTrue('SSO_test' in resp.cookies,
+                        'missing cookie: %s\n%s' % (resp.cookies, resp.headers))
+        self.assertEquals(tkt, resp.cookies['SSO_test'])
 
         # Make the original request again.
-        conn = HTTPConnection("127.0.0.1", APACHE_PORT)
-        conn.request("GET", url, headers={
-            "Cookie": cookies.output(attrs=[], header='', sep='; '),
-            'Host': host_header,
-        })
-        resp = conn.getresponse()
-        self.assertEquals(200, resp.status)
-        conn.close()
+        resp = sess.get(
+            f'http://127.0.0.1:{APACHE_PORT}{url}',
+            headers={'Host': host_header},
+            cookies={'SSO_test': tkt},
+            allow_redirects=False)
+        self.assertEquals(200, resp.status_code)
 
 
 class HttpdIntegrationTest(HttpdIntegrationTestBase):
@@ -328,7 +318,6 @@ class HttpdIntegrationTest(HttpdIntegrationTestBase):
         # to spot issues where we are not cleaning up state properly
         # between requests.
         n = 100
-        errors = 0
         for i in range(n):
             cookie = 'SSO_test=%s' % self._ticket()
             status, body, location = _query("/index.html", cookie=cookie)
@@ -341,45 +330,41 @@ class HttpdIntegrationTest(HttpdIntegrationTestBase):
                                         cookie=mkcookie(tkt))
         self.assertEquals(200, status)
         self.assertTrue(body)
-        self.assertTrue(b"REMOTE_USER=testuser" in body)
-        self.assertTrue(b"SSO_USER=testuser" in body)
-        self.assertTrue(b"SSO_SERVICE=service.example.com/" in body)
-        self.assertTrue((b"SSO_TICKET=" + tkt.encode('utf-8')) in body)
+        self.assertTrue("REMOTE_USER=testuser" in body)
+        self.assertTrue("SSO_USER=testuser" in body)
+        self.assertTrue("SSO_SERVICE=service.example.com/" in body)
+        self.assertTrue(("SSO_TICKET=" + tkt) in body)
 
     def test_sso_login(self):
         # Call the /sso_login endpoint, verify that it sets the local
         # SSO cookie to whatever the 't' parameter says.
-        cookies = SimpleCookie()
-        conn = HTTPConnection("127.0.0.1", APACHE_PORT)
         tkt = self._ticket()
-        conn.request("GET", "/sso_login?%s" % urlencode(
-            {"t": tkt, "d": "https://service.example.com/index.html"}))
-        resp = conn.getresponse()
-        self.assertEquals(302, resp.status)
-        set_cookie_hdr = resp.getheader("Set-Cookie")
+        query_args = urlencode(
+            {"t": tkt, "d": "https://service.example.com/index.html"})
+        resp = requests.get(
+            f'http://127.0.0.1:{APACHE_PORT}/sso_login?{query_args}',
+            allow_redirects=False)
+        self.assertEquals(302, resp.status_code)
+        set_cookie_hdr = resp.headers["Set-Cookie"]
         self.assertTrue(set_cookie_hdr)
-        cookies.load(set_cookie_hdr)
-        self.assertTrue('SSO_test' in cookies,
-                        'missing cookie: %s' % cookies.output())
-        self.assertEquals(tkt, cookies['SSO_test'].value)
-        conn.close()
+        self.assertTrue('SSO_test' in resp.cookies,
+                        'missing cookie: %s' % resp.cookies)
+        self.assertEquals(tkt, resp.cookies['SSO_test'])
 
     def test_sso_logout(self):
         # test the /sso_logout endpoint
-        conn = HTTPConnection("127.0.0.1", APACHE_PORT)
-        conn.request("GET", "/sso_logout", headers={
-            "Cookie": mkcookie(self._ticket())})
-        resp = conn.getresponse()
-        set_cookie_hdr = resp.getheader("Set-Cookie")
+        resp = requests.get(
+            f'http://127.0.0.1:{APACHE_PORT}/sso_logout',
+            headers={"Cookie": mkcookie(self._ticket())},
+            allow_redirects=False)
+        set_cookie_hdr = resp.headers["Set-Cookie"]
         self.assertTrue(set_cookie_hdr)
         self.assertTrue("SSO_test=;" in set_cookie_hdr)
-        conn.close()
 
 
 class HttpdIntegrationTestWithNonces(HttpdIntegrationTestBase):
 
     CONFIG = 'test-httpd-2.4-nonces.conf'
-    DUMP_LOG = True
 
     def setUp(self):
         with open('session.key', 'wb') as fd:
@@ -396,11 +381,11 @@ class HttpdIntegrationTestWithNonces(HttpdIntegrationTestBase):
                                         cookie=mkcookie(tkt))
         self.assertEquals(200, status)
         self.assertTrue(body)
-        self.assertTrue(b"REMOTE_USER=testuser" in body)
-        self.assertTrue(b"SSO_USER=testuser" in body)
-        self.assertTrue(b"SSO_SERVICE=service.example.com/" in body)
-        self.assertTrue((b"SSO_TICKET=" + tkt.encode('utf-8')) in body)
-        self.assertTrue(b"SSO_NONCE=NONCE1" in body)
+        self.assertTrue("REMOTE_USER=testuser" in body)
+        self.assertTrue("SSO_USER=testuser" in body)
+        self.assertTrue("SSO_SERVICE=service.example.com/" in body)
+        self.assertTrue(("SSO_TICKET=" + tkt) in body)
+        self.assertTrue("SSO_NONCE=NONCE1" in body)
 
     def test_login_workflow(self):
         self._login_workflow("service.example.com", "/protected-user/index.html",
@@ -416,6 +401,7 @@ class HttpdIntegrationTestWithNonces(HttpdIntegrationTestBase):
 class WebmailIntegrationTestWithNonces(HttpdIntegrationTestBase):
 
     CONFIG = 'test-httpd-2.4-webmail.conf'
+    DUMP_LOG = True
 
     def setUp(self):
         with open('session.key', 'wb') as fd:
-- 
GitLab