seitime-frappe/frappe/integrations/utils.py
2025-07-04 15:05:50 +05:30

272 lines
8.2 KiB
Python

# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import datetime
import json
from typing import Any, cast
from urllib.parse import parse_qs
from pydantic import BaseModel, HttpUrl
import frappe
from frappe.integrations.doctype.oauth_client.oauth_client import OAuthClient
from frappe.utils import get_request_session
class OAuth2DynamicClientMetadata(BaseModel):
"""
OAuth 2.0 Dynamic Client Registration Metadata.
As defined in RFC7591 - OAuth 2.0 Dynamic Client Registration Protocol
https://datatracker.ietf.org/doc/html/rfc7591#section-2
"""
# Used to identify the client to the authorization server
redirect_uris: list[HttpUrl]
token_endpoint_auth_method: str | None = "client_secret_basic"
grant_types: list[str] | None = ["authorization_code"]
response_types: list[str] | None = ["code"]
# Client identifiers shown to user
client_name: str
scope: str | None = None
client_uri: HttpUrl | None = None
logo_uri: HttpUrl | None = None
# Client contact and other information for the client
contacts: list[str] | None = None
tos_uri: HttpUrl | None = None
policy_uri: HttpUrl | None = None
software_id: str | None = None
software_version: str | None = None
# JSON Web Key Set (JWKS) not used here
jwks_uri: HttpUrl | None = None
jwks: dict | None = None
def make_request(method: str, url: str, auth=None, headers=None, data=None, json=None, params=None):
auth = auth or ""
data = data or {}
headers = headers or {}
try:
s = get_request_session()
response = frappe.flags.integration_request = s.request(
method, url, data=data, auth=auth, headers=headers, json=json, params=params
)
response.raise_for_status()
# Check whether the response has a content-type, before trying to check what it is
if content_type := response.headers.get("content-type"):
if content_type == "text/plain; charset=utf-8":
return parse_qs(response.text)
elif content_type.startswith("application/") and content_type.split(";")[0].endswith("json"):
return response.json()
elif response.text:
return response.text
return
except Exception as exc:
if frappe.flags.integration_request_doc:
frappe.flags.integration_request_doc.log_error()
else:
frappe.log_error()
raise exc
def make_get_request(url: str, **kwargs):
"""Make a 'GET' HTTP request to the given `url` and return processed response.
You can optionally pass the below parameters:
* `headers`: Headers to be set in the request.
* `params`: Query parameters to be passed in the request.
* `auth`: Auth credentials.
"""
return make_request("GET", url, **kwargs)
def make_post_request(url: str, **kwargs):
"""Make a 'POST' HTTP request to the given `url` and return processed response.
You can optionally pass the below parameters:
* `headers`: Headers to be set in the request.
* `data`: Data to be passed in body of the request.
* `json`: JSON to be passed in the request.
* `params`: Query parameters to be passed in the request.
* `auth`: Auth credentials.
"""
return make_request("POST", url, **kwargs)
def make_put_request(url: str, **kwargs):
"""Make a 'PUT' HTTP request to the given `url` and return processed response.
You can optionally pass the below parameters:
* `headers`: Headers to be set in the request.
* `data`: Data to be passed in body of the request.
* `json`: JSON to be passed in the request.
* `params`: Query parameters to be passed in the request.
* `auth`: Auth credentials.
"""
return make_request("PUT", url, **kwargs)
def make_patch_request(url: str, **kwargs):
"""Make a 'PATCH' HTTP request to the given `url` and return processed response.
You can optionally pass the below parameters:
* `headers`: Headers to be set in the request.
* `data`: Data to be passed in body of the request.
* `json`: JSON to be passed in the request.
* `params`: Query parameters to be passed in the request.
* `auth`: Auth credentials.
"""
return make_request("PATCH", url, **kwargs)
def make_delete_request(url: str, **kwargs):
"""Make a 'DELETE' HTTP request to the given `url` and return processed response.
You can optionally pass the below parameters:
* `headers`: Headers to be set in the request.
* `data`: Data to be passed in body of the request.
* `json`: JSON to be passed in the request.
* `params`: Query parameters to be passed in the request.
* `auth`: Auth credentials.
"""
return make_request("DELETE", url, **kwargs)
def create_request_log(
data,
integration_type=None,
service_name=None,
name=None,
error=None,
request_headers=None,
output=None,
**kwargs,
):
"""
DEPRECATED: The parameter integration_type will be removed in the next major release.
Use is_remote_request instead.
"""
if integration_type == "Remote":
kwargs["is_remote_request"] = 1
elif integration_type == "Subscription Notification":
kwargs["request_description"] = integration_type
reference_doctype = reference_docname = None
if "reference_doctype" not in kwargs:
if isinstance(data, str):
data = json.loads(data)
reference_doctype = data.get("reference_doctype")
reference_docname = data.get("reference_docname")
integration_request = frappe.get_doc(
{
"doctype": "Integration Request",
"integration_request_service": service_name,
"request_headers": get_json(request_headers),
"data": get_json(data),
"output": get_json(output),
"error": get_json(error),
"reference_doctype": reference_doctype,
"reference_docname": reference_docname,
**kwargs,
}
)
if name:
integration_request.flags._name = name
integration_request.insert(ignore_permissions=True)
frappe.db.commit()
return integration_request
def get_json(obj):
return obj if isinstance(obj, str) else frappe.as_json(obj, indent=1)
def json_handler(obj):
if isinstance(obj, datetime.date | datetime.timedelta | datetime.datetime):
return str(obj)
def validate_dynamic_client_metadata(client: OAuth2DynamicClientMetadata):
invalidation_reasons = []
if len(client.redirect_uris) == 0:
invalidation_reasons.append("redirect_uris is required")
if client.grant_types and not set(client.grant_types).issubset({"authorization_code", "refresh_token"}):
invalidation_reasons.append("only 'authorization_code' and 'refresh_token' grant types are supported")
if client.response_types and not all(rt == "code" for rt in client.response_types):
invalidation_reasons.append("only 'code' response_type is supported")
if not frappe.conf.developer_mode and any(c.scheme != "https" for c in client.redirect_uris):
invalidation_reasons.append("redirect_uris must be https")
if invalidation_reasons:
return ",\n".join(invalidation_reasons)
return None
def create_new_oauth_client(client: OAuth2DynamicClientMetadata):
doc = cast(OAuthClient, frappe.get_doc({"doctype": "OAuth Client"}))
redirect_uris = [str(uri) for uri in client.redirect_uris]
doc.app_name = client.client_name
doc.scopes = client.scope or "all"
doc.redirect_uris = "\n".join(redirect_uris)
doc.default_redirect_uri = redirect_uris[0]
doc.response_type = "Code"
doc.grant_type = "Authorization Code"
doc.skip_authorization = False
if client.client_uri:
doc.client_uri = client.client_uri.encoded_string()
if client.logo_uri:
doc.logo_uri = client.logo_uri.encoded_string()
if client.tos_uri:
doc.tos_uri = client.tos_uri.encoded_string()
if client.policy_uri:
doc.policy_uri = client.policy_uri.encoded_string()
if client.contacts:
doc.contacts = "\n".join(client.contacts)
if client.software_id:
doc.software_id = client.software_id
if client.software_version:
doc.software_version = client.software_version
if client.token_endpoint_auth_method == "none":
doc.token_endpoint_auth_method = "None"
if client.token_endpoint_auth_method == "client_secret_post":
doc.token_endpoint_auth_method = "Client Secret Post"
doc.save(ignore_permissions=True)
return doc
def get_oauth_settings():
"""Return OAuth settings."""
settings: dict[str, Any] = frappe._dict({"skip_authorization": None})
if frappe.get_cached_value("OAuth Settings", "OAuth Settings", "skip_authorization"):
settings["skip_authorization"] = "Auto" # based on legacy OAuth Provider Settings value
elif value := frappe.get_cached_value(
"OAuth Provider Settings", "OAuth Provider Settings", "skip_authorization"
):
settings["skip_authorization"] = value
return settings