From e32ecb394d5c2e90ed3de9d6e2fc99c0d92323cc Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Tue, 26 Apr 2022 11:29:06 +0530 Subject: [PATCH] refactor: Oauth20 tests Use App client app directly instead of requests. This removes dependency on needing a web server running for your tests. Also, contributes to coverage now. We can see which lines are impacted with each use case. --- frappe/tests/test_oauth20.py | 330 ++++++++++++++++++----------------- 1 file changed, 167 insertions(+), 163 deletions(-) diff --git a/frappe/tests/test_oauth20.py b/frappe/tests/test_oauth20.py index a634ace62a..8ebff2bca6 100644 --- a/frappe/tests/test_oauth20.py +++ b/frappe/tests/test_oauth20.py @@ -1,88 +1,111 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import unittest +from typing import TYPE_CHECKING, Dict, Optional from urllib.parse import parse_qs, urljoin, urlparse import jwt import requests +from werkzeug.test import TestResponse import frappe from frappe.integrations.oauth2 import encode_params from frappe.test_runner import make_test_records +from frappe.tests.test_api import get_test_client, make_request, suppress_stdout + +if TYPE_CHECKING: + from frappe.integrations.doctype.social_login_key.social_login_key import SocialLoginKey -class TestOAuth20(unittest.TestCase): - def setUp(self): - make_test_records("OAuth Client") +class FrappeRequestTestCase(unittest.TestCase): + TEST_CLIENT = get_test_client() + + @property + def sid(self) -> str: + if not getattr(self, "_sid", None): + from frappe.auth import CookieManager, LoginManager + from frappe.utils import set_request + + set_request(path="/") + frappe.local.cookie_manager = CookieManager() + frappe.local.login_manager = LoginManager() + frappe.local.login_manager.login_as("test@example.com") + self._sid = frappe.session.sid + + return self._sid + + def get(self, path: str, params: Optional[Dict] = None, **kwargs) -> TestResponse: + return make_request(target=self.TEST_CLIENT.get, args=(path,), kwargs={"data": params, **kwargs}) + + def post(self, path, data, **kwargs) -> TestResponse: + return make_request(target=self.TEST_CLIENT.post, args=(path,), kwargs={"data": data, **kwargs}) + + def put(self, path, data, **kwargs) -> TestResponse: + return make_request(target=self.TEST_CLIENT.put, args=(path,), kwargs={"data": data, **kwargs}) + + def delete(self, path, **kwargs) -> TestResponse: + return make_request(target=self.TEST_CLIENT.delete, args=(path,), kwargs=kwargs) + + +class TestOAuth20(FrappeRequestTestCase): + @classmethod + def setUpClass(cls): + make_test_records("OAuth Client", force=True) make_test_records("User") + client = frappe.get_all("OAuth Client", fields=["*"])[0] - self.client_id = client.get("client_id") - self.client_secret = client.get("client_secret") - self.form_header = {"content-type": "application/x-www-form-urlencoded"} - self.scope = "all openid" - self.redirect_uri = "http://localhost" + cls.client_id = client.get("client_id") + cls.client_secret = client.get("client_secret") + cls.form_header = {"content-type": "application/x-www-form-urlencoded"} + cls.scope = "all openid" + cls.redirect_uri = "http://localhost" # Set Frappe server URL reqired for id_token generation - try: - frappe_login_key = frappe.get_doc("Social Login Key", "frappe") - except frappe.DoesNotExistError: - frappe_login_key = frappe.new_doc("Social Login Key") - + frappe_login_key: "SocialLoginKey" = frappe.new_doc("Social Login Key") frappe_login_key.get_social_login_provider("Frappe", initialize=True) frappe_login_key.base_url = frappe.utils.get_url() frappe_login_key.enable_social_login = 0 - frappe_login_key.save() + frappe_login_key.insert(ignore_if_duplicate=True) frappe.db.commit() def test_invalid_login(self): - self.assertFalse(check_valid_openid_response()) + with suppress_stdout(): + self.assertFalse(check_valid_openid_response(client=self)) def test_login_using_authorization_code(self): update_client_for_auth_code_grant(self.client_id) - session = requests.Session() - login(session) - - redirect_destination = None - # Go to Authorize url - try: - session.get( - get_full_url("/api/method/frappe.integrations.oauth2.authorize"), - params=encode_params( - { - "client_id": self.client_id, - "scope": self.scope, - "response_type": "code", - "redirect_uri": self.redirect_uri, - } - ), - ) - except requests.exceptions.ConnectionError as ex: - redirect_destination = ex.request.url - - # Get authorization code from redirected URL - query = parse_qs(urlparse(redirect_destination).query) + resp = self.get( + "/api/method/frappe.integrations.oauth2.authorize", + { + "sid": self.sid, + "client_id": self.client_id, + "scope": self.scope, + "response_type": "code", + "redirect_uri": self.redirect_uri, + }, + follow_redirects=True, + ) + query = parse_qs(resp.request.environ["QUERY_STRING"]) auth_code = query.get("code")[0] # Request for bearer token - token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.get_token"), + token_response = self.post( + "/api/method/frappe.integrations.oauth2.get_token", headers=self.form_header, - data=encode_params( - { - "grant_type": "authorization_code", - "code": auth_code, - "redirect_uri": self.redirect_uri, - "client_id": self.client_id, - "scope": self.scope, - } - ), + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": self.redirect_uri, + "client_id": self.client_id, + "scope": self.scope, + }, ) # Parse bearer token json - bearer_token = token_response.json() + bearer_token = token_response.json self.assertTrue(bearer_token.get("access_token")) self.assertTrue(bearer_token.get("expires_in")) @@ -90,7 +113,9 @@ class TestOAuth20(unittest.TestCase): self.assertTrue(bearer_token.get("refresh_token")) self.assertTrue(bearer_token.get("scope")) self.assertTrue(bearer_token.get("token_type") == "Bearer") - self.assertTrue(check_valid_openid_response(bearer_token.get("access_token"))) + self.assertTrue( + check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self) + ) decoded_token = self.decode_id_token(bearer_token.get("id_token")) self.assertEqual(decoded_token["email"], "test@example.com") @@ -98,51 +123,41 @@ class TestOAuth20(unittest.TestCase): def test_login_using_authorization_code_with_pkce(self): update_client_for_auth_code_grant(self.client_id) - session = requests.Session() - login(session) - - redirect_destination = None - # Go to Authorize url - try: - session.get( - get_full_url("/api/method/frappe.integrations.oauth2.authorize"), - params=encode_params( - { - "client_id": self.client_id, - "scope": self.scope, - "response_type": "code", - "redirect_uri": self.redirect_uri, - "code_challenge_method": "S256", - "code_challenge": "21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0", - } - ), - ) - except requests.exceptions.ConnectionError as ex: - redirect_destination = ex.request.url + resp = self.get( + "/api/method/frappe.integrations.oauth2.authorize", + { + "sid": self.sid, + "client_id": self.client_id, + "scope": self.scope, + "response_type": "code", + "redirect_uri": self.redirect_uri, + "code_challenge_method": "S256", + "code_challenge": "21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0", + }, + follow_redirects=True, + ) # Get authorization code from redirected URL - query = parse_qs(urlparse(redirect_destination).query) + query = parse_qs(resp.request.environ["QUERY_STRING"]) auth_code = query.get("code")[0] # Request for bearer token - token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.get_token"), + token_response = self.post( + "/api/method/frappe.integrations.oauth2.get_token", headers=self.form_header, - data=encode_params( - { - "grant_type": "authorization_code", - "code": auth_code, - "redirect_uri": self.redirect_uri, - "client_id": self.client_id, - "scope": self.scope, - "code_verifier": "420", - } - ), + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": self.redirect_uri, + "client_id": self.client_id, + "scope": self.scope, + "code_verifier": "420", + }, ) # Parse bearer token json - bearer_token = token_response.json() + bearer_token = token_response.json self.assertTrue(bearer_token.get("access_token")) self.assertTrue(bearer_token.get("id_token")) @@ -157,51 +172,41 @@ class TestOAuth20(unittest.TestCase): client.save() frappe.db.commit() - session = requests.Session() - login(session) - - redirect_destination = None - # Go to Authorize url - try: - session.get( - get_full_url("/api/method/frappe.integrations.oauth2.authorize"), - params=encode_params( - { - "client_id": self.client_id, - "scope": self.scope, - "response_type": "code", - "redirect_uri": self.redirect_uri, - } - ), - ) - except requests.exceptions.ConnectionError as ex: - redirect_destination = ex.request.url + resp = self.get( + "/api/method/frappe.integrations.oauth2.authorize", + { + "sid": self.sid, + "client_id": self.client_id, + "scope": self.scope, + "response_type": "code", + "redirect_uri": self.redirect_uri, + }, + follow_redirects=True, + ) # Get authorization code from redirected URL - query = parse_qs(urlparse(redirect_destination).query) + query = parse_qs(resp.request.environ["QUERY_STRING"]) auth_code = query.get("code")[0] # Request for bearer token - token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.get_token"), + token_response = self.post( + "/api/method/frappe.integrations.oauth2.get_token", headers=self.form_header, - data=encode_params( - { - "grant_type": "authorization_code", - "code": auth_code, - "redirect_uri": self.redirect_uri, - "client_id": self.client_id, - } - ), + data={ + "grant_type": "authorization_code", + "code": auth_code, + "redirect_uri": self.redirect_uri, + "client_id": self.client_id, + }, ) # Parse bearer token json - bearer_token = token_response.json() + bearer_token = token_response.json # Revoke Token - revoke_token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.revoke_token"), + revoke_token_response = self.post( + "/api/method/frappe.integrations.oauth2.revoke_token", headers=self.form_header, data={"token": bearer_token.get("access_token")}, ) @@ -209,7 +214,9 @@ class TestOAuth20(unittest.TestCase): self.assertTrue(revoke_token_response.status_code == 200) # Check revoked token - self.assertFalse(check_valid_openid_response(bearer_token.get("access_token"))) + self.assertFalse( + check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self) + ) def test_resource_owner_password_credentials_grant(self): client = frappe.get_doc("OAuth Client", self.client_id) @@ -219,31 +226,32 @@ class TestOAuth20(unittest.TestCase): frappe.db.commit() # Request for bearer token - token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.get_token"), + token_response = self.post( + "/api/method/frappe.integrations.oauth2.get_token", + data={ + "grant_type": "password", + "username": "test@example.com", + "password": "Eastern_43A1W", + "client_id": self.client_id, + "scope": self.scope, + }, headers=self.form_header, - data=encode_params( - { - "grant_type": "password", - "username": "test@example.com", - "password": "Eastern_43A1W", - "client_id": self.client_id, - "scope": self.scope, - } - ), ) # Parse bearer token json - bearer_token = token_response.json() + bearer_token = token_response.json # Check token for valid response - self.assertTrue(check_valid_openid_response(bearer_token.get("access_token"))) + self.assertTrue( + check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self) + ) def test_login_using_implicit_token(self): oauth_client = frappe.get_doc("OAuth Client", self.client_id) oauth_client.grant_type = "Implicit" oauth_client.response_type = "Token" oauth_client.save() + oauth_client_before = oauth_client.get_doc_before_save() frappe.db.commit() session = requests.Session() @@ -274,41 +282,34 @@ class TestOAuth20(unittest.TestCase): self.assertTrue(response_dict.get("scope")) self.assertTrue(response_dict.get("token_type")) self.assertTrue(check_valid_openid_response(response_dict.get("access_token")[0])) + oauth_client.delete(force=True) + oauth_client_before.insert() + frappe.db.commit() def test_openid_code_id_token(self): client = update_client_for_auth_code_grant(self.client_id) - - session = requests.Session() - login(session) - - redirect_destination = None - nonce = frappe.generate_hash() # Go to Authorize url - try: - session.get( - get_full_url("/api/method/frappe.integrations.oauth2.authorize"), - params=encode_params( - { - "client_id": self.client_id, - "scope": self.scope, - "response_type": "code", - "redirect_uri": self.redirect_uri, - "nonce": nonce, - } - ), - ) - except requests.exceptions.ConnectionError as ex: - redirect_destination = ex.request.url + resp = self.get( + "/api/method/frappe.integrations.oauth2.authorize", + { + "client_id": self.client_id, + "scope": self.scope, + "response_type": "code", + "redirect_uri": self.redirect_uri, + "nonce": nonce, + }, + follow_redirects=True, + ) # Get authorization code from redirected URL - query = parse_qs(urlparse(redirect_destination).query) + query = parse_qs(resp.request.environ["QUERY_STRING"]) auth_code = query.get("code")[0] # Request for bearer token - token_response = requests.post( - get_full_url("/api/method/frappe.integrations.oauth2.get_token"), + token_response = self.post( + "/api/method/frappe.integrations.oauth2.get_token", headers=self.form_header, data=encode_params( { @@ -322,7 +323,7 @@ class TestOAuth20(unittest.TestCase): ) # Parse bearer token json - bearer_token = token_response.json() + bearer_token = token_response.json payload = self.decode_id_token(bearer_token.get("id_token")) self.assertEqual(payload["email"], "test@example.com") @@ -338,17 +339,20 @@ class TestOAuth20(unittest.TestCase): ) -def check_valid_openid_response(access_token=None): +def check_valid_openid_response(access_token=None, client: "FrappeRequestTestCase" = None): """Return True for valid response.""" # Use token in header headers = {} + URL = "/api/method/frappe.integrations.oauth2.openid_profile" + if access_token: - headers["Authorization"] = "Bearer " + access_token + headers["Authorization"] = f"Bearer {access_token}" # check openid for email test@example.com - openid_response = requests.get( - get_full_url("/api/method/frappe.integrations.oauth2.openid_profile"), headers=headers - ) + if client: + openid_response = client.get(URL, headers=headers) + else: + openid_response = requests.get(get_full_url(URL), headers=headers) return openid_response.status_code == 200