Merge pull request #16748 from gavindsouza/refactor-oauth-tests

refactor: Oauth20 tests
This commit is contained in:
gavin 2022-08-05 16:09:27 +05:30 committed by GitHub
commit 5385276547
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 222 additions and 190 deletions

View file

@ -35,7 +35,11 @@ def make_access_log(
@frappe.write_only()
@retry(stop=stop_after_attempt(3), retry=retry_if_exception_type(frappe.DuplicateEntryError))
@retry(
stop=stop_after_attempt(3),
retry=retry_if_exception_type(frappe.DuplicateEntryError),
reraise=True,
)
def _make_access_log(
doctype=None,
document=None,

View file

@ -117,7 +117,7 @@
}
],
"links": [],
"modified": "2022-08-03 12:20:52.062755",
"modified": "2022-08-03 12:21:52.062755",
"modified_by": "Administrator",
"module": "Integrations",
"name": "OAuth Client",

View file

@ -1,15 +0,0 @@
[
{
"app_name": "_Test OAuth Client",
"client_secret": "test_client_secret",
"default_redirect_uri": "http://localhost",
"docstatus": 0,
"doctype": "OAuth Client",
"grant_type": "Authorization Code",
"name": "test_client_id",
"redirect_uris": "http://localhost",
"response_type": "Code",
"scopes": "all openid",
"skip_authorization": 1
}
]

View file

@ -419,7 +419,7 @@ class Document(BaseDocument):
df.options, {"parent": self.name, "parenttype": self.doctype, "parentfield": fieldname}
)
def get_doc_before_save(self):
def get_doc_before_save(self) -> "Document":
return getattr(self, "_doc_before_save", None)
def has_value_changed(self, fieldname):
@ -1025,10 +1025,14 @@ class Document(BaseDocument):
"""Rename the document to `name`. This transforms the current object."""
return self._rename(name=name, merge=merge, force=force, validate_rename=validate_rename)
def delete(self, ignore_permissions=False):
def delete(self, ignore_permissions=False, force=False):
"""Delete document."""
return frappe.delete_doc(
self.doctype, self.name, ignore_permissions=ignore_permissions, flags=self.flags
self.doctype,
self.name,
ignore_permissions=ignore_permissions,
flags=self.flags,
force=force,
)
def run_before_save_methods(self):

View file

@ -32,9 +32,12 @@ def suppress_stdout():
def make_request(
target: str, args: tuple | None = None, kwargs: dict | None = None
target: str,
args: tuple | None = None,
kwargs: dict | None = None,
site: str = None,
) -> TestResponse:
t = ThreadWithReturnValue(target=target, args=args, kwargs=kwargs)
t = ThreadWithReturnValue(target=target, args=args, kwargs=kwargs, site=site)
t.start()
t.join()
return t._return
@ -46,13 +49,14 @@ def patch_request_header(key, *args, **kwargs):
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, site=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
self.site = site or _site
def run(self):
if self._target is not None:
with patch("frappe.app.get_site_name", return_value=_site):
with patch("frappe.app.get_site_name", return_value=self.site):
header_patch = patch("frappe.get_request_header", new=patch_request_header)
if authorization_token:
header_patch.start()

View file

@ -1,88 +1,140 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import unittest
from typing import TYPE_CHECKING
from urllib.parse import parse_qs, urljoin, urlparse
import jwt
import requests
from werkzeug.test import TestResponse
import frappe
from frappe.integrations.oauth2 import encode_params
from frappe.test_runner import make_test_records
from frappe.tests.test_api import get_test_client, make_request, suppress_stdout
if TYPE_CHECKING:
from frappe.integrations.doctype.social_login_key.social_login_key import SocialLoginKey
class TestOAuth20(unittest.TestCase):
def setUp(self):
make_test_records("OAuth Client")
class FrappeRequestTestCase(unittest.TestCase):
@property
def sid(self) -> str:
if not getattr(self, "_sid", None):
from frappe.auth import CookieManager, LoginManager
from frappe.utils import set_request
set_request(path="/")
frappe.local.cookie_manager = CookieManager()
frappe.local.login_manager = LoginManager()
frappe.local.login_manager.login_as("test@example.com")
self._sid = frappe.session.sid
return self._sid
def get(self, path: str, params: dict | None = None, **kwargs) -> TestResponse:
return make_request(
target=self.TEST_CLIENT.get, args=(path,), kwargs={"data": params, **kwargs}, site=self.site
)
def post(self, path, data, **kwargs) -> TestResponse:
return make_request(
target=self.TEST_CLIENT.post, args=(path,), kwargs={"data": data, **kwargs}, site=self.site
)
def put(self, path, data, **kwargs) -> TestResponse:
return make_request(
target=self.TEST_CLIENT.put, args=(path,), kwargs={"data": data, **kwargs}, site=self.site
)
def delete(self, path, **kwargs) -> TestResponse:
return make_request(target=self.TEST_CLIENT.delete, args=(path,), kwargs=kwargs, site=self.site)
class TestOAuth20(FrappeRequestTestCase):
site = frappe.local.site
@classmethod
def setUpClass(cls):
make_test_records("User")
client = frappe.get_all("OAuth Client", fields=["*"])[0]
self.client_id = client.get("client_id")
self.client_secret = client.get("client_secret")
self.form_header = {"content-type": "application/x-www-form-urlencoded"}
self.scope = "all openid"
self.redirect_uri = "http://localhost"
cls.form_header = {"content-type": "application/x-www-form-urlencoded"}
cls.scope = "all openid"
cls.redirect_uri = "http://localhost"
# Set Frappe server URL reqired for id_token generation
try:
frappe_login_key = frappe.get_doc("Social Login Key", "frappe")
except frappe.DoesNotExistError:
frappe_login_key = frappe.new_doc("Social Login Key")
frappe_login_key: "SocialLoginKey" = frappe.new_doc("Social Login Key")
frappe_login_key.get_social_login_provider("Frappe", initialize=True)
frappe_login_key.base_url = frappe.utils.get_url()
frappe_login_key.enable_social_login = 0
frappe_login_key.save()
frappe_login_key.insert(ignore_if_duplicate=True)
frappe.db.commit()
def setUp(self):
self.TEST_CLIENT = get_test_client()
self.oauth_client = frappe.new_doc("OAuth Client")
self.oauth_client.update(
{
"app_name": "_Test OAuth Client",
"client_secret": "test_client_secret",
"default_redirect_uri": "http://localhost",
"docstatus": 0,
"doctype": "OAuth Client",
"grant_type": "Authorization Code",
"name": "test_client_id",
"redirect_uris": "http://localhost",
"response_type": "Code",
"scopes": "all openid",
"skip_authorization": 1,
}
)
self.oauth_client.insert()
self.client_id = self.oauth_client.get("client_id")
self.client_secret = self.oauth_client.get("client_secret")
def tearDown(self):
self.oauth_client.delete(force=True)
frappe.db.rollback()
def test_invalid_login(self):
self.assertFalse(check_valid_openid_response())
with suppress_stdout():
self.assertFalse(check_valid_openid_response(client=self))
def test_login_using_authorization_code(self):
update_client_for_auth_code_grant(self.client_id)
session = requests.Session()
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params=encode_params(
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
}
),
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
# Get authorization code from redirected URL
query = parse_qs(urlparse(redirect_destination).query)
self.TEST_CLIENT.set_cookie(self.site, key="sid", value=self.sid)
resp = self.get(
"/api/method/frappe.integrations.oauth2.authorize",
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
},
follow_redirects=True,
)
query = parse_qs(resp.request.environ["QUERY_STRING"])
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
token_response = self.post(
"/api/method/frappe.integrations.oauth2.get_token",
headers=self.form_header,
data=encode_params(
{
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"scope": self.scope,
}
),
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"scope": self.scope,
},
)
# Parse bearer token json
bearer_token = token_response.json()
bearer_token = token_response.json
self.assertTrue(bearer_token.get("access_token"))
self.assertTrue(bearer_token.get("expires_in"))
@ -90,7 +142,9 @@ class TestOAuth20(unittest.TestCase):
self.assertTrue(bearer_token.get("refresh_token"))
self.assertTrue(bearer_token.get("scope"))
self.assertTrue(bearer_token.get("token_type") == "Bearer")
self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))
self.assertTrue(
check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self)
)
decoded_token = self.decode_id_token(bearer_token.get("id_token"))
self.assertEqual(decoded_token["email"], "test@example.com")
@ -98,51 +152,41 @@ class TestOAuth20(unittest.TestCase):
def test_login_using_authorization_code_with_pkce(self):
update_client_for_auth_code_grant(self.client_id)
session = requests.Session()
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params=encode_params(
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"code_challenge_method": "S256",
"code_challenge": "21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0",
}
),
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
self.TEST_CLIENT.set_cookie(self.site, key="sid", value=self.sid)
resp = self.get(
"/api/method/frappe.integrations.oauth2.authorize",
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"code_challenge_method": "S256",
"code_challenge": "21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0",
},
follow_redirects=True,
)
# Get authorization code from redirected URL
query = parse_qs(urlparse(redirect_destination).query)
query = parse_qs(resp.request.environ["QUERY_STRING"])
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
token_response = self.post(
"/api/method/frappe.integrations.oauth2.get_token",
headers=self.form_header,
data=encode_params(
{
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"scope": self.scope,
"code_verifier": "420",
}
),
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"scope": self.scope,
"code_verifier": "420",
},
)
# Parse bearer token json
bearer_token = token_response.json()
bearer_token = token_response.json
self.assertTrue(bearer_token.get("access_token"))
self.assertTrue(bearer_token.get("id_token"))
@ -157,51 +201,41 @@ class TestOAuth20(unittest.TestCase):
client.save()
frappe.db.commit()
session = requests.Session()
login(session)
redirect_destination = None
# Go to Authorize url
try:
session.get(
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params=encode_params(
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
}
),
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
self.TEST_CLIENT.set_cookie(self.site, key="sid", value=self.sid)
resp = self.get(
"/api/method/frappe.integrations.oauth2.authorize",
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
},
follow_redirects=True,
)
# Get authorization code from redirected URL
query = parse_qs(urlparse(redirect_destination).query)
query = parse_qs(resp.request.environ["QUERY_STRING"])
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
token_response = self.post(
"/api/method/frappe.integrations.oauth2.get_token",
headers=self.form_header,
data=encode_params(
{
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
}
),
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
},
)
# Parse bearer token json
bearer_token = token_response.json()
bearer_token = token_response.json
# Revoke Token
revoke_token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.revoke_token"),
revoke_token_response = self.post(
"/api/method/frappe.integrations.oauth2.revoke_token",
headers=self.form_header,
data={"token": bearer_token.get("access_token")},
)
@ -209,7 +243,9 @@ class TestOAuth20(unittest.TestCase):
self.assertTrue(revoke_token_response.status_code == 200)
# Check revoked token
self.assertFalse(check_valid_openid_response(bearer_token.get("access_token")))
self.assertFalse(
check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self)
)
def test_resource_owner_password_credentials_grant(self):
client = frappe.get_doc("OAuth Client", self.client_id)
@ -219,31 +255,32 @@ class TestOAuth20(unittest.TestCase):
frappe.db.commit()
# Request for bearer token
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
token_response = self.post(
"/api/method/frappe.integrations.oauth2.get_token",
data={
"grant_type": "password",
"username": "test@example.com",
"password": "Eastern_43A1W",
"client_id": self.client_id,
"scope": self.scope,
},
headers=self.form_header,
data=encode_params(
{
"grant_type": "password",
"username": "test@example.com",
"password": "Eastern_43A1W",
"client_id": self.client_id,
"scope": self.scope,
}
),
)
# Parse bearer token json
bearer_token = token_response.json()
bearer_token = token_response.json
# Check token for valid response
self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))
self.assertTrue(
check_valid_openid_response(access_token=bearer_token.get("access_token"), client=self)
)
def test_login_using_implicit_token(self):
oauth_client = frappe.get_doc("OAuth Client", self.client_id)
oauth_client.grant_type = "Implicit"
oauth_client.response_type = "Token"
oauth_client.save()
oauth_client_before = oauth_client.get_doc_before_save()
frappe.db.commit()
session = requests.Session()
@ -274,41 +311,35 @@ class TestOAuth20(unittest.TestCase):
self.assertTrue(response_dict.get("scope"))
self.assertTrue(response_dict.get("token_type"))
self.assertTrue(check_valid_openid_response(response_dict.get("access_token")[0]))
oauth_client.delete(force=True)
oauth_client_before.insert()
frappe.db.commit()
def test_openid_code_id_token(self):
client = update_client_for_auth_code_grant(self.client_id)
session = requests.Session()
login(session)
redirect_destination = None
nonce = frappe.generate_hash()
# Go to Authorize url
try:
session.get(
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
params=encode_params(
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"nonce": nonce,
}
),
)
except requests.exceptions.ConnectionError as ex:
redirect_destination = ex.request.url
self.TEST_CLIENT.set_cookie(self.site, key="sid", value=self.sid)
resp = self.get(
"/api/method/frappe.integrations.oauth2.authorize",
{
"client_id": self.client_id,
"scope": self.scope,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"nonce": nonce,
},
follow_redirects=True,
)
# Get authorization code from redirected URL
query = parse_qs(urlparse(redirect_destination).query)
query = parse_qs(resp.request.environ["QUERY_STRING"])
auth_code = query.get("code")[0]
# Request for bearer token
token_response = requests.post(
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
token_response = self.post(
"/api/method/frappe.integrations.oauth2.get_token",
headers=self.form_header,
data=encode_params(
{
@ -322,7 +353,7 @@ class TestOAuth20(unittest.TestCase):
)
# Parse bearer token json
bearer_token = token_response.json()
bearer_token = token_response.json
payload = self.decode_id_token(bearer_token.get("id_token"))
self.assertEqual(payload["email"], "test@example.com")
@ -338,17 +369,20 @@ class TestOAuth20(unittest.TestCase):
)
def check_valid_openid_response(access_token=None):
def check_valid_openid_response(access_token=None, client: "FrappeRequestTestCase" = None):
"""Return True for valid response."""
# Use token in header
headers = {}
URL = "/api/method/frappe.integrations.oauth2.openid_profile"
if access_token:
headers["Authorization"] = "Bearer " + access_token
headers["Authorization"] = f"Bearer {access_token}"
# check openid for email test@example.com
openid_response = requests.get(
get_full_url("/api/method/frappe.integrations.oauth2.openid_profile"), headers=headers
)
if client:
openid_response = client.get(URL, headers=headers)
else:
openid_response = requests.get(get_full_url(URL), headers=headers)
return openid_response.status_code == 200

View file

@ -525,11 +525,11 @@ def touch_file(path):
return path
def get_test_client() -> Client:
def get_test_client(use_cookies=True) -> Client:
"""Returns an test instance of the Frappe WSGI"""
from frappe.app import application
return Client(application)
return Client(application, use_cookies=use_cookies)
def get_hook_method(hook_name, fallback=None):

View file

@ -295,6 +295,7 @@ def validate_queue(queue, default_queue_list=None):
retry=retry_if_exception_type(BusyLoadingError) | retry_if_exception_type(ConnectionError),
stop=stop_after_attempt(10),
wait=wait_fixed(1),
reraise=True,
)
def get_redis_conn(username=None, password=None):
if not hasattr(frappe.local, "conf"):