refactor!: move OAuth and token auth code to auth.py

This doesn't belong in api.py
This commit is contained in:
Ankush Menat 2023-09-03 19:37:49 +05:30
parent b064d12440
commit e0f87dc4e1
3 changed files with 104 additions and 106 deletions

View file

@ -1,9 +1,5 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import base64
import binascii
from urllib.parse import urlencode, urlparse
from werkzeug.exceptions import NotFound
from werkzeug.routing import Map, Submount
from werkzeug.wrappers import Request
@ -48,105 +44,6 @@ def handle(request: Request):
return build_response("json")
def validate_auth():
"""
Authenticate and sets user for the request.
"""
authorization_header = frappe.get_request_header("Authorization", "").split(" ")
if len(authorization_header) == 2:
validate_oauth(authorization_header)
validate_auth_via_api_keys(authorization_header)
validate_auth_via_hooks()
def validate_oauth(authorization_header):
"""
Authenticate request using OAuth and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
from frappe.integrations.oauth2 import get_oauth_server
from frappe.oauth import get_url_delimiter
form_dict = frappe.local.form_dict
token = authorization_header[1]
req = frappe.request
parsed_url = urlparse(req.url)
access_token = {"access_token": token}
uri = (
parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path + "?" + urlencode(access_token)
)
http_method = req.method
headers = req.headers
body = req.get_data()
if req.content_type and "multipart/form-data" in req.content_type:
body = None
try:
required_scopes = frappe.db.get_value("OAuth Bearer Token", token, "scopes").split(
get_url_delimiter()
)
valid, oauthlib_request = get_oauth_server().verify_request(
uri, http_method, body, headers, required_scopes
)
if valid:
frappe.set_user(frappe.db.get_value("OAuth Bearer Token", token, "user"))
frappe.local.form_dict = form_dict
except AttributeError:
pass
def validate_auth_via_api_keys(authorization_header):
"""
Authenticate request using API keys and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
try:
auth_type, auth_token = authorization_header
authorization_source = frappe.get_request_header("Frappe-Authorization-Source")
if auth_type.lower() == "basic":
api_key, api_secret = frappe.safe_decode(base64.b64decode(auth_token)).split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
elif auth_type.lower() == "token":
api_key, api_secret = auth_token.split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
except binascii.Error:
frappe.throw(
_("Failed to decode token, please provide a valid base64-encoded token."),
frappe.InvalidAuthorizationToken,
)
except (AttributeError, TypeError, ValueError):
pass
def validate_api_key_secret(api_key, api_secret, frappe_authorization_source=None):
"""frappe_authorization_source to provide api key and secret for a doctype apart from User"""
doctype = frappe_authorization_source or "User"
doc = frappe.db.get_value(doctype=doctype, filters={"api_key": api_key}, fieldname=["name"])
form_dict = frappe.local.form_dict
doc_secret = frappe.utils.password.get_decrypted_password(doctype, doc, fieldname="api_secret")
if api_secret == doc_secret:
if doctype == "User":
user = frappe.db.get_value(doctype="User", filters={"api_key": api_key}, fieldname=["name"])
else:
user = frappe.db.get_value(doctype, doc, "user")
if frappe.local.login_manager.user in ("", "Guest"):
frappe.set_user(user)
frappe.local.form_dict = form_dict
def validate_auth_via_hooks():
for auth_hook in frappe.get_hooks("auth_hooks", []):
frappe.get_attr(auth_hook)()
# Merge all API version routing rules
from frappe.api.v1 import url_rules as v1_rules
from frappe.api.v2 import url_rules as v2_rules

View file

@ -22,7 +22,7 @@ import frappe.rate_limiter
import frappe.recorder
import frappe.utils.response
from frappe import _
from frappe.auth import SAFE_HTTP_METHODS, UNSAFE_HTTP_METHODS, HTTPRequest
from frappe.auth import SAFE_HTTP_METHODS, UNSAFE_HTTP_METHODS, HTTPRequest, validate_auth
from frappe.middlewares import StaticDataMiddleware
from frappe.utils import CallbackManager, cint, get_site_name
from frappe.utils.data import escape_html
@ -94,7 +94,7 @@ def application(request: Request):
init_request(request)
frappe.api.validate_auth()
validate_auth()
if request.method == "OPTIONS":
response = Response()

View file

@ -1,6 +1,8 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See LICENSE
from urllib.parse import quote
import base64
import binascii
from urllib.parse import quote, urlencode, urlparse
import frappe
import frappe.database
@ -547,3 +549,102 @@ class LoginAttemptTracker:
):
return False
return True
def validate_auth():
"""
Authenticate and sets user for the request.
"""
authorization_header = frappe.get_request_header("Authorization", "").split(" ")
if len(authorization_header) == 2:
validate_oauth(authorization_header)
validate_auth_via_api_keys(authorization_header)
validate_auth_via_hooks()
def validate_oauth(authorization_header):
"""
Authenticate request using OAuth and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
from frappe.integrations.oauth2 import get_oauth_server
from frappe.oauth import get_url_delimiter
form_dict = frappe.local.form_dict
token = authorization_header[1]
req = frappe.request
parsed_url = urlparse(req.url)
access_token = {"access_token": token}
uri = (
parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path + "?" + urlencode(access_token)
)
http_method = req.method
headers = req.headers
body = req.get_data()
if req.content_type and "multipart/form-data" in req.content_type:
body = None
try:
required_scopes = frappe.db.get_value("OAuth Bearer Token", token, "scopes").split(
get_url_delimiter()
)
valid, oauthlib_request = get_oauth_server().verify_request(
uri, http_method, body, headers, required_scopes
)
if valid:
frappe.set_user(frappe.db.get_value("OAuth Bearer Token", token, "user"))
frappe.local.form_dict = form_dict
except AttributeError:
pass
def validate_auth_via_api_keys(authorization_header):
"""
Authenticate request using API keys and set session user
Args:
authorization_header (list of str): The 'Authorization' header containing the prefix and token
"""
try:
auth_type, auth_token = authorization_header
authorization_source = frappe.get_request_header("Frappe-Authorization-Source")
if auth_type.lower() == "basic":
api_key, api_secret = frappe.safe_decode(base64.b64decode(auth_token)).split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
elif auth_type.lower() == "token":
api_key, api_secret = auth_token.split(":")
validate_api_key_secret(api_key, api_secret, authorization_source)
except binascii.Error:
frappe.throw(
_("Failed to decode token, please provide a valid base64-encoded token."),
frappe.InvalidAuthorizationToken,
)
except (AttributeError, TypeError, ValueError):
pass
def validate_api_key_secret(api_key, api_secret, frappe_authorization_source=None):
"""frappe_authorization_source to provide api key and secret for a doctype apart from User"""
doctype = frappe_authorization_source or "User"
doc = frappe.db.get_value(doctype=doctype, filters={"api_key": api_key}, fieldname=["name"])
form_dict = frappe.local.form_dict
doc_secret = frappe.utils.password.get_decrypted_password(doctype, doc, fieldname="api_secret")
if api_secret == doc_secret:
if doctype == "User":
user = frappe.db.get_value(doctype="User", filters={"api_key": api_key}, fieldname=["name"])
else:
user = frappe.db.get_value(doctype, doc, "user")
if frappe.local.login_manager.user in ("", "Guest"):
frappe.set_user(user)
frappe.local.form_dict = form_dict
def validate_auth_via_hooks():
for auth_hook in frappe.get_hooks("auth_hooks", []):
frappe.get_attr(auth_hook)()