seitime-frappe/frappe/api/v2.py
2023-10-16 18:12:53 +05:30

109 lines
2.9 KiB
Python

import json
from werkzeug.routing import Rule
import frappe
from frappe import _
from frappe.utils.data import sbool
def handle_rpc_call(method: str, doctype: str | None = None):
    """Dispatch an RPC-style request to ``frappe.handler.execute_cmd``.

    Args:
        method: Method name taken from the URL.
        doctype: Optional doctype from the URL. When given, ``method`` is
            qualified with the name of the module returned by
            ``load_doctype_module(doctype)``.

    Returns:
        Whatever ``execute_cmd`` returns, or ``None`` when the resolved
        method is exactly ``"login"``.
    """
    from frappe.handler import execute_cmd
    from frappe.modules.utils import load_doctype_module

    if doctype:
        # Turn the short method name into a dotted path inside the
        # doctype's controller module.
        controller_module = load_doctype_module(doctype)
        method = ".".join((controller_module.__name__, method))

    # NOTE(review): "login" is short-circuited without executing anything;
    # presumably authentication is handled elsewhere in the request cycle.
    if method == "login":
        return

    return execute_cmd(method)
def read_doc(doctype: str, name: str):
    """Load a single document and return it after read-permission checks.

    Args:
        doctype: Doctype of the document to load.
        name: Name (identifier) of the document.

    Returns:
        The loaded document, after ``check_permission("read")`` and
        ``apply_fieldlevel_read_permissions()`` have been called on it.
    """
    document = frappe.get_doc(doctype, name)

    # Document-level check first, then the field-level pass.
    document.check_permission("read")
    document.apply_fieldlevel_read_permissions()

    return document
def document_list(doctype: str):
    """List documents of ``doctype`` using the request's query parameters.

    The request parameters in ``frappe.form_dict`` are normalised in place
    and then forwarded as keyword arguments to ``frappe.client.get_list``.

    Args:
        doctype: Doctype whose records are listed.

    Returns:
        The result of ``frappe.call(frappe.client.get_list, doctype, ...)``.
    """
    params = frappe.form_dict

    # "fields" arrives as a JSON-encoded string; decode it when present.
    encoded_fields = params.get("fields")
    if encoded_fields:
        params["fields"] = json.loads(encoded_fields)

    # Page size: keep an explicit limit_page_length if one was sent,
    # otherwise fall back to "limit", and finally to 20.
    params.setdefault(
        "limit_page_length",
        params.limit or params.limit_page_length or 20,
    )

    # Only these two parameters are converted to native booleans.
    for flag_name in ("as_dict", "debug"):
        raw_value = params.get(flag_name)
        if raw_value is not None:
            params[flag_name] = sbool(raw_value)

    return frappe.call(frappe.client.get_list, doctype, **params)
def create_doc(doctype: str):
    """Create and insert a new document from the request parameters.

    Args:
        doctype: Doctype of the document to create (from the URL).

    Returns:
        The value returned by ``insert()`` on the new document.
    """
    payload = frappe.form_dict

    # The doctype is passed positionally below; drop any copy in the request
    # data (presumably so the two cannot clash).
    payload.pop("doctype", None)

    new_document = frappe.new_doc(doctype, **payload)
    return new_document.insert()
def update_doc(doctype: str, name: str):
    """Update an existing document with the request parameters and save it.

    Args:
        doctype: Doctype of the document to update.
        name: Name (identifier) of the document.

    Returns:
        The updated document.
    """
    payload = frappe.form_dict
    doc = frappe.get_doc(doctype, name, for_update=True)

    # "flags" is never applied from request data.
    payload.pop("flags", None)

    doc.update(payload)
    doc.save()

    # A document with a parenttype is a child-table row: save its parent too.
    if doc.get("parenttype"):
        parent_doc = frappe.get_doc(doc.parenttype, doc.parent)
        parent_doc.save()

    return doc
def delete_doc(doctype: str, name: str):
    """Delete a document and answer with HTTP status 202.

    Args:
        doctype: Doctype of the document to delete.
        name: Name (identifier) of the document.

    Returns:
        The literal string ``"ok"``.
    """
    # TODO: child doc handling
    frappe.delete_doc(
        doctype,
        name,
        ignore_missing=False,
    )

    # Set the response status explicitly to 202.
    frappe.response.http_status_code = 202

    return "ok"
def execute_doc_method(doctype: str, name: str, method: str | None = None):
    """Run a method on a specific document.

    Args:
        doctype: Doctype of the target document.
        name: Name (identifier) of the target document.
        method: Method to run. When falsy, it is popped from the
            ``run_method`` request parameter instead.

    Returns:
        The result of ``doc.run_method(method, **frappe.form_dict)`` for GET
        and POST requests; ``None`` for any other HTTP verb.
    """
    method = method or frappe.form_dict.pop("run_method")

    doc = frappe.get_doc(doctype, name)
    # Return value is ignored, so this presumably raises when the method is
    # not whitelisted.
    doc.is_whitelisted(method)

    # GET needs read access and POST needs write access.
    required_permission = {"GET": "read", "POST": "write"}.get(frappe.request.method)
    if required_permission is None:
        return None

    doc.check_permission(required_permission)
    return doc.run_method(method, **frappe.form_dict)
# Routing table: maps URL patterns (and HTTP verbs) to the handlers above.
url_rules = [
    # RPC-style calls, optionally scoped to a doctype.
    Rule("/method/<method>", endpoint=handle_rpc_call),
    Rule("/method/<doctype>/<method>", endpoint=handle_rpc_call),
    # Collection-level operations on a doctype.
    Rule("/resource/<doctype>", endpoint=document_list, methods=["GET"]),
    Rule("/resource/<doctype>", endpoint=create_doc, methods=["POST"]),
    # Operations on one document; <path:name> lets names contain slashes.
    Rule("/resource/<doctype>/<path:name>/", endpoint=read_doc, methods=["GET"]),
    Rule("/resource/<doctype>/<path:name>/", endpoint=update_doc, methods=["PUT"]),
    Rule("/resource/<doctype>/<path:name>/", endpoint=delete_doc, methods=["DELETE"]),
    # Run a method on one document.
    Rule(
        "/resource/<doctype>/<path:name>/method/<method>/",
        endpoint=execute_doc_method,
        methods=["GET", "POST"],
    ),
]