feat: OAuth 2 openid-configuration and introspect_token endpoint
This commit is contained in:
parent
bb483d59af
commit
4e8b00ba1f
2 changed files with 138 additions and 21 deletions
|
|
@ -206,6 +206,12 @@ class OAuthWebRequestValidator(RequestValidator):
|
|||
"OAuth Client", client_id, "default_redirect_uri"
|
||||
)
|
||||
|
||||
redirect_uris = frappe.db.get_value("OAuth Client", client_id, "redirect_uris")
|
||||
|
||||
if redirect_uris:
|
||||
redirect_uris = redirect_uris.split(get_url_delimiter())
|
||||
return redirect_uri in redirect_uris
|
||||
|
||||
return saved_redirect_uri == redirect_uri
|
||||
|
||||
def validate_grant_type(
|
||||
|
|
@ -352,8 +358,12 @@ class OAuthWebRequestValidator(RequestValidator):
|
|||
if request.nonce:
|
||||
id_token["nonce"] = request.nonce
|
||||
|
||||
userinfo = get_userinfo(user)
|
||||
|
||||
if userinfo.get("iss"):
|
||||
id_token["iss"] = userinfo.get("iss")
|
||||
|
||||
if "openid" in request.scopes:
|
||||
userinfo = get_userinfo(user, request)
|
||||
id_token.update(userinfo)
|
||||
|
||||
id_token_encoded = jwt.encode(
|
||||
|
|
@ -391,7 +401,7 @@ class OAuthWebRequestValidator(RequestValidator):
|
|||
|
||||
def get_userinfo_claims(self, request):
|
||||
user = frappe.get_doc("User", frappe.session.user)
|
||||
userinfo = get_userinfo(user, request)
|
||||
userinfo = get_userinfo(user)
|
||||
return userinfo
|
||||
|
||||
def validate_id_token(self, token, scopes, request):
|
||||
|
|
@ -550,8 +560,8 @@ def calculate_at_hash(access_token, hash_alg):
|
|||
then take the left-most 128 bits and base64url encode them. The at_hash value is a
|
||||
case sensitive string.
|
||||
Args:
|
||||
access_token (str): An access token string.
|
||||
hash_alg (callable): A callable returning a hash object, e.g. hashlib.sha256
|
||||
access_token (str): An access token string.
|
||||
hash_alg (callable): A callable returning a hash object, e.g. hashlib.sha256
|
||||
"""
|
||||
hash_digest = hash_alg(access_token.encode("utf-8")).digest()
|
||||
cut_at = int(len(hash_digest) / 2)
|
||||
|
|
@ -586,21 +596,15 @@ def get_client_scopes(client_id):
|
|||
return scopes_string.split()
|
||||
|
||||
|
||||
def get_userinfo(user, request):
|
||||
def get_userinfo(user):
|
||||
picture = None
|
||||
frappe_server_url = (
|
||||
frappe.db.get_value("Social Login Key", "frappe", "base_url") or None
|
||||
)
|
||||
|
||||
request_url = urlparse(request.uri)
|
||||
frappe_server_url = get_server_url()
|
||||
|
||||
if user.user_image:
|
||||
if frappe.utils.validate_url(user.user_image):
|
||||
picture = user.user_image
|
||||
elif frappe_server_url:
|
||||
picture = frappe_server_url + "/" + user.user_image
|
||||
else:
|
||||
picture = request_url.scheme + "://" + request_url.netloc + user.user_image
|
||||
picture = frappe_server_url + "/" + user.user_image
|
||||
|
||||
userinfo = frappe._dict(
|
||||
{
|
||||
|
|
@ -615,11 +619,10 @@ def get_userinfo(user, request):
|
|||
"email": user.email,
|
||||
"picture": picture,
|
||||
"roles": frappe.get_roles(user.name),
|
||||
"iss": frappe_server_url,
|
||||
}
|
||||
)
|
||||
|
||||
userinfo["iss"] = frappe_server_url or request.uri
|
||||
|
||||
return userinfo
|
||||
|
||||
|
||||
|
|
@ -640,3 +643,9 @@ def generate_json_error_response(e):
|
|||
)
|
||||
frappe.local.response["http_status_code"] = getattr(e, "status_code", 500)
|
||||
return
|
||||
|
||||
|
||||
def get_server_url():
|
||||
request_url = urlparse(frappe.request.url)
|
||||
request_url = f"{request_url.scheme}://{request_url.netloc}"
|
||||
return frappe.get_value("Social Login Key", "frappe", "base_url") or request_url
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
# MIT License. See license.txt
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import unittest, frappe, requests, time
|
||||
import unittest, frappe, requests, time, jwt
|
||||
from frappe.test_runner import make_test_records
|
||||
from six.moves.urllib.parse import urlparse, parse_qs, urljoin
|
||||
from urllib.parse import urlencode, quote
|
||||
|
|
@ -34,11 +34,7 @@ class TestOAuth20(unittest.TestCase):
|
|||
self.assertFalse(check_valid_openid_response())
|
||||
|
||||
def test_login_using_authorization_code(self):
|
||||
client = frappe.get_doc("OAuth Client", self.client_id)
|
||||
client.grant_type = "Authorization Code"
|
||||
client.response_type = "Code"
|
||||
client.save()
|
||||
frappe.db.commit()
|
||||
client = update_client_for_auth_code_grant(self.client_id)
|
||||
|
||||
session = requests.Session()
|
||||
login(session)
|
||||
|
|
@ -87,6 +83,54 @@ class TestOAuth20(unittest.TestCase):
|
|||
self.assertTrue(bearer_token.get("token_type") == "Bearer")
|
||||
self.assertTrue(check_valid_openid_response(bearer_token.get("access_token")))
|
||||
|
||||
def test_login_using_authorization_code_with_pkce(self):
|
||||
client = update_client_for_auth_code_grant(self.client_id)
|
||||
|
||||
session = requests.Session()
|
||||
login(session)
|
||||
|
||||
redirect_destination = None
|
||||
|
||||
# Go to Authorize url
|
||||
try:
|
||||
session.get(
|
||||
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
|
||||
params=encode_params({
|
||||
"client_id": self.client_id,
|
||||
"scope": self.scope,
|
||||
"response_type": "code",
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"code_challenge_method": 'S256',
|
||||
"code_challenge": '21XaP8MJjpxCMRxgEzBP82sZ73PRLqkyBUta1R309J0' ,
|
||||
})
|
||||
)
|
||||
except requests.exceptions.ConnectionError as ex:
|
||||
redirect_destination = ex.request.url
|
||||
|
||||
# Get authorization code from redirected URL
|
||||
query = parse_qs(urlparse(redirect_destination).query)
|
||||
auth_code = query.get("code")[0]
|
||||
|
||||
# Request for bearer token
|
||||
token_response = requests.post(
|
||||
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
|
||||
headers=self.form_header,
|
||||
data=encode_params({
|
||||
"grant_type": "authorization_code",
|
||||
"code": auth_code,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"client_id": self.client_id,
|
||||
"scope": self.scope,
|
||||
"code_verifier": "420",
|
||||
})
|
||||
)
|
||||
|
||||
# Parse bearer token json
|
||||
bearer_token = token_response.json()
|
||||
|
||||
self.assertTrue(bearer_token.get("access_token"))
|
||||
self.assertTrue(bearer_token.get("id_token"))
|
||||
|
||||
def test_revoke_token(self):
|
||||
client = frappe.get_doc("OAuth Client", self.client_id)
|
||||
client.grant_type = "Authorization Code"
|
||||
|
|
@ -204,6 +248,61 @@ class TestOAuth20(unittest.TestCase):
|
|||
self.assertTrue(response_dict.get("token_type"))
|
||||
self.assertTrue(check_valid_openid_response(response_dict.get("access_token")[0]))
|
||||
|
||||
def test_openid_code_id_token(self):
|
||||
client = update_client_for_auth_code_grant(self.client_id)
|
||||
|
||||
session = requests.Session()
|
||||
login(session)
|
||||
|
||||
redirect_destination = None
|
||||
|
||||
nonce = frappe.generate_hash()
|
||||
|
||||
# Go to Authorize url
|
||||
try:
|
||||
session.get(
|
||||
get_full_url("/api/method/frappe.integrations.oauth2.authorize"),
|
||||
params=encode_params({
|
||||
"client_id": self.client_id,
|
||||
"scope": self.scope,
|
||||
"response_type": "code",
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"nonce": nonce,
|
||||
})
|
||||
)
|
||||
except requests.exceptions.ConnectionError as ex:
|
||||
redirect_destination = ex.request.url
|
||||
|
||||
# Get authorization code from redirected URL
|
||||
query = parse_qs(urlparse(redirect_destination).query)
|
||||
auth_code = query.get("code")[0]
|
||||
|
||||
# Request for bearer token
|
||||
token_response = requests.post(
|
||||
get_full_url("/api/method/frappe.integrations.oauth2.get_token"),
|
||||
headers=self.form_header,
|
||||
data=encode_params({
|
||||
"grant_type": "authorization_code",
|
||||
"code": auth_code,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"client_id": self.client_id,
|
||||
"scope": self.scope,
|
||||
})
|
||||
)
|
||||
|
||||
# Parse bearer token json
|
||||
bearer_token = token_response.json()
|
||||
|
||||
id_token = bearer_token.get("id_token")
|
||||
payload = jwt.decode(
|
||||
id_token,
|
||||
audience=client.client_id,
|
||||
key=client.client_secret,
|
||||
algorithm="HS256",
|
||||
)
|
||||
|
||||
self.assertTrue(payload.get("nonce") == nonce)
|
||||
|
||||
|
||||
def check_valid_openid_response(access_token=None):
|
||||
"""Return True for valid response."""
|
||||
|
|
@ -234,3 +333,12 @@ def login(session):
|
|||
def get_full_url(endpoint):
|
||||
"""Turn '/endpoint' into 'http://127.0.0.1:8000/endpoint'."""
|
||||
return urljoin(frappe.utils.get_url(), endpoint)
|
||||
|
||||
|
||||
def update_client_for_auth_code_grant(client_id):
|
||||
client = frappe.get_doc("OAuth Client", client_id)
|
||||
client.grant_type = "Authorization Code"
|
||||
client.response_type = "Code"
|
||||
client.save()
|
||||
frappe.db.commit()
|
||||
return client
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue