272 lines
8.2 KiB
Python
272 lines
8.2 KiB
Python
# Copyright (c) 2019, Frappe Technologies and contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
import datetime
|
|
import json
|
|
from typing import Any, cast
|
|
from urllib.parse import parse_qs
|
|
|
|
from pydantic import BaseModel, HttpUrl
|
|
|
|
import frappe
|
|
from frappe.integrations.doctype.oauth_client.oauth_client import OAuthClient
|
|
from frappe.utils import get_request_session
|
|
|
|
|
|
class OAuth2DynamicClientMetadata(BaseModel):
|
|
"""
|
|
OAuth 2.0 Dynamic Client Registration Metadata.
|
|
|
|
As defined in RFC7591 - OAuth 2.0 Dynamic Client Registration Protocol
|
|
https://datatracker.ietf.org/doc/html/rfc7591#section-2
|
|
"""
|
|
|
|
# Used to identify the client to the authorization server
|
|
redirect_uris: list[HttpUrl]
|
|
token_endpoint_auth_method: str | None = "client_secret_basic"
|
|
grant_types: list[str] | None = ["authorization_code"]
|
|
response_types: list[str] | None = ["code"]
|
|
|
|
# Client identifiers shown to user
|
|
client_name: str
|
|
scope: str | None = None
|
|
client_uri: HttpUrl | None = None
|
|
logo_uri: HttpUrl | None = None
|
|
|
|
# Client contact and other information for the client
|
|
contacts: list[str] | None = None
|
|
tos_uri: HttpUrl | None = None
|
|
policy_uri: HttpUrl | None = None
|
|
software_id: str | None = None
|
|
software_version: str | None = None
|
|
|
|
# JSON Web Key Set (JWKS) not used here
|
|
jwks_uri: HttpUrl | None = None
|
|
jwks: dict | None = None
|
|
|
|
|
|
def make_request(method: str, url: str, auth=None, headers=None, data=None, json=None, params=None):
|
|
auth = auth or ""
|
|
data = data or {}
|
|
headers = headers or {}
|
|
|
|
try:
|
|
s = get_request_session()
|
|
response = frappe.flags.integration_request = s.request(
|
|
method, url, data=data, auth=auth, headers=headers, json=json, params=params
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Check whether the response has a content-type, before trying to check what it is
|
|
if content_type := response.headers.get("content-type"):
|
|
if content_type == "text/plain; charset=utf-8":
|
|
return parse_qs(response.text)
|
|
elif content_type.startswith("application/") and content_type.split(";")[0].endswith("json"):
|
|
return response.json()
|
|
elif response.text:
|
|
return response.text
|
|
return
|
|
except Exception as exc:
|
|
if frappe.flags.integration_request_doc:
|
|
frappe.flags.integration_request_doc.log_error()
|
|
else:
|
|
frappe.log_error()
|
|
raise exc
|
|
|
|
|
|
def make_get_request(url: str, **kwargs):
|
|
"""Make a 'GET' HTTP request to the given `url` and return processed response.
|
|
|
|
You can optionally pass the below parameters:
|
|
|
|
* `headers`: Headers to be set in the request.
|
|
* `params`: Query parameters to be passed in the request.
|
|
* `auth`: Auth credentials.
|
|
"""
|
|
return make_request("GET", url, **kwargs)
|
|
|
|
|
|
def make_post_request(url: str, **kwargs):
|
|
"""Make a 'POST' HTTP request to the given `url` and return processed response.
|
|
|
|
You can optionally pass the below parameters:
|
|
|
|
* `headers`: Headers to be set in the request.
|
|
* `data`: Data to be passed in body of the request.
|
|
* `json`: JSON to be passed in the request.
|
|
* `params`: Query parameters to be passed in the request.
|
|
* `auth`: Auth credentials.
|
|
"""
|
|
return make_request("POST", url, **kwargs)
|
|
|
|
|
|
def make_put_request(url: str, **kwargs):
|
|
"""Make a 'PUT' HTTP request to the given `url` and return processed response.
|
|
|
|
You can optionally pass the below parameters:
|
|
|
|
* `headers`: Headers to be set in the request.
|
|
* `data`: Data to be passed in body of the request.
|
|
* `json`: JSON to be passed in the request.
|
|
* `params`: Query parameters to be passed in the request.
|
|
* `auth`: Auth credentials.
|
|
"""
|
|
return make_request("PUT", url, **kwargs)
|
|
|
|
|
|
def make_patch_request(url: str, **kwargs):
|
|
"""Make a 'PATCH' HTTP request to the given `url` and return processed response.
|
|
|
|
You can optionally pass the below parameters:
|
|
|
|
* `headers`: Headers to be set in the request.
|
|
* `data`: Data to be passed in body of the request.
|
|
* `json`: JSON to be passed in the request.
|
|
* `params`: Query parameters to be passed in the request.
|
|
* `auth`: Auth credentials.
|
|
"""
|
|
return make_request("PATCH", url, **kwargs)
|
|
|
|
|
|
def make_delete_request(url: str, **kwargs):
|
|
"""Make a 'DELETE' HTTP request to the given `url` and return processed response.
|
|
|
|
You can optionally pass the below parameters:
|
|
|
|
* `headers`: Headers to be set in the request.
|
|
* `data`: Data to be passed in body of the request.
|
|
* `json`: JSON to be passed in the request.
|
|
* `params`: Query parameters to be passed in the request.
|
|
* `auth`: Auth credentials.
|
|
"""
|
|
return make_request("DELETE", url, **kwargs)
|
|
|
|
|
|
def create_request_log(
|
|
data,
|
|
integration_type=None,
|
|
service_name=None,
|
|
name=None,
|
|
error=None,
|
|
request_headers=None,
|
|
output=None,
|
|
**kwargs,
|
|
):
|
|
"""
|
|
DEPRECATED: The parameter integration_type will be removed in the next major release.
|
|
Use is_remote_request instead.
|
|
"""
|
|
if integration_type == "Remote":
|
|
kwargs["is_remote_request"] = 1
|
|
|
|
elif integration_type == "Subscription Notification":
|
|
kwargs["request_description"] = integration_type
|
|
|
|
reference_doctype = reference_docname = None
|
|
if "reference_doctype" not in kwargs:
|
|
if isinstance(data, str):
|
|
data = json.loads(data)
|
|
|
|
reference_doctype = data.get("reference_doctype")
|
|
reference_docname = data.get("reference_docname")
|
|
|
|
integration_request = frappe.get_doc(
|
|
{
|
|
"doctype": "Integration Request",
|
|
"integration_request_service": service_name,
|
|
"request_headers": get_json(request_headers),
|
|
"data": get_json(data),
|
|
"output": get_json(output),
|
|
"error": get_json(error),
|
|
"reference_doctype": reference_doctype,
|
|
"reference_docname": reference_docname,
|
|
**kwargs,
|
|
}
|
|
)
|
|
|
|
if name:
|
|
integration_request.flags._name = name
|
|
|
|
integration_request.insert(ignore_permissions=True)
|
|
frappe.db.commit()
|
|
|
|
return integration_request
|
|
|
|
|
|
def get_json(obj):
|
|
return obj if isinstance(obj, str) else frappe.as_json(obj, indent=1)
|
|
|
|
|
|
def json_handler(obj):
|
|
if isinstance(obj, datetime.date | datetime.timedelta | datetime.datetime):
|
|
return str(obj)
|
|
|
|
|
|
def validate_dynamic_client_metadata(client: OAuth2DynamicClientMetadata):
|
|
invalidation_reasons = []
|
|
if len(client.redirect_uris) == 0:
|
|
invalidation_reasons.append("redirect_uris is required")
|
|
|
|
if client.grant_types and not set(client.grant_types).issubset({"authorization_code", "refresh_token"}):
|
|
invalidation_reasons.append("only 'authorization_code' and 'refresh_token' grant types are supported")
|
|
|
|
if client.response_types and not all(rt == "code" for rt in client.response_types):
|
|
invalidation_reasons.append("only 'code' response_type is supported")
|
|
|
|
if not frappe.conf.developer_mode and any(c.scheme != "https" for c in client.redirect_uris):
|
|
invalidation_reasons.append("redirect_uris must be https")
|
|
|
|
if invalidation_reasons:
|
|
return ",\n".join(invalidation_reasons)
|
|
|
|
return None
|
|
|
|
|
|
def create_new_oauth_client(client: OAuth2DynamicClientMetadata):
|
|
doc = cast(OAuthClient, frappe.get_doc({"doctype": "OAuth Client"}))
|
|
redirect_uris = [str(uri) for uri in client.redirect_uris]
|
|
|
|
doc.app_name = client.client_name
|
|
doc.scopes = client.scope or "all"
|
|
doc.redirect_uris = "\n".join(redirect_uris)
|
|
doc.default_redirect_uri = redirect_uris[0]
|
|
doc.response_type = "Code"
|
|
doc.grant_type = "Authorization Code"
|
|
doc.skip_authorization = False
|
|
|
|
if client.client_uri:
|
|
doc.client_uri = client.client_uri.encoded_string()
|
|
if client.logo_uri:
|
|
doc.logo_uri = client.logo_uri.encoded_string()
|
|
if client.tos_uri:
|
|
doc.tos_uri = client.tos_uri.encoded_string()
|
|
if client.policy_uri:
|
|
doc.policy_uri = client.policy_uri.encoded_string()
|
|
if client.contacts:
|
|
doc.contacts = "\n".join(client.contacts)
|
|
if client.software_id:
|
|
doc.software_id = client.software_id
|
|
if client.software_version:
|
|
doc.software_version = client.software_version
|
|
|
|
if client.token_endpoint_auth_method == "none":
|
|
doc.token_endpoint_auth_method = "None"
|
|
if client.token_endpoint_auth_method == "client_secret_post":
|
|
doc.token_endpoint_auth_method = "Client Secret Post"
|
|
|
|
doc.save(ignore_permissions=True)
|
|
return doc
|
|
|
|
|
|
def get_oauth_settings():
|
|
"""Return OAuth settings."""
|
|
settings: dict[str, Any] = frappe._dict({"skip_authorization": None})
|
|
if frappe.get_cached_value("OAuth Settings", "OAuth Settings", "skip_authorization"):
|
|
settings["skip_authorization"] = "Auto" # based on legacy OAuth Provider Settings value
|
|
|
|
elif value := frappe.get_cached_value(
|
|
"OAuth Provider Settings", "OAuth Provider Settings", "skip_authorization"
|
|
):
|
|
settings["skip_authorization"] = value
|
|
|
|
return settings
|