When `get_docs` output is unknown, we might end up generating queries for child table with `in (...)` containing thousands of doc names. This doesn't fare well with databases, so it's better to chunk it to 1000 by default. This is an acceptable tradeoff IMO.
2581 lines
76 KiB
Python
2581 lines
76 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
import hashlib
|
|
import itertools
|
|
import json
|
|
import time
|
|
import warnings
|
|
from collections.abc import Generator, Iterable
|
|
from contextlib import contextmanager
|
|
from functools import wraps
|
|
from types import MappingProxyType
|
|
from typing import TYPE_CHECKING, Any, Literal, Optional, Self, TypeAlias, Union, overload, override
|
|
|
|
from werkzeug.exceptions import NotFound
|
|
|
|
import frappe
|
|
from frappe import _, is_whitelisted, msgprint
|
|
from frappe.core.doctype.file.utils import relink_mismatched_files
|
|
from frappe.core.doctype.server_script.server_script_utils import run_server_script_for_doc_event
|
|
from frappe.database.utils import commit_after_response
|
|
from frappe.desk.form.document_follow import follow_document
|
|
from frappe.integrations.doctype.webhook import run_webhooks
|
|
from frappe.model import optional_fields, table_fields
|
|
from frappe.model.base_document import BaseDocument, D, get_controller
|
|
from frappe.model.docstatus import DocStatus
|
|
from frappe.model.naming import set_new_name, validate_name
|
|
from frappe.model.utils import is_virtual_doctype, simple_singledispatch
|
|
from frappe.model.workflow import set_workflow_state_on_action, validate_workflow
|
|
from frappe.types import DF
|
|
from frappe.types.filter import FilterSignature
|
|
from frappe.utils import compare, cstr, date_diff, file_lock, flt, get_table_name, now
|
|
from frappe.utils.data import get_absolute_url, get_datetime, get_timedelta, getdate
|
|
from frappe.utils.global_search import update_global_search
|
|
|
|
if TYPE_CHECKING:
|
|
from typing import Self
|
|
|
|
from frappe.core.doctype.docfield.docfield import DocField
|
|
|
|
|
|
DOCUMENT_LOCK_EXPIRY = 3 * 60 * 60 # All locks expire in 3 hours automatically
|
|
DOCUMENT_LOCK_SOFT_EXPIRY = 30 * 60 # Let users force-unlock after 30 minutes
|
|
|
|
|
|
type _SingleDocument = "Document"
|
|
type _NewDocument = "Document"
|
|
|
|
|
|
@overload
|
|
def get_doc(document: "Document", /) -> "Document":
|
|
pass
|
|
|
|
|
|
@overload
|
|
def get_doc(doctype: str, /) -> _SingleDocument:
|
|
"""Retrieve Single DocType from DB, doctype must be positional argument."""
|
|
pass
|
|
|
|
|
|
@overload
|
|
def get_doc(
|
|
doctype: str, name: str, /, *, for_update: bool | None = None, check_permission: str | bool | None = None
|
|
) -> "Document":
|
|
"""Retrieve DocType from DB, doctype and name must be positional argument."""
|
|
pass
|
|
|
|
|
|
@overload
|
|
def get_doc(**kwargs: dict) -> "_NewDocument":
|
|
"""Initialize document from kwargs.
|
|
Not recommended. Use `frappe.new_doc` instead."""
|
|
pass
|
|
|
|
|
|
@overload
|
|
def get_doc(documentdict: dict) -> "_NewDocument":
|
|
"""Create document from dict.
|
|
Not recommended. Use `frappe.new_doc` instead."""
|
|
pass
|
|
|
|
|
|
@simple_singledispatch
|
|
def get_doc(*args, **kwargs) -> "Document":
|
|
"""Return a `frappe.model.Document` object.
|
|
|
|
:param arg1: Document dict or DocType name.
|
|
:param arg2: [optional] document name.
|
|
:param for_update: [optional] select document for update.
|
|
|
|
There are multiple ways to call `get_doc`
|
|
|
|
# will fetch the latest user object (with child table) from the database
|
|
user = get_doc("User", "test@example.com")
|
|
|
|
# create a new object
|
|
user = get_doc({
|
|
"doctype":"User"
|
|
"email_id": "test@example.com",
|
|
"roles: [
|
|
{"role": "System Manager"}
|
|
]
|
|
})
|
|
|
|
# create new object with keyword arguments
|
|
user = get_doc(doctype='User', email_id='test@example.com')
|
|
|
|
# select a document for update
|
|
user = get_doc("User", "test@example.com", for_update=True)
|
|
"""
|
|
if not args and kwargs:
|
|
return get_doc_from_dict(kwargs)
|
|
else:
|
|
raise ValueError("First non keyword argument must be a string or dict")
|
|
|
|
|
|
@get_doc.register(BaseDocument)
|
|
def _basedoc(doc: BaseDocument, *args, **kwargs) -> "Document":
|
|
return doc
|
|
|
|
|
|
@get_doc.register(str)
|
|
def get_doc_str(doctype: str, name: str | None = None, **kwargs) -> "Document":
|
|
# if no name: it's a single
|
|
if controller := get_controller(doctype):
|
|
doc = controller(doctype, name, **kwargs)
|
|
return get_doc_permission_check(doc, kwargs.get("check_permission"))
|
|
|
|
raise ImportError(doctype)
|
|
|
|
|
|
@get_doc.register(MappingProxyType) # global test record
|
|
def get_doc_from_mapping_proxy(data: MappingProxyType, **kwargs) -> "Document":
|
|
return get_doc_from_dict(dict(data), **kwargs)
|
|
|
|
|
|
@get_doc.register(dict)
|
|
def get_doc_from_dict(data: dict[str, Any], **kwargs) -> "Document":
|
|
if "doctype" not in data:
|
|
raise ValueError('"doctype" is a required key')
|
|
if controller := get_controller(data["doctype"]):
|
|
doc = controller(**data)
|
|
return get_doc_permission_check(doc, kwargs.get("check_permission"))
|
|
raise ImportError(data["doctype"])
|
|
|
|
|
|
def get_lazy_doc(
|
|
doctype: str, name: str, *, for_update=None, check_permission: str | bool | None = None
|
|
) -> "Document":
|
|
if doctype == "DocType":
|
|
warnings.warn("DocType doesn't support lazy loading", stacklevel=1)
|
|
return get_doc(doctype, name, check_permission=check_permission)
|
|
|
|
if controller := get_lazy_controller(doctype):
|
|
doc = controller(doctype, name, for_update=for_update)
|
|
return get_doc_permission_check(doc, check_permission)
|
|
raise ImportError(doctype)
|
|
|
|
|
|
def get_docs(
|
|
doctype: str,
|
|
filters: dict | None = None,
|
|
*,
|
|
chunk_size: int = 1000,
|
|
limit: int | None = None,
|
|
limit_start: int = 0,
|
|
order_by: str = "creation asc",
|
|
as_iterator: bool = False,
|
|
for_update: bool = False,
|
|
distinct: bool = False,
|
|
) -> list["Document"] | Generator["Document"]:
|
|
"""Fetch fully instantiated Document objects from the database.
|
|
|
|
Returns a list of Documents by default. Pass `as_iterator=True` to get
|
|
a chunked generator that yields a list of Documents per chunk to reduce memory usage.
|
|
|
|
:param doctype: DocType of the records to fetch.
|
|
:param filters: Dict or list of filters to apply.
|
|
:param chunk_size: Number of records to fetch in each chunk if using `as_iterator`.
|
|
:param limit: Maximum total number of records to fetch.
|
|
:param limit_start: Start results at record #. Default 0.
|
|
:param order_by: Order By string, e.g. `creation desc`.
|
|
:param as_iterator: If True, returns a iterator yielding Documents.
|
|
:param for_update: If True, locks the fetched rows for update.
|
|
:param distinct: If True, return distinct rows.
|
|
"""
|
|
if is_virtual_doctype(doctype):
|
|
frappe.throw(_("Virtual DocType {0} cannot be fetched in bulk.").format(doctype))
|
|
|
|
meta = frappe.get_meta(doctype)
|
|
|
|
if meta.issingle:
|
|
frappe.throw(_("Single DocType {0} cannot be fetched in bulk.").format(doctype))
|
|
|
|
if limit_start and limit is None:
|
|
frappe.throw(_("limit cannot be None when limit_start is used"))
|
|
|
|
if not order_by:
|
|
# Sort order is mandatory for iterator logic
|
|
order_by = "name asc"
|
|
|
|
child_tables = [
|
|
(df.fieldname, df.options) for df in meta.get_table_fields() if not is_virtual_doctype(df.options)
|
|
]
|
|
controller = get_controller(doctype)
|
|
for_update = for_update and frappe.db.db_type != "sqlite"
|
|
|
|
iterator = _get_docs_generator(
|
|
doctype,
|
|
controller,
|
|
child_tables,
|
|
filters=filters,
|
|
chunk_size=chunk_size,
|
|
limit=limit,
|
|
limit_start=limit_start,
|
|
order_by=order_by,
|
|
for_update=for_update,
|
|
distinct=distinct,
|
|
)
|
|
|
|
if as_iterator:
|
|
return iterator
|
|
return list(iterator)
|
|
|
|
|
|
def _get_docs_generator(
|
|
doctype,
|
|
controller,
|
|
child_tables,
|
|
*,
|
|
filters,
|
|
chunk_size,
|
|
limit,
|
|
limit_start,
|
|
order_by,
|
|
for_update,
|
|
distinct,
|
|
) -> Generator["Document"]:
|
|
fetched_count = 0
|
|
current_offset = limit_start
|
|
|
|
while True:
|
|
current_chunk_size = chunk_size
|
|
if limit is not None:
|
|
remaining = limit - fetched_count
|
|
if remaining <= 0:
|
|
break
|
|
current_chunk_size = min(chunk_size, remaining)
|
|
|
|
chunk_data = _fetch_rows(
|
|
doctype,
|
|
filters=filters,
|
|
order_by=order_by,
|
|
limit=current_chunk_size,
|
|
offset=current_offset,
|
|
for_update=for_update,
|
|
child_tables=child_tables,
|
|
distinct=distinct,
|
|
)
|
|
|
|
if not chunk_data:
|
|
break
|
|
|
|
yield from _build_document_objects(controller, chunk_data, for_update)
|
|
|
|
fetched_count += len(chunk_data)
|
|
current_offset += len(chunk_data)
|
|
|
|
|
|
def _fetch_rows(doctype, *, filters, order_by, limit, offset, for_update, child_tables, distinct=False):
|
|
kwargs = {}
|
|
if limit is not None:
|
|
kwargs["limit"] = limit
|
|
if offset:
|
|
kwargs["offset"] = offset
|
|
|
|
data = frappe.qb.get_query(
|
|
table=doctype,
|
|
filters=filters or {},
|
|
fields=["*"],
|
|
order_by=order_by,
|
|
for_update=for_update,
|
|
distinct=distinct,
|
|
**kwargs,
|
|
).run(as_dict=True)
|
|
|
|
if not data:
|
|
return []
|
|
|
|
for row in data:
|
|
row["doctype"] = doctype
|
|
|
|
fetched_docs_by_name = {row.name: row for row in data}
|
|
parent_names = list(fetched_docs_by_name.keys())
|
|
|
|
for fieldname, child_doctype in child_tables:
|
|
child_table_data = frappe.qb.get_query(
|
|
table=child_doctype,
|
|
filters={"parent": ("in", parent_names), "parenttype": doctype, "parentfield": fieldname},
|
|
fields=["*"],
|
|
order_by="idx asc",
|
|
for_update=for_update,
|
|
).run(as_dict=True)
|
|
|
|
for child in child_table_data:
|
|
child["doctype"] = child_doctype
|
|
|
|
for parent_doc in fetched_docs_by_name.values():
|
|
parent_doc[fieldname] = []
|
|
|
|
for child in child_table_data:
|
|
if child.parent in fetched_docs_by_name:
|
|
fetched_docs_by_name[child.parent][fieldname].append(child)
|
|
|
|
return list(fetched_docs_by_name.values())
|
|
|
|
|
|
def _build_document_objects(controller, data: list, for_update: bool):
|
|
for row in data:
|
|
doc = controller(row)
|
|
if for_update:
|
|
doc.flags.for_update = True
|
|
yield doc
|
|
|
|
|
|
def get_doc_permission_check(doc: "Document", check_permission: str | bool | None = None) -> "Document":
|
|
"""
|
|
Checks permissions for the given document, if specified.
|
|
|
|
:param doc: The document to check permissions for.
|
|
:param check_permission: The permission to check for, default is "read" if truthy.
|
|
:return: The document with permissions checked.
|
|
"""
|
|
if check_permission:
|
|
if isinstance(check_permission, str):
|
|
doc.check_permission(check_permission)
|
|
else:
|
|
doc.check_permission("read")
|
|
return doc
|
|
|
|
|
|
class Document(BaseDocument):
|
|
"""All controllers inherit from `Document`."""
|
|
|
|
doctype: DF.Data
|
|
name: DF.Data | None
|
|
flags: frappe._dict[str, Any]
|
|
owner: DF.Link
|
|
creation: DF.Datetime
|
|
modified: DF.Datetime
|
|
modified_by: DF.Link
|
|
idx: DF.Int
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
"""Constructor.
|
|
|
|
:param arg1: DocType name as string, document **dict**, or DocRef object
|
|
:param arg2: Document name, if `arg1` is DocType name.
|
|
|
|
If DocType name and document name are passed, the object will load
|
|
all values (including child documents) from the database.
|
|
"""
|
|
self.doctype = None
|
|
self.name = None
|
|
self.flags = frappe._dict()
|
|
if args:
|
|
first_arg = args[0]
|
|
if isinstance(first_arg, str):
|
|
self.doctype = first_arg
|
|
self.name = first_arg if len(args) == 1 else args[1]
|
|
|
|
# for_update is set in flags to avoid changing load_from_db signature
|
|
# since it is used in virtual doctypes and inherited in child classes
|
|
self.flags.for_update = kwargs.get("for_update", False)
|
|
self.load_from_db()
|
|
return
|
|
|
|
if isinstance(first_arg, dict):
|
|
kwargs = first_arg
|
|
|
|
if kwargs:
|
|
super().__init__(kwargs)
|
|
self.init_child_tables()
|
|
self.init_valid_columns()
|
|
return
|
|
|
|
raise ValueError("Illegal arguments")
|
|
|
|
@property
|
|
def is_locked(self):
|
|
signature = self.get_signature()
|
|
if not file_lock.lock_exists(signature):
|
|
return False
|
|
|
|
if file_lock.lock_age(signature) > DOCUMENT_LOCK_EXPIRY:
|
|
return False
|
|
|
|
return True
|
|
|
|
def load_from_db(self) -> "Self":
|
|
"""Load document and children from database and create properties
|
|
from fields"""
|
|
|
|
is_doctype = self.doctype == "DocType"
|
|
|
|
self.flags.ignore_children = True
|
|
if not is_doctype and self.meta.issingle:
|
|
single_doc = frappe.db.get_singles_dict(self.doctype, for_update=self.flags.for_update)
|
|
if not single_doc:
|
|
single_doc = frappe.new_doc(self.doctype, as_dict=True)
|
|
single_doc["name"] = self.doctype
|
|
del single_doc["__islocal"]
|
|
|
|
super().__init__(single_doc)
|
|
self.init_valid_columns()
|
|
self._fix_numeric_types()
|
|
|
|
else:
|
|
if not is_doctype and isinstance(self.name, str | int):
|
|
for_update = ""
|
|
if self.flags.for_update and frappe.db.db_type != "sqlite":
|
|
for_update = "FOR UPDATE"
|
|
# Fast path - use raw SQL to avoid QB/ORM overheads.
|
|
d = frappe.db.sql(
|
|
"SELECT * FROM {table_name} WHERE `name` = %s {for_update}".format(
|
|
table_name=get_table_name(self.doctype, wrap_in_backticks=True),
|
|
for_update=for_update,
|
|
),
|
|
(self.name),
|
|
as_dict=True,
|
|
)
|
|
d = d[0] if d else d
|
|
else:
|
|
d = frappe.db.get_value(
|
|
doctype=self.doctype,
|
|
filters=self.name,
|
|
fieldname="*",
|
|
for_update=self.flags.for_update,
|
|
as_dict=True,
|
|
)
|
|
|
|
if not d:
|
|
frappe.throw(
|
|
_("{0} {1} not found").format(_(self.doctype), self.name),
|
|
frappe.DoesNotExistError(doctype=self.doctype),
|
|
)
|
|
|
|
super().__init__(d)
|
|
self.flags.pop("ignore_children", None)
|
|
|
|
self.load_children_from_db()
|
|
|
|
# sometimes __setup__ can depend on child values, hence calling again at the end
|
|
if hasattr(self, "__setup__"):
|
|
self.__setup__()
|
|
|
|
if not is_doctype:
|
|
self.mask_fields()
|
|
|
|
return self
|
|
|
|
def mask_fields(self):
|
|
from frappe.model.db_query import mask_field_value
|
|
|
|
mask_fields = frappe.get_meta(self.doctype).get_masked_fields()
|
|
|
|
for field in mask_fields:
|
|
val = self.get(field.fieldname)
|
|
self.set(field.fieldname, mask_field_value(field, val))
|
|
|
|
def load_children_from_db(self):
|
|
is_doctype = self.doctype == "DocType"
|
|
|
|
for fieldname, child_doctype in self._table_fieldnames.items():
|
|
# Make sure not to query the DB for a child table, if it is a virtual one.
|
|
if not is_doctype and is_virtual_doctype(child_doctype):
|
|
# Remove cache so that the virtual field loads again
|
|
self.__dict__.pop(fieldname, None)
|
|
continue
|
|
|
|
if is_doctype:
|
|
# This special handling is required because of bootstrapping code that doesn't
|
|
# handle failures correctly.
|
|
children = frappe.db.get_values(
|
|
child_doctype,
|
|
{"parent": self.name, "parenttype": self.doctype, "parentfield": fieldname},
|
|
"*",
|
|
as_dict=True,
|
|
order_by="idx asc",
|
|
for_update=self.flags.for_update,
|
|
)
|
|
else:
|
|
children = self._load_child_table_from_db(fieldname, child_doctype)
|
|
|
|
if children is None:
|
|
children = []
|
|
|
|
self.set(fieldname, children)
|
|
|
|
return self
|
|
|
|
def _load_child_table_from_db(self, fieldname, child_doctype):
|
|
for_update = ""
|
|
if self.flags.for_update and frappe.db.db_type != "sqlite":
|
|
for_update = "FOR UPDATE"
|
|
# Fast pass for all other doctypes - using raw SQL
|
|
return frappe.db.sql(
|
|
"""SELECT * FROM {table_name}
|
|
WHERE `parent`= %(parent)s
|
|
AND `parenttype`= %(parenttype)s
|
|
AND `parentfield`= %(parentfield)s
|
|
ORDER BY `idx` ASC {for_update}""".format(
|
|
table_name=get_table_name(child_doctype, wrap_in_backticks=True),
|
|
for_update=for_update,
|
|
),
|
|
{"parent": str(self.name), "parenttype": self.doctype, "parentfield": fieldname},
|
|
as_dict=True,
|
|
)
|
|
|
|
def reload(self) -> "Self":
|
|
"""Reload document from database"""
|
|
return self.load_from_db()
|
|
|
|
def get_latest(self):
|
|
if not getattr(self, "_doc_before_save", None):
|
|
self.load_doc_before_save()
|
|
|
|
return self._doc_before_save
|
|
|
|
def check_permission(self, permtype="read", permlevel=None):
|
|
"""Raise `frappe.PermissionError` if not permitted"""
|
|
if not self.has_permission(permtype):
|
|
self._handle_permission_failure(permtype)
|
|
|
|
def has_permission(self, permtype="read", *, debug=False, user=None) -> bool:
|
|
"""
|
|
Call `frappe.permissions.has_permission` if `ignore_permissions` flag isn't truthy
|
|
|
|
:param permtype: `read`, `write`, `submit`, `cancel`, `delete`, etc.
|
|
"""
|
|
|
|
if self.flags.ignore_permissions:
|
|
return True
|
|
|
|
import frappe.permissions
|
|
|
|
return frappe.permissions.has_permission(self.doctype, permtype, self, debug=debug, user=user)
|
|
|
|
def _handle_permission_failure(self, perm_type):
|
|
from frappe.permissions import check_doctype_permission
|
|
|
|
check_doctype_permission(self.doctype, perm_type)
|
|
self.raise_no_permission_to(perm_type)
|
|
|
|
def raise_no_permission_to(self, perm_type):
|
|
"""Raise `frappe.PermissionError`."""
|
|
frappe.flags.error_message = _(
|
|
"You need the '{0}' permission on {1} {2} to perform this action."
|
|
).format(
|
|
_(perm_type),
|
|
frappe.bold(_(self.doctype)),
|
|
self.name or "",
|
|
)
|
|
raise frappe.PermissionError
|
|
|
|
def insert(
|
|
self,
|
|
ignore_permissions=None,
|
|
ignore_links=None,
|
|
ignore_if_duplicate=False,
|
|
ignore_mandatory=None,
|
|
set_name=None,
|
|
set_child_names=True,
|
|
) -> "Self":
|
|
"""Insert the document in the database (as a new document).
|
|
This will check for user permissions and execute `before_insert`,
|
|
`validate`, `on_update`, `after_insert` methods if they are written.
|
|
|
|
:param ignore_permissions: Do not check permissions if True.
|
|
:param ignore_links: Do not check validity of links if True.
|
|
:param ignore_if_duplicate: Do not raise error if a duplicate entry exists.
|
|
:param ignore_mandatory: Do not check missing mandatory fields if True.
|
|
:param set_name: Name to set for the document, if valid.
|
|
:param set_child_names: Whether to set names for the child documents.
|
|
"""
|
|
if self.flags.in_print:
|
|
return self
|
|
|
|
self.flags.notifications_executed = []
|
|
|
|
if ignore_permissions is not None:
|
|
self.flags.ignore_permissions = ignore_permissions
|
|
|
|
if ignore_links is not None:
|
|
self.flags.ignore_links = ignore_links
|
|
|
|
if ignore_mandatory is not None:
|
|
self.flags.ignore_mandatory = ignore_mandatory
|
|
|
|
self.set("__islocal", True)
|
|
|
|
self._set_defaults()
|
|
self.set_user_and_timestamp()
|
|
self.set_docstatus()
|
|
self.check_permission("create")
|
|
self.check_if_latest()
|
|
self._validate_links()
|
|
self.run_method("before_insert")
|
|
self.set_new_name(set_name=set_name, set_child_names=set_child_names)
|
|
self.set_parent_in_children()
|
|
self.validate_higher_perm_levels()
|
|
|
|
self.flags.in_insert = True
|
|
self.run_before_save_methods()
|
|
self._validate()
|
|
self.set_docstatus()
|
|
self.flags.in_insert = False
|
|
|
|
# run validate, on update etc.
|
|
|
|
# parent
|
|
if getattr(self.meta, "issingle", 0):
|
|
self.update_single(self.get_valid_dict())
|
|
else:
|
|
self.db_insert(ignore_if_duplicate=ignore_if_duplicate)
|
|
|
|
# children
|
|
if not getattr(self.meta, "is_virtual", False):
|
|
for d in self.get_all_children():
|
|
d.db_insert()
|
|
|
|
self.reset_computed_child_tables()
|
|
self.run_method("after_insert")
|
|
self.flags.in_insert = True
|
|
|
|
if self.get("amended_from"):
|
|
self.validate_amended_from()
|
|
self.copy_attachments_from_amended_from()
|
|
|
|
relink_mismatched_files(self)
|
|
self.run_post_save_methods()
|
|
self.flags.in_insert = False
|
|
|
|
# delete __islocal
|
|
if hasattr(self, "__islocal"):
|
|
delattr(self, "__islocal")
|
|
|
|
# clear unsaved flag
|
|
if hasattr(self, "__unsaved"):
|
|
delattr(self, "__unsaved")
|
|
|
|
if not (frappe.flags.in_migrate or frappe.local.flags.in_install or frappe.flags.in_setup_wizard):
|
|
if frappe.get_cached_value("User", frappe.session.user, "follow_created_documents"):
|
|
follow_document(self.doctype, self.name, frappe.session.user)
|
|
return self
|
|
|
|
def check_if_locked(self):
|
|
if not self.creation or not self.is_locked:
|
|
return
|
|
|
|
# Allow unlocking if created more than 60 minutes ago
|
|
primary_action = None
|
|
if file_lock.lock_age(self.get_signature()) > DOCUMENT_LOCK_SOFT_EXPIRY:
|
|
primary_action = {
|
|
"label": "Force Unlock",
|
|
"server_action": "frappe.model.document.unlock_document",
|
|
"hide_on_success": True,
|
|
"args": {
|
|
"doctype": self.doctype,
|
|
"name": self.name,
|
|
},
|
|
}
|
|
|
|
frappe.throw(
|
|
_(
|
|
"This document is currently locked and queued for execution. Please try again after some time."
|
|
),
|
|
title=_("Document Queued"),
|
|
primary_action=primary_action,
|
|
exc=frappe.DocumentLockedError,
|
|
)
|
|
|
|
def save(self, *args, **kwargs) -> "Self":
|
|
"""Wrapper for _save"""
|
|
return self._save(*args, **kwargs)
|
|
|
|
def _save(self, ignore_permissions=None, ignore_version=None) -> "Self":
|
|
"""Save the current document in the database in the **DocType**'s table or
|
|
`tabSingles` (for single types).
|
|
|
|
This will check for user permissions and execute
|
|
`validate` before updating, `on_update` after updating triggers.
|
|
|
|
:param ignore_permissions: Do not check permissions if True.
|
|
:param ignore_version: Do not save version if True."""
|
|
if self.flags.in_print:
|
|
return self
|
|
|
|
self.flags.notifications_executed = []
|
|
|
|
if ignore_permissions is not None:
|
|
self.flags.ignore_permissions = ignore_permissions
|
|
|
|
self.flags.ignore_version = frappe.in_test if ignore_version is None else ignore_version
|
|
|
|
if self.get("__islocal") or not self.get("name"):
|
|
return self.insert()
|
|
|
|
self.check_if_locked()
|
|
self._set_defaults()
|
|
self.check_permission("write", "save")
|
|
|
|
self.set_user_and_timestamp()
|
|
self.set_docstatus()
|
|
self.check_if_latest()
|
|
self.set_parent_in_children()
|
|
self.set_name_in_children()
|
|
|
|
self.validate_higher_perm_levels()
|
|
self._validate_links()
|
|
self.run_before_save_methods()
|
|
|
|
if self._action != "cancel":
|
|
self._validate()
|
|
|
|
if self._action == "update_after_submit":
|
|
self.validate_update_after_submit()
|
|
|
|
self.set_docstatus()
|
|
|
|
# parent
|
|
if self.meta.issingle:
|
|
self.update_single(self.get_valid_dict())
|
|
else:
|
|
self.db_update()
|
|
|
|
self.update_children()
|
|
self.reset_computed_child_tables()
|
|
self.run_post_save_methods()
|
|
|
|
# clear unsaved flag
|
|
if hasattr(self, "__unsaved"):
|
|
delattr(self, "__unsaved")
|
|
|
|
return self
|
|
|
|
def validate_amended_from(self):
|
|
if frappe.db.get_value(self.doctype, self.get("amended_from"), "docstatus") != 2:
|
|
message = _(
|
|
"{0} cannot be amended because it is not cancelled. Please cancel the document before creating an amendment."
|
|
).format(frappe.utils.get_link_to_form(self.doctype, self.get("amended_from")))
|
|
frappe.throw(message, title=_("Amendment Not Allowed"))
|
|
|
|
def copy_attachments_from_amended_from(self):
|
|
"""Copy attachments from `amended_from`"""
|
|
from frappe.desk.form.load import get_attachments
|
|
|
|
# loop through attachments
|
|
for attach_item in get_attachments(self.doctype, self.amended_from):
|
|
# save attachments to new doc
|
|
_file = frappe.get_doc(
|
|
{
|
|
"doctype": "File",
|
|
"file_url": attach_item.file_url,
|
|
"file_name": attach_item.file_name,
|
|
"attached_to_name": self.name,
|
|
"attached_to_doctype": self.doctype,
|
|
"attached_to_field": attach_item.attached_to_field,
|
|
"folder": attach_item.folder or "Home/Attachments",
|
|
"is_private": attach_item.is_private,
|
|
}
|
|
)
|
|
_file.save()
|
|
|
|
def update_children(self):
|
|
"""update child tables"""
|
|
if getattr(self.meta, "is_virtual", False):
|
|
# Virtual doctypes manage their own children
|
|
return
|
|
|
|
for df in self.meta.get_table_fields():
|
|
self.update_child_table(df.fieldname, df)
|
|
|
|
def update_child_table(self, fieldname: str, df: "DocField" | None = None):
|
|
"""sync child table for given fieldname"""
|
|
df: DocField = df or self.meta.get_field(fieldname)
|
|
if df.is_virtual:
|
|
return
|
|
all_rows = self.get(df.fieldname)
|
|
|
|
# delete rows that do not match the ones in the document
|
|
# if the doctype isn't in ignore_children_type flag and isn't virtual
|
|
if not (
|
|
df.options in (self.flags.ignore_children_type or ())
|
|
or frappe.get_meta(df.options).is_virtual == 1
|
|
):
|
|
existing_row_names = [row.name for row in all_rows if row.name and not row.is_new()]
|
|
|
|
tbl = frappe.qb.DocType(df.options)
|
|
qry = (
|
|
frappe.qb.from_(tbl)
|
|
.where(tbl.parent == str(self.name))
|
|
.where(tbl.parenttype == self.doctype)
|
|
.where(tbl.parentfield == fieldname)
|
|
.delete()
|
|
)
|
|
|
|
if existing_row_names:
|
|
qry = qry.where(tbl.name.notin(existing_row_names))
|
|
|
|
qry.run()
|
|
|
|
# update / insert
|
|
for d in all_rows:
|
|
d: Document
|
|
d.db_update()
|
|
|
|
def reset_computed_child_tables(self):
|
|
"""Reset computed child tables so that they are reloaded next time"""
|
|
for df in self.meta.get_table_fields(include_computed=True):
|
|
if df.is_virtual:
|
|
self.__dict__.pop(df.fieldname, None)
|
|
|
|
def get_doc_before_save(self) -> "Self":
|
|
return getattr(self, "_doc_before_save", None)
|
|
|
|
def has_value_changed(self, fieldname):
|
|
"""Return True if value has changed before and after saving."""
|
|
from datetime import date, datetime, timedelta
|
|
|
|
previous = self.get_doc_before_save()
|
|
|
|
if not previous:
|
|
return True
|
|
|
|
previous_value = previous.get(fieldname)
|
|
current_value = self.get(fieldname)
|
|
|
|
if isinstance(previous_value, datetime):
|
|
current_value = get_datetime(current_value)
|
|
elif isinstance(previous_value, date):
|
|
current_value = getdate(current_value)
|
|
elif isinstance(previous_value, timedelta):
|
|
current_value = get_timedelta(current_value)
|
|
|
|
return previous_value != current_value
|
|
|
|
def get_value_before_save(self, fieldname):
|
|
"""Returns value of a field before saving
|
|
|
|
Note: This function only works in save context like doc.save, doc.submit.
|
|
"""
|
|
previous = self.get_doc_before_save()
|
|
if not previous:
|
|
return
|
|
return previous.get(fieldname)
|
|
|
|
def set_new_name(self, force=False, set_name=None, set_child_names=True):
|
|
"""Calls `frappe.naming.set_new_name` for parent and child docs."""
|
|
|
|
if self.flags.name_set and not force:
|
|
return
|
|
|
|
autoname = self.meta.autoname or ""
|
|
|
|
# If autoname has set as Prompt (name)
|
|
if self.get("__newname") and autoname.lower() == "prompt":
|
|
self.name = validate_name(self.doctype, self.get("__newname"))
|
|
self.flags.name_set = True
|
|
return
|
|
|
|
if set_name:
|
|
self.name = validate_name(self.doctype, set_name)
|
|
else:
|
|
set_new_name(self)
|
|
|
|
if set_child_names:
|
|
# set name for children
|
|
for d in self.get_all_children():
|
|
set_new_name(d)
|
|
|
|
self.flags.name_set = True
|
|
|
|
def get_title(self):
|
|
"""Get the document title based on title_field or `title` or `name`"""
|
|
return self.get(self.meta.get_title_field()) or ""
|
|
|
|
def set_title_field(self):
|
|
"""Set title field based on template"""
|
|
|
|
def get_values():
|
|
values = self.as_dict()
|
|
# format values
|
|
for key, value in values.items():
|
|
if value is None:
|
|
values[key] = ""
|
|
return values
|
|
|
|
if self.meta.get("title_field") == "title":
|
|
df = self.meta.get_field(self.meta.title_field)
|
|
|
|
if df.options:
|
|
self.set(df.fieldname, df.options.format(**get_values()))
|
|
elif self.is_new() and not self.get(df.fieldname) and df.default:
|
|
# set default title for new transactions (if default)
|
|
self.set(df.fieldname, df.default.format(**get_values()))
|
|
|
|
def update_single(self, d):
|
|
"""Updates values for Single type Document in `tabSingles`."""
|
|
frappe.db.delete("Singles", {"doctype": self.doctype})
|
|
for field, value in d.items():
|
|
if field != "doctype":
|
|
frappe.db.sql(
|
|
"""insert into `tabSingles` (doctype, field, value)
|
|
values (%s, %s, %s)""",
|
|
(self.doctype, field, value),
|
|
)
|
|
|
|
if self.doctype in frappe.db.value_cache:
|
|
frappe.db.value_cache.pop(self.doctype, None)
|
|
|
|
def set_user_and_timestamp(self):
|
|
self._original_modified = self.modified
|
|
self.modified = now()
|
|
self.modified_by = frappe.session.user
|
|
|
|
# We'd probably want the creation and owner to be set via API
|
|
# or Data import at some point, that'd have to be handled here
|
|
if self.is_new() and not (
|
|
frappe.flags.in_install or frappe.flags.in_patch or frappe.flags.in_migrate
|
|
):
|
|
self.creation = self.modified
|
|
self.owner = self.modified_by
|
|
|
|
for d in self.get_all_children():
|
|
d.modified = self.modified
|
|
d.modified_by = self.modified_by
|
|
if not d.owner:
|
|
d.owner = self.owner
|
|
if not d.creation:
|
|
d.creation = self.creation
|
|
|
|
frappe.flags.currently_saving.append((self.doctype, self.name))
|
|
|
|
def set_docstatus(self):
|
|
# docstatus property automatically sets a docstatus if not set
|
|
docstatus = self.docstatus
|
|
|
|
for d in self.get_all_children():
|
|
d.set("docstatus", docstatus)
|
|
|
|
def _validate(self):
|
|
self._validate_mandatory()
|
|
self._validate_data_fields()
|
|
self._validate_selects()
|
|
self._validate_non_negative()
|
|
self._validate_length()
|
|
self._fix_rating_value()
|
|
self._validate_code_fields()
|
|
self._sync_autoname_field()
|
|
self._extract_images_from_text_editor()
|
|
self._sanitize_content()
|
|
self._save_passwords()
|
|
self.validate_workflow()
|
|
|
|
for d in self.get_all_children():
|
|
d._validate_data_fields()
|
|
d._validate_selects()
|
|
d._validate_non_negative()
|
|
d._validate_length()
|
|
d._fix_rating_value()
|
|
d._validate_code_fields()
|
|
d._sync_autoname_field()
|
|
d._extract_images_from_text_editor()
|
|
d._sanitize_content()
|
|
d._save_passwords()
|
|
if self.is_new():
|
|
# don't set fields like _assign, _comments for new doc
|
|
for fieldname in optional_fields:
|
|
self.set(fieldname, None)
|
|
else:
|
|
self.validate_set_only_once()
|
|
|
|
def _validate_non_negative(self):
|
|
def get_msg(df):
|
|
if self.get("parentfield"):
|
|
return "{} {} #{}: {} {}".format(
|
|
frappe.bold(_(self.doctype)),
|
|
_("Row"),
|
|
self.idx,
|
|
_("Value cannot be negative for"),
|
|
frappe.bold(_(df.label, context=df.parent)),
|
|
)
|
|
else:
|
|
return _("Value cannot be negative for {0}: {1}").format(
|
|
_(df.parent), frappe.bold(_(df.label, context=df.parent))
|
|
)
|
|
|
|
for df in self.meta.get(
|
|
"fields", {"non_negative": ("=", 1), "fieldtype": ("in", ["Int", "Float", "Currency", "Percent"])}
|
|
):
|
|
if flt(self.get(df.fieldname)) < 0:
|
|
msg = get_msg(df)
|
|
frappe.throw(msg, frappe.NonNegativeError, title=_("Negative Value"))
|
|
|
|
def _fix_rating_value(self):
|
|
for field in self.meta.get("fields", {"fieldtype": "Rating"}):
|
|
value = self.get(field.fieldname)
|
|
if not isinstance(value, float):
|
|
value = flt(value)
|
|
|
|
# Make sure rating is between 0 and 1
|
|
self.set(field.fieldname, max(0, min(value, 1)))
|
|
|
|
def validate_workflow(self):
|
|
"""Validate if the workflow transition is valid"""
|
|
if frappe.flags.in_install == "frappe":
|
|
return
|
|
workflow = self.meta.get_workflow()
|
|
if workflow:
|
|
validate_workflow(self)
|
|
if self._action != "save":
|
|
set_workflow_state_on_action(self, workflow, self._action)
|
|
|
|
def validate_set_only_once(self):
|
|
"""Validate that fields are not changed if not in insert"""
|
|
set_only_once_fields = self.meta.get_set_only_once_fields()
|
|
|
|
if set_only_once_fields and self._doc_before_save:
|
|
# document exists before saving
|
|
for field in set_only_once_fields:
|
|
fail = False
|
|
value = self.get(field.fieldname)
|
|
original_value = self._doc_before_save.get(field.fieldname)
|
|
|
|
if field.fieldtype in table_fields:
|
|
fail = not self.is_child_table_same(field.fieldname)
|
|
elif field.fieldtype in ("Date", "Datetime", "Time"):
|
|
fail = str(value) != str(original_value)
|
|
else:
|
|
fail = value != original_value
|
|
|
|
if fail:
|
|
frappe.throw(
|
|
_("Value cannot be changed for {0}").format(
|
|
frappe.bold(_(self.meta.get_label(field.fieldname)))
|
|
),
|
|
exc=frappe.CannotChangeConstantError,
|
|
)
|
|
|
|
return False
|
|
|
|
def is_child_table_same(self, fieldname):
|
|
"""Validate child table is same as original table before saving"""
|
|
|
|
if self.is_new():
|
|
return False
|
|
|
|
same = True
|
|
value = self.get(fieldname)
|
|
original_value = self._doc_before_save.get(fieldname)
|
|
|
|
if len(original_value) != len(value):
|
|
same = False
|
|
else:
|
|
# check all child entries
|
|
for i, d in enumerate(original_value):
|
|
new_child = value[i].as_dict(convert_dates_to_str=True)
|
|
original_child = d.as_dict(convert_dates_to_str=True)
|
|
|
|
# all fields must be same other than modified and modified_by
|
|
for key in ("modified", "modified_by", "creation"):
|
|
del new_child[key]
|
|
del original_child[key]
|
|
|
|
if original_child != new_child:
|
|
same = False
|
|
break
|
|
|
|
return same
|
|
|
|
def apply_fieldlevel_read_permissions(self):
|
|
"""Remove values the user is not allowed to read."""
|
|
if frappe.session.user == "Administrator":
|
|
return
|
|
|
|
all_fields = self.meta.fields.copy()
|
|
for table_field in self.meta.get_table_fields(include_computed=True):
|
|
all_fields += frappe.get_meta(table_field.options).fields or []
|
|
|
|
if all(df.permlevel == 0 for df in all_fields):
|
|
return
|
|
|
|
has_access_to = self.get_permlevel_access("read")
|
|
|
|
for df in self.meta.fields:
|
|
if df.permlevel and hasattr(self, df.fieldname) and df.permlevel not in has_access_to:
|
|
try:
|
|
delattr(self, df.fieldname)
|
|
except AttributeError:
|
|
# hasattr might return True for class attribute which can't be delattr-ed.
|
|
continue
|
|
|
|
for table_field in self.meta.get_table_fields(include_computed=True):
|
|
for df in frappe.get_meta(table_field.options).fields or []:
|
|
if df.permlevel and df.permlevel not in has_access_to:
|
|
for child in self.get(table_field.fieldname) or []:
|
|
if hasattr(child, df.fieldname):
|
|
delattr(child, df.fieldname)
|
|
|
|
def validate_higher_perm_levels(self):
|
|
"""If the user does not have permissions at permlevel > 0, then reset the values to original / default"""
|
|
if self.flags.ignore_permissions or frappe.flags.in_install:
|
|
return
|
|
|
|
if frappe.session.user == "Administrator":
|
|
return
|
|
|
|
has_access_to = self.get_permlevel_access()
|
|
high_permlevel_fields = self.meta.get_high_permlevel_fields()
|
|
|
|
mask_fields = self.meta.get_masked_fields()
|
|
|
|
if high_permlevel_fields or mask_fields:
|
|
self.reset_values_if_no_permlevel_access(has_access_to, high_permlevel_fields, mask_fields)
|
|
|
|
# If new record then don't reset the values for child table
|
|
if self.is_new():
|
|
return
|
|
|
|
# check for child tables
|
|
for df in self.meta.get_table_fields():
|
|
high_permlevel_fields = frappe.get_meta(df.options).get_high_permlevel_fields()
|
|
if high_permlevel_fields:
|
|
for d in self.get(df.fieldname):
|
|
d.reset_values_if_no_permlevel_access(has_access_to, high_permlevel_fields)
|
|
|
|
def get_permlevel_access(self, permission_type="write"):
|
|
allowed_permlevels = []
|
|
roles = frappe.get_roles()
|
|
|
|
for perm in self.get_permissions():
|
|
if perm.role in roles and perm.get(permission_type) and perm.permlevel not in allowed_permlevels:
|
|
allowed_permlevels.append(perm.permlevel)
|
|
|
|
return allowed_permlevels
|
|
|
|
def has_permlevel_access_to(self, fieldname, df=None, permission_type="read"):
|
|
if not df:
|
|
df = self.meta.get_field(fieldname)
|
|
|
|
return df.permlevel in self.get_permlevel_access(permission_type)
|
|
|
|
def get_permissions(self):
|
|
if self.meta.istable:
|
|
# use parent permissions
|
|
permissions = frappe.get_meta(self.parenttype).permissions
|
|
else:
|
|
permissions = self.meta.permissions
|
|
|
|
return permissions
|
|
|
|
def _set_defaults(self):
|
|
if frappe.flags.in_import:
|
|
return
|
|
|
|
if self.is_new():
|
|
new_doc = frappe.new_doc(self.doctype, as_dict=True)
|
|
self.update_if_missing(new_doc)
|
|
|
|
# children
|
|
for df in self.meta.get_table_fields():
|
|
new_doc = frappe.new_doc(df.options, parent_doc=self, parentfield=df.fieldname, as_dict=True)
|
|
value = self.get(df.fieldname)
|
|
if isinstance(value, list):
|
|
for d in value:
|
|
if d.is_new():
|
|
d.update_if_missing(new_doc)
|
|
|
|
def check_if_latest(self):
|
|
"""Checks if `modified` timestamp provided by document being updated is same as the
|
|
`modified` timestamp in the database. If there is a different, the document has been
|
|
updated in the database after the current copy was read. Will throw an error if
|
|
timestamps don't match.
|
|
|
|
Will also validate document transitions (Save > Submit > Cancel) calling
|
|
`self.check_docstatus_transition`."""
|
|
|
|
self.load_doc_before_save(raise_exception=True)
|
|
|
|
if not hasattr(self, "_action"):
|
|
self._action = "save"
|
|
|
|
previous = self._doc_before_save
|
|
# previous is None for new document insert
|
|
if not previous and self._action != "discard":
|
|
self.check_docstatus_transition(0)
|
|
return
|
|
|
|
if cstr(previous.modified) != cstr(self._original_modified):
|
|
frappe.msgprint(
|
|
_(f"Error: {self.name} ({self.doctype}) has been modified after you have opened it")
|
|
+ (f" ({previous.modified}, {self.modified}). ")
|
|
+ _("Please refresh to get the latest document."),
|
|
raise_exception=frappe.TimestampMismatchError,
|
|
)
|
|
|
|
if not self.meta.issingle and self._action != "discard":
|
|
self.check_docstatus_transition(previous.docstatus)
|
|
|
|
def check_docstatus_transition(self, from_docstatus):
|
|
"""Ensures valid `docstatus` transition.
|
|
Valid transitions are (number in brackets is `docstatus`):
|
|
|
|
- Save (0) > Save (0)
|
|
- Save (0) > Submit (1)
|
|
- Submit (1) > Submit (1)
|
|
- Submit (1) > Cancel (2)
|
|
|
|
"""
|
|
if self.flags.skip_docstatus_validation:
|
|
return
|
|
|
|
to_docstatus = self.docstatus
|
|
if from_docstatus == DocStatus.DRAFT:
|
|
if to_docstatus.is_draft():
|
|
self._action = "save"
|
|
elif to_docstatus.is_submitted():
|
|
if not getattr(self.meta, "is_submittable", False):
|
|
frappe.throw(
|
|
_("Cannot change docstatus of non submittable doctype {0}").format(self.doctype),
|
|
frappe.DocstatusTransitionError,
|
|
)
|
|
self._action = "submit"
|
|
self.check_permission("submit")
|
|
elif to_docstatus.is_cancelled():
|
|
raise frappe.DocstatusTransitionError(
|
|
_("Cannot change docstatus from 0 (Draft) to 2 (Cancelled)")
|
|
)
|
|
else:
|
|
raise frappe.ValidationError(_("Invalid docstatus"), to_docstatus)
|
|
|
|
elif from_docstatus == DocStatus.SUBMITTED:
|
|
if to_docstatus.is_submitted():
|
|
self._action = "update_after_submit"
|
|
self.check_permission("submit")
|
|
elif to_docstatus.is_cancelled():
|
|
self._action = "cancel"
|
|
self.check_permission("cancel")
|
|
elif to_docstatus.is_draft():
|
|
raise frappe.DocstatusTransitionError(
|
|
_("Cannot change docstatus from 1 (Submitted) to 0 (Draft)")
|
|
)
|
|
else:
|
|
raise frappe.ValidationError(_("Invalid docstatus"), to_docstatus)
|
|
|
|
elif from_docstatus == DocStatus.CANCELLED:
|
|
raise frappe.ValidationError(_("Cannot edit cancelled document"))
|
|
|
|
def set_parent_in_children(self):
|
|
"""Updates `parent` and `parenttype` property in all children."""
|
|
for d in self.get_all_children():
|
|
d.parent = self.name
|
|
d.parenttype = self.doctype
|
|
|
|
def set_name_in_children(self):
|
|
# Set name for any new children
|
|
for d in self.get_all_children():
|
|
if not d.name:
|
|
set_new_name(d)
|
|
|
|
def validate_update_after_submit(self):
|
|
if self.flags.ignore_validate_update_after_submit:
|
|
return
|
|
|
|
self._validate_update_after_submit()
|
|
for d in self.get_all_children():
|
|
if d.is_new() and self.meta.get_field(d.parentfield).allow_on_submit:
|
|
# in case of a new row, don't validate allow on submit, if table is allow on submit
|
|
continue
|
|
|
|
d._validate_update_after_submit()
|
|
|
|
# TODO check only allowed values are updated
|
|
|
|
def _validate_mandatory(self):
|
|
if self.flags.ignore_mandatory:
|
|
return
|
|
|
|
missing = self._get_missing_mandatory_fields()
|
|
for d in self.get_all_children():
|
|
missing.extend(d._get_missing_mandatory_fields())
|
|
|
|
if not missing:
|
|
return
|
|
|
|
for idx, msg in missing: # noqa: B007
|
|
msgprint(msg)
|
|
|
|
if frappe.flags.print_messages:
|
|
print(self.as_json().encode("utf-8"))
|
|
|
|
raise frappe.MandatoryError(
|
|
"[{doctype}, {name}]: {fields}".format(
|
|
fields=", ".join(each[0] for each in missing), doctype=self.doctype, name=self.name
|
|
)
|
|
)
|
|
|
|
def _prefetch_link_values(self):
|
|
"""Pre-fetch all link values including fetch_from fields for bulk validation.
|
|
|
|
This optimization collects all Link/Dynamic Link values from the doc tree,
|
|
then bulk-fetches them by doctype to eliminate N+1 queries.
|
|
"""
|
|
if self.flags.ignore_links or self._action == "cancel":
|
|
return
|
|
|
|
from collections import defaultdict
|
|
|
|
def _chunk(iterable, size):
|
|
"""Split iterable into chunks of given size."""
|
|
lst = list(iterable)
|
|
for i in range(0, len(lst), size):
|
|
yield lst[i : i + size]
|
|
|
|
self._link_value_cache = {}
|
|
docs_to_validate = [self, *self.get_all_children()]
|
|
|
|
# Collect: {doctype: {'names': set(), 'fields': set()}}
|
|
prefetch_map = defaultdict(lambda: {"names": set(), "fields": {"name"}})
|
|
|
|
for doc in docs_to_validate:
|
|
is_submittable = self.meta.is_submittable
|
|
link_fields = doc.meta.get_link_fields() + doc.meta.get(
|
|
"fields", {"fieldtype": ("=", "Dynamic Link")}
|
|
)
|
|
|
|
for df in link_fields:
|
|
docname = doc.get(df.fieldname)
|
|
if not docname:
|
|
continue
|
|
|
|
# Skip invalid docname types - let get_invalid_links handle the assertion
|
|
if not isinstance(docname, str | int):
|
|
continue
|
|
|
|
# Resolve target doctype
|
|
if df.fieldtype == "Link":
|
|
doctype = df.options
|
|
if not doctype:
|
|
continue
|
|
else: # Dynamic Link
|
|
doctype = doc.get(df.options)
|
|
if not doctype:
|
|
continue
|
|
|
|
prefetch_map[doctype]["names"].add(docname)
|
|
|
|
# Collect fetch_from fields - fetch ALL, let base_document handle fetch_if_empty
|
|
for fetch_df in doc.meta.get_fields_to_fetch(df.fieldname):
|
|
if fetch_df.get("fetch_from"):
|
|
source_field = fetch_df.fetch_from.split(".")[-1]
|
|
prefetch_map[doctype]["fields"].add(source_field)
|
|
|
|
# Add docstatus if needed
|
|
target_meta = frappe.get_meta(doctype)
|
|
if is_submittable and target_meta.is_submittable:
|
|
prefetch_map[doctype]["fields"].add("docstatus")
|
|
|
|
# Bulk fetch with chunking
|
|
for doctype, data in prefetch_map.items():
|
|
meta = frappe.get_meta(doctype)
|
|
names = list(data["names"])
|
|
fields = sorted(data["fields"]) # Sorted for deterministic cache key matching
|
|
|
|
# Skip if no names to fetch for this doctype
|
|
if not names:
|
|
continue
|
|
|
|
if meta.get("is_virtual"):
|
|
# Virtual doctypes: fetch individually
|
|
for name in names:
|
|
try:
|
|
values = frappe.get_doc(doctype, name).as_dict()
|
|
except frappe.DoesNotExistError:
|
|
values = None
|
|
self._link_value_cache.setdefault(doctype, {})[name] = values
|
|
|
|
elif getattr(meta, "issingle", 0):
|
|
# Single doctypes
|
|
values = frappe.db.get_singles_dict(doctype)
|
|
values["name"] = doctype
|
|
for name in names:
|
|
self._link_value_cache.setdefault(doctype, {})[name] = frappe._dict(values)
|
|
|
|
else:
|
|
# Regular doctypes: bulk fetch with chunking
|
|
result_dict = {}
|
|
field_tuple = tuple(fields)
|
|
|
|
for name_chunk in _chunk(names, 1000):
|
|
results = frappe.db.get_all(
|
|
doctype,
|
|
filters={"name": ("in", name_chunk)},
|
|
fields=fields,
|
|
)
|
|
for row in results:
|
|
result_dict[row.name] = row
|
|
# Link fields may carry "123" (text) while autoincrement doctypes return 123 (int);
|
|
# adding str(name) avoids false cache misses that surface as invalid-link errors.
|
|
result_dict[str(row.name)] = row
|
|
# Case-insensitive key for MariaDB compatibility (strings only)
|
|
if frappe.db.db_type == "mariadb" and isinstance(row.name, str):
|
|
result_dict[row.name.casefold()] = row
|
|
|
|
# Store results in both caches
|
|
for name in names:
|
|
if frappe.db.db_type == "mariadb" and isinstance(name, str):
|
|
cached_value = (
|
|
result_dict.get(name)
|
|
or result_dict.get(str(name))
|
|
or result_dict.get(name.casefold())
|
|
)
|
|
else:
|
|
cached_value = result_dict.get(name) or result_dict.get(str(name))
|
|
|
|
# Store in local document cache
|
|
self._link_value_cache.setdefault(doctype, {})[name] = cached_value
|
|
|
|
# Also populate global db.value_cache for cross-document caching
|
|
# Only for string names (matching get_values behavior at line 632)
|
|
if cached_value is not None and isinstance(name, str):
|
|
frappe.db.value_cache[doctype][name][field_tuple] = [cached_value]
|
|
|
|
def _validate_links(self):
|
|
if self.flags.ignore_links or self._action == "cancel":
|
|
return
|
|
|
|
# Pre-fetch all link values in bulk
|
|
self._prefetch_link_values()
|
|
link_cache = getattr(self, "_link_value_cache", None)
|
|
|
|
invalid_links, cancelled_links = self.get_invalid_links(link_value_cache=link_cache)
|
|
|
|
for d in self.get_all_children():
|
|
result = d.get_invalid_links(is_submittable=self.meta.is_submittable, link_value_cache=link_cache)
|
|
invalid_links.extend(result[0])
|
|
cancelled_links.extend(result[1])
|
|
|
|
if invalid_links:
|
|
msg = ", ".join(each[2] for each in invalid_links)
|
|
frappe.throw(_("Could not find {0}").format(msg), frappe.LinkValidationError)
|
|
|
|
if cancelled_links:
|
|
msg = ", ".join(each[2] for each in cancelled_links)
|
|
frappe.throw(_("Cannot link cancelled document: {0}").format(msg), frappe.CancelledLinkError)
|
|
|
|
def get_all_children(self, parenttype=None, *, include_computed=False) -> list["Document"]:
|
|
"""
|
|
Return all child documents from **Table** type fields in a list.
|
|
Excludes computed tables by default, unless `include_computed` is set to True.
|
|
"""
|
|
|
|
children = []
|
|
table_fieldnames = self._table_fieldnames if include_computed else self._non_computed_table_fieldnames
|
|
|
|
for fieldname, child_doctype in table_fieldnames.items():
|
|
if parenttype and child_doctype != parenttype:
|
|
continue
|
|
|
|
if value := self.get(fieldname):
|
|
children.extend(value)
|
|
|
|
return children
|
|
|
|
def run_method(self, method, *args, **kwargs):
|
|
"""run standard triggers, plus those in hooks"""
|
|
|
|
def fn(self, *args, **kwargs):
|
|
method_object = getattr(self, method, None)
|
|
|
|
# Cannot have a field with same name as method
|
|
# If method found in __dict__, expect it to be callable
|
|
if method in self.__dict__ or callable(method_object):
|
|
return method_object(*args, **kwargs)
|
|
|
|
fn.__name__ = str(method)
|
|
out = Document.hook(fn)(self, *args, **kwargs)
|
|
|
|
self.run_notifications(method)
|
|
run_webhooks(self, method)
|
|
run_server_script_for_doc_event(self, method)
|
|
|
|
return out
|
|
|
|
def run_trigger(self, method, *args, **kwargs):
|
|
return self.run_method(method, *args, **kwargs)
|
|
|
|
def run_notifications(self, method):
|
|
"""Run notifications for this method"""
|
|
if (
|
|
method == "onload"
|
|
or (frappe.flags.in_import and frappe.flags.mute_emails)
|
|
or frappe.flags.in_patch
|
|
or frappe.flags.in_install
|
|
):
|
|
return
|
|
|
|
if self.flags.notifications_executed is None:
|
|
self.flags.notifications_executed = []
|
|
|
|
from frappe.email.doctype.notification.notification import evaluate_alert
|
|
|
|
def _get_notifications():
|
|
"""Return enabled notifications for the current doctype."""
|
|
|
|
return frappe.get_all(
|
|
"Notification",
|
|
fields=["name", "event", "method"],
|
|
filters={"enabled": 1, "document_type": self.doctype},
|
|
)
|
|
|
|
notifications = frappe.client_cache.get_value(
|
|
f"notifications::{self.doctype}", generator=_get_notifications
|
|
)
|
|
|
|
if not notifications:
|
|
return
|
|
|
|
def _evaluate_alert(alert):
|
|
if alert.name in self.flags.notifications_executed:
|
|
return
|
|
|
|
evaluate_alert(self, alert.name, alert.event)
|
|
self.flags.notifications_executed.append(alert.name)
|
|
|
|
event_map = {
|
|
"on_update": "Save",
|
|
"after_insert": "New",
|
|
"on_submit": "Submit",
|
|
"on_cancel": "Cancel",
|
|
}
|
|
|
|
if not self.flags.in_insert and not self.flags.in_delete:
|
|
# value change is not applicable in insert
|
|
event_map["on_change"] = "Value Change"
|
|
|
|
for alert in notifications:
|
|
event = event_map.get(method, None)
|
|
if event and alert.event == event:
|
|
_evaluate_alert(alert)
|
|
elif alert.event == "Method" and method == alert.method:
|
|
_evaluate_alert(alert)
|
|
|
|
def _submit(self):
|
|
"""Submit the document. Sets `docstatus` = 1, then saves."""
|
|
self.docstatus = DocStatus.SUBMITTED
|
|
return self.save()
|
|
|
|
def _cancel(self):
|
|
"""Cancel the document. Sets `docstatus` = 2, then saves."""
|
|
self.docstatus = DocStatus.CANCELLED
|
|
return self.save()
|
|
|
|
def _rename(
|
|
self, name: str | int, merge: bool = False, force: bool = False, validate_rename: bool = True
|
|
):
|
|
"""Rename the document. Triggers frappe.rename_doc, then reloads."""
|
|
from frappe.model.rename_doc import rename_doc
|
|
|
|
self.name = rename_doc(doc=self, new=name, merge=merge, force=force, validate=validate_rename)
|
|
self.reload()
|
|
|
|
@frappe.whitelist()
|
|
def submit(self):
|
|
"""Submit the document. Sets `docstatus` = 1, then saves."""
|
|
return self._submit()
|
|
|
|
@frappe.whitelist()
|
|
def cancel(self):
|
|
"""Cancel the document. Sets `docstatus` = 2, then saves."""
|
|
return self._cancel()
|
|
|
|
@frappe.whitelist()
|
|
def discard(self):
|
|
"""Discard the draft document. Sets `docstatus` = 2 with db_set."""
|
|
self._action = "discard"
|
|
|
|
self.check_if_locked()
|
|
self.set_user_and_timestamp()
|
|
self.check_if_latest()
|
|
|
|
if not self.docstatus.is_draft():
|
|
raise frappe.ValidationError(_("Only draft documents can be discarded"), self.docstatus)
|
|
|
|
self.check_permission("write")
|
|
|
|
self.run_method("before_discard")
|
|
self.db_set("docstatus", DocStatus.CANCELLED)
|
|
delattr(self, "_action")
|
|
self.run_method("on_discard")
|
|
|
|
@frappe.whitelist()
|
|
def rename(self, name: str | int, merge: bool = False, force: bool = False, validate_rename: bool = True):
|
|
"""Rename the document to `name`. This transforms the current object."""
|
|
return self._rename(name=name, merge=merge, force=force, validate_rename=validate_rename)
|
|
|
|
def delete(self, ignore_permissions=False, force=False, *, delete_permanently=False):
|
|
"""Delete document."""
|
|
return frappe.delete_doc(
|
|
self.doctype,
|
|
self.name,
|
|
ignore_permissions=ignore_permissions,
|
|
flags=self.flags,
|
|
force=force,
|
|
delete_permanently=delete_permanently,
|
|
)
|
|
|
|
def run_before_save_methods(self):
|
|
"""Run standard methods before `INSERT` or `UPDATE`. Standard Methods are:
|
|
|
|
- `validate`, `before_save` for **Save**.
|
|
- `validate`, `before_submit` for **Submit**.
|
|
- `before_cancel` for **Cancel**
|
|
- `before_update_after_submit` for **Update after Submit**
|
|
|
|
Will also update title_field if set"""
|
|
|
|
self.reset_seen()
|
|
|
|
# before_validate method should be executed before ignoring validations
|
|
if self._action in ("save", "submit"):
|
|
self.run_method("before_validate")
|
|
|
|
if self.flags.ignore_validate:
|
|
return
|
|
|
|
if self._action == "save":
|
|
self.run_method("validate")
|
|
self.run_method("before_save")
|
|
elif self._action == "submit":
|
|
self.run_method("validate")
|
|
self.run_method("before_submit")
|
|
elif self._action == "cancel":
|
|
self.run_method("before_cancel")
|
|
elif self._action == "update_after_submit":
|
|
self.run_method("before_update_after_submit")
|
|
|
|
self.set_title_field()
|
|
|
|
def load_doc_before_save(self, *, raise_exception: bool = False):
|
|
"""load existing document from db before saving"""
|
|
|
|
self._doc_before_save = None
|
|
|
|
if self.is_new():
|
|
return
|
|
|
|
try:
|
|
self._doc_before_save = frappe.get_doc(self.doctype, self.name, for_update=True)
|
|
except frappe.DoesNotExistError:
|
|
if raise_exception:
|
|
raise
|
|
|
|
return frappe.clear_last_message()
|
|
|
|
for fieldname in self._non_computed_table_fieldnames:
|
|
for row in self.get(fieldname):
|
|
row._doc_before_save = next(
|
|
(d for d in (self._doc_before_save.get(fieldname) or []) if d.name == row.name), None
|
|
)
|
|
|
|
def run_post_save_methods(self):
|
|
"""Run standard methods after `INSERT` or `UPDATE`. Standard Methods are:
|
|
|
|
- `on_update` for **Save**.
|
|
- `on_update`, `on_submit` for **Submit**.
|
|
- `on_cancel` for **Cancel**
|
|
- `update_after_submit` for **Update after Submit**"""
|
|
|
|
if self._action == "save":
|
|
self.run_method("on_update")
|
|
elif self._action == "submit":
|
|
self.run_method("on_update")
|
|
self.run_method("on_submit")
|
|
elif self._action == "cancel":
|
|
self.run_method("on_cancel")
|
|
self.check_no_back_links_exist()
|
|
elif self._action == "update_after_submit":
|
|
self.run_method("on_update_after_submit")
|
|
|
|
if not (frappe.flags.in_import and self.is_new()):
|
|
self.clear_cache()
|
|
|
|
if self.flags.get("notify_update", True):
|
|
self.notify_update()
|
|
|
|
update_global_search(self)
|
|
|
|
self.save_version()
|
|
|
|
self.run_method("on_change")
|
|
|
|
if (self.doctype, self.name) in frappe.flags.currently_saving:
|
|
frappe.flags.currently_saving.remove((self.doctype, self.name))
|
|
|
|
def clear_cache(self):
|
|
frappe.clear_document_cache(self.doctype, self.name)
|
|
|
|
def reset_seen(self):
|
|
"""Clear _seen property and set current user as seen"""
|
|
if (
|
|
getattr(self.meta, "track_seen", False)
|
|
and not getattr(self.meta, "issingle", False)
|
|
and not self.is_new()
|
|
):
|
|
frappe.db.set_value(
|
|
self.doctype, self.name, "_seen", json.dumps([frappe.session.user]), update_modified=False
|
|
)
|
|
|
|
def notify_update(self):
|
|
"""Publish realtime that the current document is modified"""
|
|
if (
|
|
frappe.flags.in_import
|
|
or frappe.flags.in_patch
|
|
or frappe.flags.in_migrate
|
|
or frappe.flags.in_install
|
|
):
|
|
return
|
|
|
|
frappe.publish_realtime(
|
|
"doc_update",
|
|
{"modified": self.modified, "doctype": self.doctype, "name": self.name},
|
|
doctype=self.doctype,
|
|
docname=self.name,
|
|
after_commit=True,
|
|
)
|
|
|
|
if not self.meta.get("read_only") and not self.meta.get("issingle") and not self.meta.get("istable"):
|
|
data = {"doctype": self.doctype, "name": self.name, "user": frappe.session.user}
|
|
frappe.publish_realtime("list_update", data, after_commit=True)
|
|
|
|
def db_set(self, fieldname, value=None, update_modified=True, notify=False, commit=False):
|
|
"""Set a value in the document object, update the timestamp and update the database.
|
|
|
|
WARNING: This method does not trigger controller validations and should
|
|
be used very carefully.
|
|
|
|
:param fieldname: fieldname of the property to be updated, or a {"field":"value"} dictionary
|
|
:param value: value of the property to be updated
|
|
:param update_modified: default True. updates the `modified` and `modified_by` properties
|
|
:param notify: default False. run doc.notify_update() to send updates via socketio
|
|
:param commit: default False. run frappe.db.commit()
|
|
"""
|
|
if isinstance(fieldname, dict):
|
|
self.update(fieldname)
|
|
else:
|
|
self.set(fieldname, value)
|
|
|
|
if update_modified and (self.doctype, self.name) not in frappe.flags.currently_saving:
|
|
# don't update modified timestamp if called from post save methods
|
|
# like on_update or on_submit
|
|
self.set("modified", now())
|
|
self.set("modified_by", frappe.session.user)
|
|
|
|
# load but do not reload doc_before_save because before_change or on_change might expect it
|
|
if not self.get_doc_before_save() and not self.meta.istable:
|
|
self.load_doc_before_save()
|
|
|
|
# to trigger notification on value change
|
|
self.run_method("before_change")
|
|
|
|
if self.name is None:
|
|
return
|
|
|
|
if self.meta.issingle:
|
|
frappe.db.set_single_value(
|
|
self.doctype,
|
|
fieldname,
|
|
value,
|
|
modified=self.modified,
|
|
modified_by=self.modified_by,
|
|
update_modified=update_modified,
|
|
)
|
|
else:
|
|
frappe.db.set_value(
|
|
self.doctype,
|
|
self.name,
|
|
fieldname,
|
|
value,
|
|
self.modified,
|
|
self.modified_by,
|
|
update_modified=update_modified,
|
|
)
|
|
|
|
self.run_method("on_change")
|
|
|
|
if notify:
|
|
self.notify_update()
|
|
|
|
if commit:
|
|
frappe.db.commit()
|
|
|
|
def db_get(self, fieldname):
|
|
"""get database value for this fieldname"""
|
|
return frappe.db.get_value(self.doctype, self.name, fieldname)
|
|
|
|
def check_no_back_links_exist(self):
|
|
"""Check if document links to any active document before Cancel."""
|
|
from frappe.model.delete_doc import check_if_doc_is_dynamically_linked, check_if_doc_is_linked
|
|
|
|
if not self.flags.ignore_links:
|
|
check_if_doc_is_linked(self, method="Cancel")
|
|
check_if_doc_is_dynamically_linked(self, method="Cancel")
|
|
|
|
def save_version(self):
|
|
"""Save version info"""
|
|
|
|
# don't track version under following conditions
|
|
if (
|
|
not getattr(self.meta, "track_changes", False)
|
|
or self.doctype == "Version"
|
|
or self.flags.ignore_version
|
|
or frappe.flags.in_install
|
|
or (not self._doc_before_save and frappe.flags.in_patch)
|
|
):
|
|
return
|
|
|
|
doc_to_compare = self._doc_before_save
|
|
if not doc_to_compare and (amended_from := self.get("amended_from")):
|
|
doc_to_compare = frappe.get_doc(self.doctype, amended_from)
|
|
|
|
version = frappe.new_doc("Version")
|
|
|
|
if not doc_to_compare and not self.flags.updater_reference:
|
|
return
|
|
|
|
if version.update_version_info(doc_to_compare, self):
|
|
version.insert(ignore_permissions=True)
|
|
|
|
@staticmethod
|
|
def hook(f):
|
|
"""Decorator: Make method `hookable` (i.e. extensible by another app).
|
|
|
|
Note: If each hooked method returns a value (dict), then all returns are
|
|
collated in one dict and returned. Ideally, don't return values in hookable
|
|
methods, set properties in the document."""
|
|
|
|
def add_to_return_value(self, new_return_value):
|
|
if new_return_value is None:
|
|
self._return_value = self.get("_return_value")
|
|
return
|
|
|
|
if isinstance(new_return_value, dict):
|
|
if not self.get("_return_value"):
|
|
self._return_value = {}
|
|
self._return_value.update(new_return_value)
|
|
else:
|
|
self._return_value = new_return_value
|
|
|
|
def compose(fn, *hooks):
|
|
def runner(self, method, *args, **kwargs):
|
|
add_to_return_value(self, fn(self, *args, **kwargs))
|
|
for f in hooks:
|
|
try:
|
|
frappe.db._disable_transaction_control += 1
|
|
add_to_return_value(self, f(self, method, *args, **kwargs))
|
|
finally:
|
|
frappe.db._disable_transaction_control -= 1
|
|
|
|
return self.__dict__.pop("_return_value", None)
|
|
|
|
return runner
|
|
|
|
def composer(self, *args, **kwargs):
|
|
hooks = []
|
|
method = f.__name__
|
|
doc_events = frappe.get_doc_hooks()
|
|
for handler in doc_events.get(self.doctype, {}).get(method, []) + doc_events.get("*", {}).get(
|
|
method, []
|
|
):
|
|
hooks.append(frappe.get_attr(handler))
|
|
|
|
composed = compose(f, *hooks)
|
|
return composed(self, method, *args, **kwargs)
|
|
|
|
return composer
|
|
|
|
def is_whitelisted(self, method_name):
|
|
method = getattr(self, method_name, None)
|
|
if not method:
|
|
raise NotFound(f"Method {method_name} not found")
|
|
|
|
is_whitelisted(getattr(method, "__func__", method))
|
|
|
|
def validate_value(self, fieldname, condition, val2, doc=None, raise_exception=None):
|
|
"""Check that value of fieldname should be 'condition' val2
|
|
else throw Exception."""
|
|
if not doc:
|
|
doc = self
|
|
|
|
val1 = doc.get_value(fieldname)
|
|
|
|
df = doc.meta.get_field(fieldname)
|
|
val2 = doc.cast(val2, df)
|
|
|
|
if not compare(val1, condition, val2):
|
|
label = doc.meta.get_label(fieldname)
|
|
if doc.get("parentfield"):
|
|
msg = _("Incorrect value in row {0}:").format(doc.idx)
|
|
else:
|
|
msg = _("Incorrect value:")
|
|
|
|
if condition == "in":
|
|
msg += _("{0} must be one of {1}").format(label, val2)
|
|
elif condition == "not in":
|
|
msg += _("{0} must be none of {1}").format(label, val2)
|
|
elif condition == "^":
|
|
msg += _("{0} must be beginning with '{1}'").format(label, val2)
|
|
elif condition == "=":
|
|
msg += _("{0} must be equal to '{1}'").format(label, val2)
|
|
else:
|
|
msg += _("{0} must be {1} {2}").format(label, condition, val2)
|
|
|
|
# raise passed exception or True
|
|
msgprint(msg, raise_exception=raise_exception or True)
|
|
|
|
def validate_table_has_rows(self, parentfield, raise_exception=None):
|
|
"""Raise exception if Table field is empty."""
|
|
if not (isinstance(self.get(parentfield), list) and len(self.get(parentfield)) > 0):
|
|
label = _(self.meta.get_label(parentfield))
|
|
frappe.throw(
|
|
_("Table {0} cannot be empty").format(label), raise_exception or frappe.EmptyTableError
|
|
)
|
|
|
|
def round_floats_in(self, doc, fieldnames=None, do_not_round_fields=None):
|
|
"""Round floats for all `Currency`, `Float`, `Percent` fields for the given doc.
|
|
|
|
:param doc: Document whose numeric properties are to be rounded.
|
|
:param fieldnames: [Optional] List of fields to be rounded."""
|
|
if not fieldnames:
|
|
fieldnames = (
|
|
df.fieldname
|
|
for df in doc.meta.get("fields", {"fieldtype": ["in", ["Currency", "Float", "Percent"]]})
|
|
)
|
|
|
|
# PERF: flt internally has to resolve this if we don't specify it.
|
|
rounding_method = frappe.get_system_settings("rounding_method")
|
|
for fieldname in fieldnames:
|
|
if do_not_round_fields and fieldname in do_not_round_fields:
|
|
continue
|
|
|
|
doc.set(
|
|
fieldname,
|
|
flt(
|
|
doc.get(fieldname),
|
|
self.precision(fieldname, doc.get("parentfield")),
|
|
rounding_method=rounding_method,
|
|
),
|
|
)
|
|
|
|
def get_url(self):
|
|
"""Return Desk URL for this document."""
|
|
return get_absolute_url(self.doctype, self.name)
|
|
|
|
@frappe.whitelist()
|
|
def add_comment(
|
|
self,
|
|
comment_type: str = "Comment",
|
|
text: str | None = None,
|
|
comment_email: str | None = None,
|
|
comment_by: str | None = None,
|
|
):
|
|
"""Add a comment to this document.
|
|
|
|
:param comment_type: e.g. `Comment`. See Communication for more info."""
|
|
|
|
return frappe.get_doc(
|
|
{
|
|
"doctype": "Comment",
|
|
"comment_type": comment_type,
|
|
"comment_email": comment_email or frappe.session.user,
|
|
"comment_by": comment_by,
|
|
"reference_doctype": self.doctype,
|
|
"reference_name": self.name,
|
|
"content": text or comment_type,
|
|
}
|
|
).insert(ignore_permissions=True)
|
|
|
|
def add_seen(self, user=None):
|
|
"""add the given/current user to list of users who have seen this document (_seen)"""
|
|
if not user:
|
|
user = frappe.session.user
|
|
|
|
if self.meta.track_seen and not frappe.flags.read_only and not self.meta.issingle:
|
|
_seen = self.get("_seen") or []
|
|
_seen = frappe.parse_json(_seen)
|
|
|
|
if user not in _seen:
|
|
_seen.append(user)
|
|
commit_after_response(
|
|
lambda: frappe.db.set_value(
|
|
self.doctype, self.name, "_seen", json.dumps(_seen), update_modified=False
|
|
)
|
|
)
|
|
|
|
def add_viewed(self, user=None, force=False, unique_views=False):
|
|
"""Add a view log for the current document"""
|
|
|
|
if not (getattr(self.meta, "track_views", False) or force):
|
|
return
|
|
|
|
user = user or frappe.session.user
|
|
|
|
if unique_views and frappe.db.exists(
|
|
"View Log", {"reference_doctype": self.doctype, "reference_name": self.name, "viewed_by": user}
|
|
):
|
|
return
|
|
|
|
view_log = frappe.get_doc(
|
|
{
|
|
"doctype": "View Log",
|
|
"viewed_by": user,
|
|
"reference_doctype": self.doctype,
|
|
"reference_name": self.name,
|
|
}
|
|
)
|
|
if frappe.flags.read_only:
|
|
view_log.deferred_insert()
|
|
else:
|
|
commit_after_response(lambda: view_log.insert(ignore_permissions=True))
|
|
|
|
return view_log
|
|
|
|
def log_error(self, title=None, message=None, *, defer_insert=False):
|
|
"""Helper function to create an Error Log"""
|
|
return frappe.log_error(
|
|
message=message,
|
|
title=title,
|
|
reference_doctype=self.doctype,
|
|
reference_name=self.name,
|
|
defer_insert=defer_insert,
|
|
)
|
|
|
|
def get_signature(self):
|
|
"""Return signature (hash) for private URL."""
|
|
return hashlib.sha224(f"{self.doctype}:{self.name}".encode(), usedforsecurity=False).hexdigest()
|
|
|
|
def get_document_share_key(self, expires_on=None, no_expiry=False):
|
|
if no_expiry:
|
|
expires_on = None
|
|
|
|
existing_key = frappe.db.exists(
|
|
"Document Share Key",
|
|
{
|
|
"reference_doctype": self.doctype,
|
|
"reference_docname": self.name,
|
|
"expires_on": expires_on,
|
|
},
|
|
)
|
|
if existing_key:
|
|
doc = frappe.get_doc("Document Share Key", existing_key)
|
|
else:
|
|
doc = frappe.new_doc("Document Share Key")
|
|
doc.reference_doctype = self.doctype
|
|
doc.reference_docname = self.name
|
|
doc.expires_on = expires_on
|
|
doc.flags.no_expiry = no_expiry
|
|
doc.insert(ignore_permissions=True)
|
|
|
|
return doc.key
|
|
|
|
def get_liked_by(self):
|
|
liked_by = getattr(self, "_liked_by", None)
|
|
if liked_by:
|
|
return json.loads(liked_by)
|
|
else:
|
|
return []
|
|
|
|
@property
|
|
def __onload(self):
|
|
onload = self.get("__onload")
|
|
if onload is None:
|
|
onload = frappe._dict()
|
|
self.set("__onload", onload)
|
|
|
|
return onload
|
|
|
|
def set_onload(self, key, value):
|
|
self.__onload[key] = value
|
|
|
|
def get_onload(self, key=None):
|
|
return self.__onload[key] if key else self.__onload
|
|
|
|
def queue_action(self, action, **kwargs):
|
|
"""Run an action in background. If the action has an inner function,
|
|
like _submit for submit, it will call that instead"""
|
|
# call _submit instead of submit, so you can override submit to call
|
|
# run_delayed based on some action
|
|
# See: Stock Reconciliation
|
|
from frappe.utils.background_jobs import enqueue
|
|
|
|
if hasattr(self, f"_{action}"):
|
|
action = f"_{action}"
|
|
|
|
self.check_if_locked()
|
|
self.lock()
|
|
|
|
enqueue_after_commit = kwargs.pop("enqueue_after_commit", None)
|
|
if enqueue_after_commit is None:
|
|
enqueue_after_commit = True
|
|
|
|
return enqueue(
|
|
"frappe.model.document.execute_action",
|
|
__doctype=self.doctype,
|
|
__name=self.name,
|
|
__action=action,
|
|
enqueue_after_commit=enqueue_after_commit,
|
|
**kwargs,
|
|
)
|
|
|
|
def lock(self, timeout=None):
|
|
"""Creates a lock file for the given document. If timeout is set,
|
|
it will retry every 1 second for acquiring the lock again
|
|
|
|
:param timeout: Timeout in seconds, default 0"""
|
|
signature = self.get_signature()
|
|
if file_lock.lock_exists(signature):
|
|
lock_exists = True
|
|
if file_lock.lock_age(signature) > DOCUMENT_LOCK_EXPIRY:
|
|
file_lock.delete_lock(signature)
|
|
lock_exists = False
|
|
if timeout:
|
|
for _ in range(timeout):
|
|
time.sleep(1)
|
|
if not file_lock.lock_exists(signature):
|
|
lock_exists = False
|
|
break
|
|
if lock_exists:
|
|
raise frappe.DocumentLockedError
|
|
file_lock.create_lock(signature)
|
|
frappe.local.locked_documents.append(self)
|
|
|
|
def unlock(self):
|
|
"""Delete the lock file for this document"""
|
|
file_lock.delete_lock(self.get_signature())
|
|
if self in frappe.local.locked_documents:
|
|
frappe.local.locked_documents.remove(self)
|
|
|
|
def validate_from_to_dates(self, from_date_field: str, to_date_field: str) -> None:
|
|
"""Validate that the value of `from_date_field` is not later than the value of `to_date_field`."""
|
|
from_date = self.get(from_date_field)
|
|
to_date = self.get(to_date_field)
|
|
if not (from_date and to_date):
|
|
return
|
|
|
|
if date_diff(to_date, from_date) < 0:
|
|
table_row = ""
|
|
if self.meta.istable:
|
|
table_row = (
|
|
_("{0} row #{1}:").format(
|
|
_(frappe.unscrub(self.parentfield)),
|
|
self.idx,
|
|
)
|
|
+ " "
|
|
)
|
|
|
|
frappe.throw(
|
|
table_row
|
|
+ _("{0} must be after {1}").format(
|
|
frappe.bold(_(self.meta.get_label(to_date_field))),
|
|
frappe.bold(_(self.meta.get_label(from_date_field))),
|
|
),
|
|
frappe.exceptions.InvalidDates,
|
|
)
|
|
|
|
def get_assigned_users(self):
|
|
assigned_users = frappe.get_all(
|
|
"ToDo",
|
|
fields=["allocated_to"],
|
|
filters={
|
|
"reference_type": self.doctype,
|
|
"reference_name": self.name,
|
|
"status": ("!=", "Cancelled"),
|
|
},
|
|
pluck="allocated_to",
|
|
)
|
|
|
|
return set(assigned_users)
|
|
|
|
def add_tag(self, tag):
|
|
"""Add a Tag to this document"""
|
|
from frappe.desk.doctype.tag.tag import DocTags
|
|
|
|
DocTags(self.doctype).add(self.name, tag)
|
|
|
|
def remove_tag(self, tag):
|
|
"""Remove a Tag to this document"""
|
|
from frappe.desk.doctype.tag.tag import DocTags
|
|
|
|
DocTags(self.doctype).remove(self.name, tag)
|
|
|
|
def get_tags(self):
|
|
"""Return a list of Tags attached to this document"""
|
|
from frappe.desk.doctype.tag.tag import DocTags
|
|
|
|
return DocTags(self.doctype).get_tags(self.name).split(",")[1:]
|
|
|
|
def deferred_insert(self) -> None:
|
|
"""Push the document to redis temporarily and insert later.
|
|
|
|
WARN: This doesn't guarantee insertion as redis can be restarted
|
|
before data is flushed to database.
|
|
"""
|
|
|
|
from frappe.deferred_insert import deferred_insert
|
|
|
|
self.set_user_and_timestamp()
|
|
|
|
doc = self.get_valid_dict(convert_dates_to_str=True, ignore_virtual=True)
|
|
deferred_insert(doctype=self.doctype, records=doc)
|
|
|
|
def __str__(self):
|
|
return f"{self.doctype} ({self.name or 'unsaved'})"
|
|
|
|
def __repr__(self):
|
|
doctype = f"doctype={self.doctype}"
|
|
name = self.name or "unsaved"
|
|
docstatus = f" docstatus={self.docstatus}" if self.docstatus else ""
|
|
parent = f" parent={self.parent}" if getattr(self, "parent", None) else ""
|
|
|
|
return f"<{self.__class__.__name__}: {doctype} {name}{docstatus}{parent}>"
|
|
|
|
|
|
def execute_action(__doctype, __name, __action, **kwargs):
|
|
"""Execute an action on a document (called by background worker)"""
|
|
doc = frappe.get_doc(__doctype, __name)
|
|
doc.unlock()
|
|
try:
|
|
getattr(doc, __action)(**kwargs)
|
|
except Exception:
|
|
frappe.db.rollback()
|
|
|
|
# add a comment (?)
|
|
if frappe.message_log:
|
|
msg = frappe.message_log[-1].get("message")
|
|
else:
|
|
msg = "<pre><code>" + frappe.get_traceback() + "</pre></code>"
|
|
|
|
doc.add_comment("Comment", _("Action Failed") + "<br><br>" + msg)
|
|
doc.notify_update()
|
|
|
|
|
|
def bulk_insert(
|
|
doctype: str,
|
|
documents: Iterable["Document"],
|
|
ignore_duplicates: bool = False,
|
|
chunk_size=1000,
|
|
commit_chunks=False,
|
|
):
|
|
"""Insert simple Documents objects to database in bulk.
|
|
|
|
Warning/Info:
|
|
- All documents are inserted without triggering ANY hooks.
|
|
- This function assumes you've done the due dilligence and inserts in similar fashion as db_insert
|
|
- Documents can be any iterable / generator containing Document objects
|
|
"""
|
|
|
|
doctype_meta = frappe.get_meta(doctype)
|
|
|
|
valid_column_map = {
|
|
doctype: doctype_meta.get_valid_columns(),
|
|
}
|
|
|
|
child_table_fields = doctype_meta.get_table_fields()
|
|
for child_table in child_table_fields:
|
|
valid_column_map[child_table.options] = frappe.get_meta(child_table.options).get_valid_columns()
|
|
|
|
documents = iter(documents)
|
|
while document_batch := list(itertools.islice(documents, chunk_size)):
|
|
values_map = {
|
|
doctype: _document_values_generator(document_batch, valid_column_map[doctype]),
|
|
}
|
|
|
|
for child_table in child_table_fields:
|
|
values_map[child_table.options] = _document_values_generator(
|
|
[
|
|
ch_doc
|
|
for ch_doc in (
|
|
child_docs for doc in document_batch for child_docs in doc.get(child_table.fieldname)
|
|
)
|
|
],
|
|
valid_column_map[child_table.options],
|
|
)
|
|
|
|
for dt, docs in values_map.items():
|
|
frappe.db.bulk_insert(dt, valid_column_map[dt], docs, ignore_duplicates=ignore_duplicates)
|
|
|
|
if commit_chunks:
|
|
frappe.db.commit()
|
|
|
|
|
|
def _document_values_generator(
|
|
documents: Iterable["Document"],
|
|
columns: list[str],
|
|
) -> Generator[tuple[Any]]:
|
|
for doc in documents:
|
|
doc.creation = doc.modified = now()
|
|
doc.owner = doc.modified_by = frappe.session.user
|
|
doc_values = doc.get_valid_dict(
|
|
convert_dates_to_str=True,
|
|
ignore_nulls=True,
|
|
ignore_virtual=True,
|
|
)
|
|
yield tuple(doc_values.get(col) for col in columns)
|
|
|
|
|
|
@frappe.whitelist()
|
|
def unlock_document(doctype: str, name: str):
|
|
frappe.get_lazy_doc(doctype, name).unlock()
|
|
frappe.msgprint(frappe._("Document Unlocked"), alert=True)
|
|
|
|
|
|
def get_lazy_controller(doctype):
|
|
lazy_controllers = frappe.lazy_controllers.setdefault(frappe.local.site, {})
|
|
if doctype not in lazy_controllers:
|
|
meta = frappe.get_meta(doctype)
|
|
original_controller = get_controller(doctype)
|
|
if meta.is_virtual: # not supported
|
|
lazy_controllers[doctype] = original_controller
|
|
warnings.warn("Virtual doctypes don't support lazy loading", stacklevel=2)
|
|
return original_controller
|
|
|
|
# Dynamically construct a class that subclasses LazyDocument and original controller.
|
|
lazy_controller = type(f"Lazy{original_controller.__name__}", (LazyDocument, original_controller), {})
|
|
for df in meta.get_table_fields():
|
|
setattr(lazy_controller, df.fieldname, LazyChildTable(df.fieldname, df.options))
|
|
|
|
lazy_controllers[doctype] = lazy_controller
|
|
return lazy_controllers[doctype]
|
|
|
|
|
|
class LazyDocument:
|
|
"""Mixin for Document class that implments lazy loading for child tables."""
|
|
|
|
@override
|
|
def load_children_from_db(self: Document):
|
|
"""Override Document which eagerly loads child tables"""
|
|
# This is a map of loaded children, it should get erased whenever load_children_from_db is
|
|
# called to allow reloading lazily again.
|
|
for fieldname in self._table_fieldnames:
|
|
self.__dict__.pop(fieldname, None)
|
|
|
|
@override
|
|
def get(self: Document, key, filters=None, limit=None, default=None):
|
|
# Ensure that table descriptor is triggered at least once
|
|
if isinstance(key, str) and key in self._table_fieldnames:
|
|
getattr(self, key, None)
|
|
return super().get(key, filters, limit, default)
|
|
|
|
@override
|
|
def append(self, key: str, value: D | dict | None = None, position: int = -1) -> D:
|
|
# Ensure that table descriptor is triggered at least once
|
|
# key is assumed to be a table fieldname (as expected by BaseDocument.append)
|
|
getattr(self, key, None)
|
|
return super().append(key, value, position)
|
|
|
|
@override
|
|
def db_update_all(self):
|
|
self.db_update()
|
|
for fieldname in self._table_fieldnames:
|
|
if fieldname not in self.__dict__:
|
|
# Not fetched, can't possibly change so no need to update
|
|
continue
|
|
for doc in self.get(fieldname):
|
|
doc.db_update()
|
|
|
|
@override
|
|
def init_child_tables(self):
|
|
# Avoid initializing anything, descriptor handles it.
|
|
return
|
|
|
|
|
|
class LazyChildTable:
|
|
__slots__ = ("doctype", "fieldname")
|
|
|
|
def __init__(self, fieldname: str, doctype: str) -> None:
|
|
self.fieldname = fieldname
|
|
self.doctype = doctype
|
|
|
|
def __get__(self, doc: Document, objtype=None):
|
|
# Note: avoid any high level access here, can cause recursion
|
|
fieldname = self.fieldname
|
|
__dict = doc.__dict__
|
|
assert fieldname not in __dict, "Descriptor should not override existing values"
|
|
children = doc._load_child_table_from_db(fieldname, self.doctype) or []
|
|
__dict[fieldname] = []
|
|
# Update __dict__ and convert to Document objects
|
|
doc.extend(fieldname, children)
|
|
return __dict[fieldname]
|
|
|
|
# Note: Don't implement __set__ method! https://docs.python.org/3/howto/descriptor.html#descriptor-protocol
|
|
|
|
|
|
def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
|
|
"""No_copy fields also get copied."""
|
|
import copy
|
|
from types import MappingProxyType
|
|
|
|
from frappe.model.base_document import BaseDocument
|
|
|
|
def remove_no_copy_fields(d):
|
|
for df in d.meta.get("fields", {"no_copy": 1}):
|
|
if hasattr(d, df.fieldname):
|
|
d.set(df.fieldname, None)
|
|
|
|
fields_to_clear = ["name", "owner", "creation", "modified", "modified_by"]
|
|
|
|
if not frappe.in_test:
|
|
fields_to_clear.append("docstatus")
|
|
|
|
if isinstance(doc, BaseDocument):
|
|
d = doc.as_dict()
|
|
elif isinstance(doc, MappingProxyType): # global test record
|
|
d = dict(doc)
|
|
else:
|
|
d = doc
|
|
|
|
newdoc = get_doc(copy.deepcopy(d))
|
|
newdoc.set("__islocal", 1)
|
|
for fieldname in [*fields_to_clear, "amended_from", "amendment_date"]:
|
|
newdoc.set(fieldname, None)
|
|
|
|
if not ignore_no_copy:
|
|
remove_no_copy_fields(newdoc)
|
|
|
|
for d in newdoc.get_all_children():
|
|
d.set("__islocal", 1)
|
|
|
|
for fieldname in fields_to_clear:
|
|
d.set(fieldname, None)
|
|
|
|
if not ignore_no_copy:
|
|
remove_no_copy_fields(d)
|
|
|
|
return newdoc
|
|
|
|
|
|
def new_doc(
|
|
doctype: str,
|
|
*,
|
|
parent_doc: "Document" | None = None,
|
|
parentfield: str | None = None,
|
|
as_dict: bool = False,
|
|
**kwargs,
|
|
) -> "Document":
|
|
"""Return a new document of the given DocType with defaults set.
|
|
|
|
:param doctype: DocType of the new document.
|
|
:param parent_doc: [optional] add to parent document.
|
|
:param parentfield: [optional] add against this `parentfield`.
|
|
:param as_dict: [optional] return as dictionary instead of Document.
|
|
:param kwargs: [optional] You can specify fields as field=value pairs in function call.
|
|
"""
|
|
|
|
from frappe.model.create_new import get_new_doc
|
|
|
|
new_doc = get_new_doc(doctype, parent_doc, parentfield, as_dict=as_dict)
|
|
|
|
return new_doc.update(kwargs)
|
|
|
|
|
|
def get_cached_doc(*args: Any, **kwargs: Any) -> "Document":
|
|
"""Identical to `frappe.get_doc`, but return from cache if available."""
|
|
if (key := can_cache_doc(args)) and (doc := frappe.cache.get_value(key)):
|
|
return doc
|
|
|
|
# Not found in cache, fetch from DB
|
|
doc = get_doc(*args, **kwargs)
|
|
|
|
# Store in cache
|
|
if not key:
|
|
key = get_document_cache_key(doc.doctype, doc.name)
|
|
|
|
_set_document_in_cache(key, doc)
|
|
|
|
return doc
|
|
|
|
|
|
def _set_document_in_cache(key: str, doc: "Document") -> None:
|
|
frappe.cache.set_value(key, doc, expires_in_sec=3600)
|
|
|
|
|
|
def can_cache_doc(args) -> str | None:
|
|
"""
|
|
Determine if document should be cached based on get_doc params.
|
|
Return cache key if doc can be cached, None otherwise.
|
|
"""
|
|
|
|
if not args:
|
|
return
|
|
|
|
doctype = args[0]
|
|
name = doctype if len(args) == 1 or args[1] is None else args[1]
|
|
|
|
# Only cache if both doctype and name are strings
|
|
if isinstance(doctype, str) and isinstance(name, str):
|
|
return get_document_cache_key(doctype, name)
|
|
|
|
|
|
def get_document_cache_key(doctype: str, name: str):
|
|
return f"document_cache::{doctype}::{name}"
|
|
|
|
|
|
def clear_document_cache(doctype: str, name: str | None = None) -> None:
|
|
frappe.db.value_cache.pop(doctype, None)
|
|
|
|
def clear_in_redis():
|
|
if name is not None:
|
|
frappe.cache.delete_value(get_document_cache_key(doctype, name))
|
|
else:
|
|
frappe.cache.delete_keys(get_document_cache_key(doctype, ""))
|
|
|
|
clear_in_redis()
|
|
if hasattr(frappe.db, "after_commit"):
|
|
frappe.db.after_commit.add(clear_in_redis)
|
|
frappe.db.after_rollback.add(clear_in_redis)
|
|
|
|
if doctype == "System Settings" and hasattr(frappe.local, "system_settings"):
|
|
delattr(frappe.local, "system_settings")
|
|
|
|
if doctype == "Website Settings" and hasattr(frappe.local, "website_settings"):
|
|
delattr(frappe.local, "website_settings")
|
|
|
|
|
|
def get_cached_value(
|
|
doctype: str, name: str | dict, fieldname: str | Iterable[str] = "name", as_dict: bool = False
|
|
) -> Any:
|
|
try:
|
|
doc = get_cached_doc(doctype, name)
|
|
except frappe.DoesNotExistError:
|
|
frappe.clear_last_message()
|
|
return
|
|
|
|
if isinstance(fieldname, str):
|
|
if as_dict:
|
|
frappe.throw("Cannot make dict for single fieldname")
|
|
return doc.get(fieldname)
|
|
|
|
values = [doc.get(f) for f in fieldname]
|
|
if as_dict:
|
|
return frappe._dict(zip(fieldname, values, strict=False))
|
|
return values
|
|
|
|
|
|
def get_single_value(setting: str, fieldname: str, /, *, as_dict: bool = False):
|
|
"""Return the cached value associated with the given fieldname from single DocType.
|
|
|
|
Usage:
|
|
telemetry_enabled = frappe.get_single_value("System Settings", "telemetry_enabled")
|
|
"""
|
|
return get_cached_value(setting, setting, fieldname=fieldname, as_dict=as_dict)
|
|
|
|
|
|
def get_last_doc(
|
|
doctype,
|
|
filters: FilterSignature | None = None,
|
|
order_by="creation desc",
|
|
*,
|
|
for_update=False,
|
|
):
|
|
"""Get last created document of this type."""
|
|
d = frappe.get_all(doctype, filters=filters, limit_page_length=1, order_by=order_by, pluck="name")
|
|
if d:
|
|
return get_doc(doctype, d[0], for_update=for_update)
|
|
else:
|
|
raise frappe.DoesNotExistError(doctype=doctype)
|
|
|
|
|
|
def get_single(doctype):
|
|
"""Return a `frappe.model.document.Document` object of the given Single doctype."""
|
|
return get_doc(doctype, doctype)
|