fix: add get_dynamic_linked_docs & get_linked_docs utils

This commit is contained in:
Pratik 2026-04-13 15:37:01 +05:30
parent cdb24afaa4
commit f0ef9295bd

View file

@ -14,6 +14,7 @@ from frappe.model.docstatus import DocStatus
from frappe.model.dynamic_links import get_dynamic_link_map from frappe.model.dynamic_links import get_dynamic_link_map
from frappe.model.naming import revert_series_if_last from frappe.model.naming import revert_series_if_last
from frappe.model.utils import is_virtual_doctype from frappe.model.utils import is_virtual_doctype
from frappe.query_builder import DocType
from frappe.utils.data import get_link_to_form from frappe.utils.data import get_link_to_form
from frappe.utils.file_manager import remove_all from frappe.utils.file_manager import remove_all
from frappe.utils.global_search import delete_for_document from frappe.utils.global_search import delete_for_document
@ -137,7 +138,7 @@ def delete_doc(
): ):
try: try:
delete_controllers(name, doc.module) delete_controllers(name, doc.module)
except (OSError, KeyError): except OSError, KeyError:
# in case a doctype doesnt have any controller code nor any app and module # in case a doctype doesnt have any controller code nor any app and module
pass pass
@ -147,7 +148,7 @@ def delete_doc(
# Lock the doc without waiting # Lock the doc without waiting
try: try:
frappe.db.get_value(doctype, name, for_update=True, wait=False) frappe.db.get_value(doctype, name, for_update=True, wait=False)
except (frappe.QueryTimeoutError, frappe.QueryDeadlockError): except frappe.QueryTimeoutError, frappe.QueryDeadlockError:
frappe.throw( frappe.throw(
_( _(
"This document can not be deleted right now as it's being modified by another user. Please try again after some time." "This document can not be deleted right now as it's being modified by another user. Please try again after some time."
@ -297,9 +298,9 @@ def check_permission_and_not_submitted(doc):
) )
def check_if_doc_is_linked(doc, method="Delete"): def get_linked_docs(doc, method="Delete") -> list[dict]:
""" """
Raises exception if the given document is linked in another record. Return a list of documents that are statically linked to the given document.
""" """
from frappe.model.rename_doc import get_link_fields from frappe.model.rename_doc import get_link_fields
@ -311,6 +312,8 @@ def check_if_doc_is_linked(doc, method="Delete"):
if method == "Delete": if method == "Delete":
ignored_doctypes.update(frappe.get_hooks("ignore_links_on_delete")) ignored_doctypes.update(frappe.get_hooks("ignore_links_on_delete"))
linked_docs = []
for lf in link_fields: for lf in link_fields:
link_dt, link_field, issingle = lf["parent"], lf["fieldname"], lf["issingle"] link_dt, link_field, issingle = lf["parent"], lf["fieldname"], lf["issingle"]
if link_dt in ignored_doctypes or (link_field == "amended_from" and method == "Cancel"): if link_dt in ignored_doctypes or (link_field == "amended_from" and method == "Cancel"):
@ -326,7 +329,9 @@ def check_if_doc_is_linked(doc, method="Delete"):
if issingle: if issingle:
if frappe.db.get_single_value(link_dt, link_field) == doc.name: if frappe.db.get_single_value(link_dt, link_field) == doc.name:
raise_link_exists_exception(doc, link_dt, link_dt) linked_docs.append(
{"doc": doc.name, "reference_doctype": link_dt, "reference_docname": link_dt}
)
continue continue
fields = ["name", "docstatus"] fields = ["name", "docstatus"]
@ -343,20 +348,39 @@ def check_if_doc_is_linked(doc, method="Delete"):
continue continue
if method != "Delete" and (method != "Cancel" or not DocStatus(item.docstatus).is_submitted()): if method != "Delete" and (method != "Cancel" or not DocStatus(item.docstatus).is_submitted()):
# don't raise exception if not # don't add if not
# linked to a non-cancelled doc when deleting or to a submitted doc when cancelling # linked to a non-cancelled doc when deleting or to a submitted doc when cancelling
continue continue
elif link_dt == doc.doctype and (item_parent or item.name) == doc.name: elif link_dt == doc.doctype and (item_parent or item.name) == doc.name:
# don't raise exception if not # don't add if linked to same item or doc having same name as the item
# linked to same item or doc having same name as the item
continue continue
else: else:
reference_docname = item_parent or item.name reference_docname = item_parent or item.name
raise_link_exists_exception(doc, linked_parent_doctype, reference_docname) linked_docs.append(
{
"doc": doc.name,
"reference_doctype": linked_parent_doctype,
"reference_docname": reference_docname,
}
)
return linked_docs
def check_if_doc_is_dynamically_linked(doc, method="Delete"): def check_if_doc_is_linked(doc, method="Delete"):
"""Raise `frappe.LinkExistsError` if the document is dynamically linked""" """
Raises exception if the given document is linked in another record.
"""
for link in get_linked_docs(doc, method):
raise_link_exists_exception(doc, link["reference_doctype"], link["reference_docname"])
def get_dynamic_linked_docs(doc, method="Delete") -> list[dict]:
"""
Return a list of documents that are dynamically linked to the given document.
"""
linked_docs = []
for df in get_dynamic_link_map().get(doc.doctype, []): for df in get_dynamic_link_map().get(doc.doctype, []):
ignore_linked_doctypes = doc.get("ignore_linked_doctypes") or [] ignore_linked_doctypes = doc.get("ignore_linked_doctypes") or []
@ -380,16 +404,26 @@ def check_if_doc_is_dynamically_linked(doc, method="Delete"):
or (method == "Cancel" and DocStatus(refdoc.docstatus).is_submitted()) or (method == "Cancel" and DocStatus(refdoc.docstatus).is_submitted())
) )
): ):
raise_link_exists_exception(doc, df.parent, df.parent) linked_docs.append(
{
"doc": doc.name,
"reference_doctype": df.parent,
"reference_docname": df.parent,
"at_position": "",
}
)
else: else:
# dynamic link in table # dynamic link in table
df["table"] = ", `parent`, `parenttype`, `idx`" if meta.istable else "" RefDoc = DocType(df.parent)
for refdoc in frappe.db.sql( query = (
"""select `name`, `docstatus` {table} from `tab{parent}` where frappe.qb.from_(RefDoc)
`{options}`=%s and `{fieldname}`=%s""".format(**df), .select(RefDoc.name, RefDoc.docstatus)
(doc.doctype, doc.name), .where(RefDoc[df.options] == doc.doctype)
as_dict=True, .where(RefDoc[df.fieldname] == doc.name)
): )
if meta.istable:
query = query.select(RefDoc.parent, RefDoc.parenttype, RefDoc.idx)
for refdoc in query.run(as_dict=True):
# linked to an non-cancelled doc when deleting # linked to an non-cancelled doc when deleting
# or linked to a submitted doc when cancelling # or linked to a submitted doc when cancelling
if (method == "Delete" and not DocStatus(refdoc.docstatus).is_cancelled()) or ( if (method == "Delete" and not DocStatus(refdoc.docstatus).is_cancelled()) or (
@ -406,7 +440,24 @@ def check_if_doc_is_dynamically_linked(doc, method="Delete"):
at_position = f"at Row: {refdoc.idx}" if meta.istable else "" at_position = f"at Row: {refdoc.idx}" if meta.istable else ""
raise_link_exists_exception(doc, reference_doctype, reference_docname, at_position) linked_docs.append(
{
"doc": doc.name,
"reference_doctype": reference_doctype,
"reference_docname": reference_docname,
"at_position": at_position,
}
)
return linked_docs
def check_if_doc_is_dynamically_linked(doc, method="Delete"):
"""Raise `frappe.LinkExistsError` if the document is dynamically linked"""
for link in get_dynamic_linked_docs(doc, method):
raise_link_exists_exception(
doc, link["reference_doctype"], link["reference_docname"], link["at_position"]
)
def raise_link_exists_exception(doc, reference_doctype, reference_docname, row=""): def raise_link_exists_exception(doc, reference_doctype, reference_docname, row=""):