From 9fb635828fe900d8e0eef7fb6ae6e4735586bc03 Mon Sep 17 00:00:00 2001 From: barredterra <14891507+barredterra@users.noreply.github.com> Date: Fri, 20 Nov 2020 17:44:35 +0100 Subject: [PATCH] refactor: oauth2 --- frappe/integrations/oauth2.py | 120 +++++++++++++++++----------------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/frappe/integrations/oauth2.py b/frappe/integrations/oauth2.py index c8dfc52c95..59d0278d79 100644 --- a/frappe/integrations/oauth2.py +++ b/frappe/integrations/oauth2.py @@ -1,41 +1,50 @@ from __future__ import unicode_literals -import frappe, json -from frappe.oauth import OAuthWebRequestValidator, WebApplicationServer + +import hashlib +import json +import jwt +from werkzeug.urls import url_fix from oauthlib.oauth2 import FatalClientError, OAuth2Error -from werkzeug import url_fix from six.moves.urllib.parse import quote, urlencode, urlparse -from frappe.integrations.doctype.oauth_provider_settings.oauth_provider_settings import get_oauth_settings + +import frappe from frappe import _ +from frappe.oauth import OAuthWebRequestValidator, WebApplicationServer +from frappe.integrations.doctype.oauth_provider_settings.oauth_provider_settings import get_oauth_settings def get_oauth_server(): if not getattr(frappe.local, 'oauth_server', None): oauth_validator = OAuthWebRequestValidator() - frappe.local.oauth_server = WebApplicationServer(oauth_validator) + frappe.local.oauth_server = WebApplicationServer(oauth_validator) return frappe.local.oauth_server -def get_urlparams_from_kwargs(param_kwargs): +def sanitize_kwargs(param_kwargs): arguments = param_kwargs - if arguments.get("data"): - arguments.pop("data") - if arguments.get("cmd"): - arguments.pop("cmd") + arguments.pop('data', None) + arguments.pop('cmd', None) - return urlencode(arguments) + return arguments @frappe.whitelist() def approve(*args, **kwargs): r = frappe.request - uri = url_fix(r.url.replace("+"," ")) - http_method = r.method - body = r.get_data() - headers = r.headers try: - scopes, frappe.flags.oauth_credentials = get_oauth_server().validate_authorization_request(uri, http_method, body, headers) + scopes, frappe.flags.oauth_credentials = get_oauth_server().validate_authorization_request( + r.url, + r.method, + r.get_data(), + r.headers + ) - headers, body, status = get_oauth_server().create_authorization_response(uri=frappe.flags.oauth_credentials['redirect_uri'], \ - body=body, headers=headers, scopes=scopes, credentials=frappe.flags.oauth_credentials) + headers, body, status = get_oauth_server().create_authorization_response( + uri=frappe.flags.oauth_credentials['redirect_uri'], + body=r.get_data(), + headers=r.headers, + scopes=scopes, + credentials=frappe.flags.oauth_credentials + ) uri = headers.get('Location', None) frappe.local.response["type"] = "redirect" @@ -47,34 +56,28 @@ def approve(*args, **kwargs): return e @frappe.whitelist(allow_guest=True) -def authorize(*args, **kwargs): - #Fetch provider URL from settings - oauth_settings = get_oauth_settings() - params = get_urlparams_from_kwargs(kwargs) - request_url = urlparse(frappe.request.url) - success_url = request_url.scheme + "://" + request_url.netloc + "/api/method/frappe.integrations.oauth2.approve?" + params +def authorize(**kwargs): + success_url = "/api/method/frappe.integrations.oauth2.approve?" + urlencode(sanitize_kwargs(kwargs)) failure_url = frappe.form_dict["redirect_uri"] + "?error=access_denied" - if frappe.session['user']=='Guest': + if frappe.session.user == 'Guest': #Force login, redirect to preauth again. frappe.local.response["type"] = "redirect" - frappe.local.response["location"] = "/login?redirect-to=/api/method/frappe.integrations.oauth2.authorize?" + quote(params.replace("+"," ")) - - elif frappe.session['user']!='Guest': + frappe.local.response["location"] = "/login?" + urlencode({'redirect-to': frappe.request.url}) + else: try: r = frappe.request - uri = url_fix(r.url) - http_method = r.method - body = r.get_data() - headers = r.headers - - scopes, frappe.flags.oauth_credentials = get_oauth_server().validate_authorization_request(uri, http_method, body, headers) + scopes, frappe.flags.oauth_credentials = get_oauth_server().validate_authorization_request( + r.url, + r.method, + r.get_data(), + r.headers + ) skip_auth = frappe.db.get_value("OAuth Client", frappe.flags.oauth_credentials['client_id'], "skip_authorization") unrevoked_tokens = frappe.get_all("OAuth Bearer Token", filters={"status":"Active"}) - if skip_auth or (oauth_settings["skip_authorization"] == "Auto" and len(unrevoked_tokens)): - + if skip_auth or (get_oauth_settings().skip_authorization == "Auto" and unrevoked_tokens): frappe.local.response["type"] = "redirect" frappe.local.response["location"] = success_url else: @@ -87,7 +90,6 @@ def authorize(*args, **kwargs): }) resp_html = frappe.render_template("templates/includes/oauth_confirmation.html", response_html_params) frappe.respond_as_web_page("Confirm Access", resp_html) - except FatalClientError as e: return e except OAuth2Error as e: @@ -95,20 +97,20 @@ def authorize(*args, **kwargs): @frappe.whitelist(allow_guest=True) def get_token(*args, **kwargs): - r = frappe.request - - uri = url_fix(r.url) - http_method = r.method - body = r.form - headers = r.headers - #Check whether frappe server URL is set frappe_server_url = frappe.db.get_value("Social Login Key", "frappe", "base_url") or None if not frappe_server_url: frappe.throw(_("Please set Base URL in Social Login Key for Frappe")) try: - headers, body, status = get_oauth_server().create_token_response(uri, http_method, body, headers, frappe.flags.oauth_credentials) + r = frappe.request + headers, body, status = get_oauth_server().create_token_response( + r.url, + r.method, + r.form, + r.headers, + frappe.flags.oauth_credentials + ) out = frappe._dict(json.loads(body)) if not out.error and "openid" in out.scope: token_user = frappe.db.get_value("OAuth Bearer Token", out.access_token, "user") @@ -116,7 +118,7 @@ def get_token(*args, **kwargs): client_secret = frappe.db.get_value("OAuth Client", token_client, "client_secret") if token_user in ["Guest", "Administrator"]: frappe.throw(_("Logged in as Guest or Administrator")) - import hashlib + id_token_header = { "typ":"jwt", "alg":"HS256" @@ -128,9 +130,10 @@ def get_token(*args, **kwargs): "iss": frappe_server_url, "at_hash": frappe.oauth.calculate_at_hash(out.access_token, hashlib.sha256) } - import jwt + id_token_encoded = jwt.encode(id_token, client_secret, algorithm='HS256', headers=id_token_header) - out.update({"id_token":str(id_token_encoded)}) + out.update({"id_token": str(id_token_encoded)}) + frappe.local.response = out except FatalClientError as e: @@ -140,12 +143,12 @@ def get_token(*args, **kwargs): @frappe.whitelist(allow_guest=True) def revoke_token(*args, **kwargs): r = frappe.request - uri = url_fix(r.url) - http_method = r.method - body = r.form - headers = r.headers - - headers, body, status = get_oauth_server().create_revocation_response(uri, headers=headers, body=body, http_method=http_method) + headers, body, status = get_oauth_server().create_revocation_response( + r.url, + headers=r.headers, + body=r.form, + http_method=r.method + ) frappe.local.response['http_status_code'] = status if status == 200: @@ -174,15 +177,12 @@ def openid_profile(*args, **kwargs): "email": name, "picture": picture }) - + frappe.local.response = user_profile def validate_url(url_string): try: result = urlparse(url_string) - if result.scheme and result.scheme in ["http", "https", "ftp", "ftps"]: - return True - else: - return False + return result.scheme and result.scheme in ["http", "https", "ftp", "ftps"] except: - return False \ No newline at end of file + return False