Some users add an attachment before filling in the mandatory fields, which raises a Missing Fields error; other validators can raise errors as well. In either case the file is uploaded but the document is not saved, which leads to orphaned or mislinked files (e.g. files attached to new-purchase-receipt-1). This fix changes the temporary name of new documents to new-<doctype>-<no>-<10-digit hash>. After the document is saved, this temporary name is used to find any mislinked files created in the past hour and relink them to the newly saved document.
1700 lines
49 KiB
Python
1700 lines
49 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
import hashlib
|
|
import json
|
|
import time
|
|
from collections.abc import Generator, Iterable
|
|
from typing import TYPE_CHECKING, Any, Optional
|
|
|
|
from werkzeug.exceptions import NotFound
|
|
|
|
import frappe
|
|
from frappe import _, is_whitelisted, msgprint
|
|
from frappe.core.doctype.file.utils import relink_mismatched_files
|
|
from frappe.core.doctype.server_script.server_script_utils import run_server_script_for_doc_event
|
|
from frappe.desk.form.document_follow import follow_document
|
|
from frappe.integrations.doctype.webhook import run_webhooks
|
|
from frappe.model import optional_fields, table_fields
|
|
from frappe.model.base_document import BaseDocument, get_controller
|
|
from frappe.model.docstatus import DocStatus
|
|
from frappe.model.naming import set_new_name, validate_name
|
|
from frappe.model.utils import is_virtual_doctype
|
|
from frappe.model.workflow import set_workflow_state_on_action, validate_workflow
|
|
from frappe.types import DF
|
|
from frappe.utils import compare, cstr, date_diff, file_lock, flt, get_datetime_str, now
|
|
from frappe.utils.data import get_absolute_url
|
|
from frappe.utils.global_search import update_global_search
|
|
|
|
if TYPE_CHECKING:
|
|
from frappe.core.doctype.docfield.docfield import DocField
|
|
|
|
|
|
def get_doc(*args, **kwargs):
    """Return a document object built by the controller of the given DocType.

    :param arg1: Document dict or DocType name.
    :param arg2: [optional] document name.
    :param for_update: [optional] select document for update.
    :raises ValueError: if the first positional argument is neither a string
        nor a dict, or if no doctype can be determined from the arguments.
    :raises ImportError: if no controller is found for the doctype.

    There are multiple ways to call `get_doc`

        # will fetch the latest user object (with child table) from the database
        user = get_doc("User", "test@example.com")

        # create a new object
        user = get_doc({
            "doctype": "User",
            "email_id": "test@example.com",
            "roles": [
                {"role": "System Manager"}
            ]
        })

        # create new object with keyword arguments
        user = get_doc(doctype='User', email_id='test@example.com')

        # select a document for update
        user = get_doc("User", "test@example.com", for_update=True)
    """
    doctype = None

    if args:
        first = args[0]
        if isinstance(first, BaseDocument):
            # already a document
            return first
        elif isinstance(first, str):
            doctype = first
        elif isinstance(first, dict):
            # passed a dict
            kwargs = first
        else:
            raise ValueError("First non keyword argument must be a string or dict")

    if len(args) < 2 and kwargs:
        if "doctype" in kwargs:
            doctype = kwargs["doctype"]
        else:
            raise ValueError('"doctype" is a required key')

    if doctype is None:
        # Reached for `get_doc()` or `get_doc({})`; without this guard the
        # lookup below would fail with an UnboundLocalError instead.
        raise ValueError('"doctype" is a required key')

    controller = get_controller(doctype)
    if controller:
        return controller(*args, **kwargs)

    raise ImportError(doctype)
|
|
|
|
|
|
class Document(BaseDocument):
|
|
"""All controllers inherit from `Document`."""
|
|
|
|
doctype: DF.Data
|
|
name: DF.Data | None
|
|
flags: frappe._dict[str, Any]
|
|
owner: DF.Link
|
|
creation: DF.Datetime
|
|
modified: DF.Datetime
|
|
modified_by: DF.Link
|
|
idx: DF.Int
|
|
|
|
def __init__(self, *args, **kwargs):
    """Build a document either from the database or from a dict of values.

    :param arg1: DocType name as string or document **dict**
    :param arg2: Document name, if `arg1` is DocType name.
    :raises ValueError: if neither a DocType name nor any values are given.

    When the first argument is a DocType name, `load_from_db` is called to
    populate the object; otherwise the values are passed to the base class.
    """
    self.doctype = None
    self.name = None
    self.flags = frappe._dict()

    first = args[0] if args else None
    if first:
        if isinstance(first, str):
            # first argument is the doctype
            self.doctype = first

            # doctype for singles, string value or filters for other documents
            self.name = args[1] if len(args) != 1 else self.doctype

            # for_update travels via flags so that load_from_db keeps its
            # signature (it is overridden in virtual doctypes and child classes)
            self.flags.for_update = kwargs.get("for_update")
            self.load_from_db()
            return

        if isinstance(first, dict):
            # first argument carries the field values
            kwargs = first

    if not kwargs:
        # incorrect arguments. let's not proceed.
        raise ValueError("Illegal arguments")

    # init base document
    super().__init__(kwargs)
    self.init_child_tables()
    self.init_valid_columns()
|
|
|
|
@property
def is_locked(self):
    """Whether a file lock exists for this document's signature."""
    signature = self.get_signature()
    return file_lock.lock_exists(signature)
|
|
|
|
@staticmethod
def whitelist(fn):
    """Decorator: pass `fn` through `frappe.whitelist()` and return `fn` itself."""
    register = frappe.whitelist()
    register(fn)
    return fn
|
|
|
|
def load_from_db(self):
    """Populate this document and its child tables from the database.

    Single doctypes are read via `frappe.db.get_singles_dict` (falling back to
    a fresh `frappe.new_doc` dict when nothing is stored); other doctypes are
    read via `frappe.db.get_value` using `self.name` as filters.

    Throws `frappe.DoesNotExistError` when no matching record is found.
    """
    self.flags.ignore_children = True

    is_single = not getattr(self, "_metaclass", False) and self.meta.issingle
    if is_single:
        values = frappe.db.get_singles_dict(self.doctype, for_update=self.flags.for_update)
        if not values:
            values = frappe.new_doc(self.doctype, as_dict=True)
            values["name"] = self.doctype
            del values["__islocal"]

        super().__init__(values)
        self.init_valid_columns()
        self._fix_numeric_types()

    else:
        lookup_options = {"for_update": self.flags.for_update, "as_dict": True}
        if not isinstance(self.name, (dict, list)):
            lookup_options["order_by"] = None

        row = frappe.db.get_value(
            doctype=self.doctype, filters=self.name, fieldname="*", **lookup_options
        )

        if not row:
            frappe.throw(
                _("{0} {1} not found").format(_(self.doctype), self.name), frappe.DoesNotExistError
            )

        super().__init__(row)
    self.flags.pop("ignore_children", None)

    for table_df in self._get_table_fields():
        # Make sure not to query the DB for a child table, if it is a virtual one.
        # During frappe is installed, the property "is_virtual" is not available in tabDocType, so
        # we need to filter those cases for the access to frappe.db.get_value() as it would crash otherwise.
        if (
            hasattr(self, "doctype")
            and not hasattr(self, "module")
            and is_virtual_doctype(table_df.options)
        ):
            self.set(table_df.fieldname, [])
            continue

        child_filters = {
            "parent": self.name,
            "parenttype": self.doctype,
            "parentfield": table_df.fieldname,
        }
        child_rows = frappe.db.get_values(
            table_df.options,
            child_filters,
            "*",
            as_dict=True,
            order_by="idx asc",
            for_update=self.flags.for_update,
        )

        self.set(table_df.fieldname, child_rows or [])

    # sometimes __setup__ can depend on child values, hence calling again at the end
    if hasattr(self, "__setup__"):
        self.__setup__()
|
|
|
|
def reload(self):
    """Re-read this document from the database by delegating to `load_from_db`."""
    self.load_from_db()
|
|
|
|
def get_latest(self):
    """Return `_doc_before_save`, loading it first when it is missing or falsy."""
    if getattr(self, "_doc_before_save", None):
        return self._doc_before_save

    self.load_doc_before_save()
    return self._doc_before_save
|
|
|
|
def check_permission(self, permtype="read", permlevel=None):
    """Raise `frappe.PermissionError` if not permitted.

    :param permtype: permission type forwarded to `has_permission`.
    :param permlevel: accepted for compatibility; not used by this method.
    """
    if self.has_permission(permtype):
        return

    self.raise_no_permission_to(permtype)
|
|
|
|
def has_permission(self, permtype="read") -> bool:
    """
    Check `frappe.permissions.has_permission` unless `ignore_permissions` is set.

    :param permtype: `read`, `write`, `submit`, `cancel`, `delete`, etc.
    :return: True when the `ignore_permissions` flag is truthy, otherwise the
        result of `frappe.permissions.has_permission`.
    """
    if not self.flags.ignore_permissions:
        # imported lazily, only when a real permission check is needed
        import frappe.permissions

        return frappe.permissions.has_permission(self.doctype, permtype, self)

    return True
|
|
|
|
def raise_no_permission_to(self, perm_type):
    """Store an error message in `frappe.flags` and raise `frappe.PermissionError`.

    :param perm_type: permission type named (translated, bold) in the message.
    """
    message = _("Insufficient Permission for {0}").format(self.doctype)
    frappe.flags.error_message = message + f" ({frappe.bold(_(perm_type))})"
    raise frappe.PermissionError
|
|
|
|
def insert(
    self,
    ignore_permissions=None,
    ignore_links=None,
    ignore_if_duplicate=False,
    ignore_mandatory=None,
    set_name=None,
    set_child_names=True,
) -> "Document":
    """Insert the document in the database (as a new document).

    Checks the "create" permission and runs `before_insert`, the before-save
    methods, validation, `after_insert` and the post-save methods, in that order.

    :param ignore_permissions: Do not check permissions if True.
    :param ignore_links: if not None, stored in `flags.ignore_links`.
    :param ignore_if_duplicate: forwarded to `db_insert` for the parent record.
    :param ignore_mandatory: if not None, stored in `flags.ignore_mandatory`.
    :param set_name: forwarded to `set_new_name`.
    :param set_child_names: forwarded to `set_new_name`.
    :return: the document itself, or None when `flags.in_print` is set.
    """
    if self.flags.in_print:
        return

    self.flags.notifications_executed = []

    # only explicit (non-None) arguments override the existing flags
    overrides = (
        ("ignore_permissions", ignore_permissions),
        ("ignore_links", ignore_links),
        ("ignore_mandatory", ignore_mandatory),
    )
    for flag_name, flag_value in overrides:
        if flag_value is not None:
            setattr(self.flags, flag_name, flag_value)

    self.set("__islocal", True)

    self._set_defaults()
    self.set_user_and_timestamp()
    self.set_docstatus()
    self.check_if_latest()
    self._validate_links()
    self.check_permission("create")
    self.run_method("before_insert")
    self.set_new_name(set_name=set_name, set_child_names=set_child_names)
    self.set_parent_in_children()
    self.validate_higher_perm_levels()

    self.flags.in_insert = True
    self.run_before_save_methods()
    self._validate()
    self.set_docstatus()
    self.flags.in_insert = False

    # run validate, on update etc.

    # parent
    if getattr(self.meta, "issingle", 0):
        self.update_single(self.get_valid_dict())
    else:
        self.db_insert(ignore_if_duplicate=ignore_if_duplicate)

    # children
    for child in self.get_all_children():
        child.db_insert()

    self.run_method("after_insert")
    self.flags.in_insert = True

    if self.get("amended_from"):
        self.copy_attachments_from_amended_from()

    relink_mismatched_files(self)
    self.run_post_save_methods()
    self.flags.in_insert = False

    # drop the "new document" and "unsaved" markers
    for marker in ("__islocal", "__unsaved"):
        if hasattr(self, marker):
            delattr(self, marker)

    bulk_operation = (
        frappe.flags.in_migrate or frappe.local.flags.in_install or frappe.flags.in_setup_wizard
    )
    if not bulk_operation and frappe.get_cached_value(
        "User", frappe.session.user, "follow_created_documents"
    ):
        follow_document(self.doctype, self.name, frappe.session.user)

    return self
|
|
|
|
def check_if_locked(self):
    """Raise `frappe.DocumentLockedError` when `creation` is set and the document is locked."""
    if not (self.creation and self.is_locked):
        return

    raise frappe.DocumentLockedError
|
|
|
|
def save(self, *args, **kwargs):
    """Forward all arguments to `_save` and return its result."""
    result = self._save(*args, **kwargs)
    return result
|
|
|
|
def _save(self, ignore_permissions=None, ignore_version=None) -> "Document":
    """Save the current document in the database in the **DocType**'s table or
    `tabSingles` (for single types).

    Documents flagged `__islocal` or without a name are handed to `insert`.
    Otherwise the "write" permission is checked, validation runs before the
    update and the post-save methods run after it.

    :param ignore_permissions: Do not check permissions if True.
    :param ignore_version: Do not save version if True.
    :return: the document itself, or None when `flags.in_print` is set.
    """
    if self.flags.in_print:
        return

    self.flags.notifications_executed = []

    if ignore_permissions is not None:
        self.flags.ignore_permissions = ignore_permissions

    # when the caller does not say otherwise, follow frappe.flags.in_test
    if ignore_version is None:
        self.flags.ignore_version = frappe.flags.in_test
    else:
        self.flags.ignore_version = ignore_version

    is_new_document = self.get("__islocal") or not self.get("name")
    if is_new_document:
        return self.insert()

    self.check_if_locked()
    self.check_permission("write", "save")

    self.set_user_and_timestamp()
    self.set_docstatus()
    self.check_if_latest()
    self.set_parent_in_children()
    self.set_name_in_children()

    self.validate_higher_perm_levels()
    self._validate_links()
    self.run_before_save_methods()

    if self._action != "cancel":
        self._validate()

    if self._action == "update_after_submit":
        self.validate_update_after_submit()

    self.set_docstatus()

    # parent
    if self.meta.issingle:
        self.update_single(self.get_valid_dict())
    else:
        self.db_update()

    self.update_children()
    self.run_post_save_methods()

    # clear unsaved flag
    if hasattr(self, "__unsaved"):
        delattr(self, "__unsaved")

    return self
|
|
|
|
def copy_attachments_from_amended_from(self):
    """Create a File document on this document for each attachment of `amended_from`."""
    from frappe.desk.form.load import get_attachments

    for attachment in get_attachments(self.doctype, self.amended_from):
        # same url/name/privacy as the source attachment, linked to this document
        file_values = {
            "doctype": "File",
            "file_url": attachment.file_url,
            "file_name": attachment.file_name,
            "attached_to_name": self.name,
            "attached_to_doctype": self.doctype,
            "folder": "Home/Attachments",
            "is_private": attachment.is_private,
        }
        frappe.get_doc(file_values).save()
|
|
|
|
def update_children(self):
    """Call `update_child_table` once for every table field of the doctype."""
    table_fields_meta = self.meta.get_table_fields()
    for table_df in table_fields_meta:
        self.update_child_table(table_df.fieldname, table_df)
|
|
|
|
def update_child_table(self, fieldname: str, df: Optional["DocField"] = None):
    """Sync one child table: update the rows in memory, delete the rest from the DB.

    :param fieldname: name of the table field to sync.
    :param df: field definition; looked up from meta via `fieldname` if omitted.
    """
    if not df:
        df = self.meta.get_field(fieldname)

    kept_names = []
    for row in self.get(df.fieldname):
        row.db_update()
        kept_names.append(row.name)

    ignored_types = self.flags.ignore_children_type or []
    if df.options in ignored_types or frappe.get_meta(df.options).is_virtual == 1:
        # do not delete rows for this because of flags
        # hack for docperm :(
        return

    # delete rows that do not match the ones in the document
    child_table = frappe.qb.DocType(df.options)
    delete_query = (
        frappe.qb.from_(child_table)
        .where(child_table.parent == self.name)
        .where(child_table.parenttype == self.doctype)
        .where(child_table.parentfield == fieldname)
        .delete()
    )

    if kept_names:
        delete_query = delete_query.where(child_table.name.notin(kept_names))

    delete_query.run()
|
|
|
|
def get_doc_before_save(self) -> "Document":
    """Return the `_doc_before_save` attribute, or None when it is not set."""
    try:
        return self._doc_before_save
    except AttributeError:
        return None
|
|
|
|
def has_value_changed(self, fieldname):
    """Return True if `fieldname` differs from the copy taken before saving.

    Also returns True when there is no before-save copy to compare against.
    """
    previous = self.get_doc_before_save()
    if not previous:
        return True

    return previous.get(fieldname) != self.get(fieldname)
|
|
|
|
def set_new_name(self, force=False, set_name=None, set_child_names=True):
    """Name this document and, optionally, its child rows.

    Uses the module-level naming helpers `set_new_name` / `validate_name`.

    :param force: name again even if `flags.name_set` is already set.
    :param set_name: explicit name to validate and use instead of autonaming.
    :param set_child_names: also name every child row.
    """
    already_named = self.flags.name_set and not force
    if already_named:
        return

    # If autoname has set as Prompt (name)
    prompted_name = self.get("__newname")
    if prompted_name:
        self.name = validate_name(self.doctype, self.get("__newname"))
        self.flags.name_set = True
        return

    if not set_name:
        set_new_name(self)
    else:
        self.name = validate_name(self.doctype, set_name)

    if set_child_names:
        # set name for children
        for child in self.get_all_children():
            set_new_name(child)

    self.flags.name_set = True
|
|
|
|
def get_title(self):
    """Return the value of the meta's title field, or "" when it is empty."""
    title_fieldname = self.meta.get_title_field()
    return self.get(title_fieldname) or ""
|
|
|
|
def set_title_field(self):
    """Fill the `title` field from its template, when `title` is the title field.

    The field's `options` act as a format template; for new documents with an
    empty title, the field's `default` is used as the template instead.
    """
    if self.meta.get("title_field") != "title":
        return

    def template_values():
        # None would render as "None" in the template, so blank it out
        return {key: ("" if value is None else value) for key, value in self.as_dict().items()}

    title_df = self.meta.get_field(self.meta.title_field)

    if title_df.options:
        self.set(title_df.fieldname, title_df.options.format(**template_values()))
    elif self.is_new() and not self.get(title_df.fieldname) and title_df.default:
        # set default title for new transactions (if default)
        self.set(title_df.fieldname, title_df.default.format(**template_values()))
|
|
|
|
def update_single(self, d):
    """Replace this Single doctype's rows in `tabSingles` with the values in `d`.

    :param d: mapping of field name to value; the "doctype" key is skipped.
    """
    frappe.db.delete("Singles", {"doctype": self.doctype})

    for field, value in d.items():
        if field == "doctype":
            continue

        frappe.db.sql(
            """insert into `tabSingles` (doctype, field, value)
                values (%s, %s, %s)""",
            (self.doctype, field, value),
        )

    # drop any cached values for this doctype
    value_cache = frappe.db.value_cache
    if self.doctype in value_cache:
        del value_cache[self.doctype]
|
|
|
|
def set_user_and_timestamp(self):
    """Stamp `modified`/`modified_by` (and `creation`/`owner` for new docs).

    The previous `modified` value is kept in `_original_modified`. Child rows
    receive the same modification stamp and inherit owner/creation when they
    have none. The document is also appended to `frappe.flags.currently_saving`.
    """
    self._original_modified = self.modified
    self.modified = now()
    self.modified_by = frappe.session.user

    # We'd probably want the creation and owner to be set via API
    # or Data import at some point, that'd have to be handled here
    bulk_operation = frappe.flags.in_install or frappe.flags.in_patch or frappe.flags.in_migrate
    if self.is_new() and not bulk_operation:
        self.creation = self.modified
        self.owner = self.modified_by

    for child in self.get_all_children():
        child.modified = self.modified
        child.modified_by = self.modified_by
        if not child.owner:
            child.owner = self.owner
        if not child.creation:
            child.creation = self.creation

    frappe.flags.currently_saving.append((self.doctype, self.name))
|
|
|
|
def set_docstatus(self):
    """Default `docstatus` to draft when it is None and copy it to all children."""
    if self.docstatus is None:
        self.docstatus = DocStatus.draft()

    for child in self.get_all_children():
        child.docstatus = self.docstatus
|
|
|
|
def _validate(self):
    """Run the built-in validation steps on this document and on each child row.

    Afterwards, new documents get their optional fields reset to None, while
    existing documents are checked for changes to set-only-once fields.
    """
    parent_steps = (
        "_validate_mandatory",
        "_validate_data_fields",
        "_validate_selects",
        "_validate_non_negative",
        "_validate_length",
        "_fix_rating_value",
        "_validate_code_fields",
        "_sync_autoname_field",
        "_extract_images_from_text_editor",
        "_sanitize_content",
        "_save_passwords",
        "validate_workflow",
    )
    for step in parent_steps:
        getattr(self, step)()

    child_steps = (
        "_validate_data_fields",
        "_validate_selects",
        "_validate_non_negative",
        "_validate_length",
        "_validate_code_fields",
        "_sync_autoname_field",
        "_extract_images_from_text_editor",
        "_sanitize_content",
        "_save_passwords",
    )
    for child in self.get_all_children():
        for step in child_steps:
            getattr(child, step)()

    if not self.is_new():
        self.validate_set_only_once()
        return

    # don't set fields like _assign, _comments for new doc
    for fieldname in optional_fields:
        self.set(fieldname, None)
|
|
|
|
def _validate_non_negative(self):
    """Throw `frappe.NonNegativeError` for any negative `non_negative` numeric field.

    Only Int, Float and Currency fields flagged `non_negative` are checked.
    Rows of a child table (documents with a `parentfield`) get a message that
    includes the row number.
    """
    field_filters = {
        "non_negative": ("=", 1),
        "fieldtype": ("in", ["Int", "Float", "Currency"]),
    }

    for df in self.meta.get("fields", field_filters):
        if flt(self.get(df.fieldname)) < 0:
            if self.get("parentfield"):
                message = "{} {} #{}: {} {}".format(
                    frappe.bold(_(self.doctype)),
                    _("Row"),
                    self.idx,
                    _("Value cannot be negative for"),
                    frappe.bold(_(df.label)),
                )
            else:
                message = _("Value cannot be negative for {0}: {1}").format(
                    _(df.parent), frappe.bold(_(df.label))
                )

            frappe.throw(message, frappe.NonNegativeError, title=_("Negative Value"))
|
|
|
|
def _fix_rating_value(self):
    """Clamp the value of every Rating field to the range 0..1."""
    for rating_df in self.meta.get("fields", {"fieldtype": "Rating"}):
        raw = self.get(rating_df.fieldname)
        # non-float values are converted with flt() before clamping
        rating = raw if isinstance(raw, float) else flt(raw)

        # Make sure rating is between 0 and 1
        self.set(rating_df.fieldname, max(0, min(rating, 1)))
|
|
|
|
def validate_workflow(self):
    """Validate the workflow transition, if the doctype has a workflow.

    Skipped entirely while `frappe.flags.in_install` equals "frappe". For any
    action other than "save", the workflow state is also set from the action.
    """
    if frappe.flags.in_install == "frappe":
        return

    workflow = self.meta.get_workflow()
    if not workflow:
        return

    validate_workflow(self)
    if self._action != "save":
        set_workflow_state_on_action(self, workflow, self._action)
|
|
|
|
def validate_set_only_once(self):
    """Throw `frappe.CannotChangeConstantError` if a set-only-once field changed.

    Nothing is compared when the doctype has no such fields or when there is
    no before-save copy of the document.

    :return: always False when no error is thrown.
    """
    once_fields = self.meta.get_set_only_once_fields()

    if not (once_fields and self._doc_before_save):
        return False

    # document exists before saving
    for field in once_fields:
        value = self.get(field.fieldname)
        original_value = self._doc_before_save.get(field.fieldname)

        if field.fieldtype in table_fields:
            changed = not self.is_child_table_same(field.fieldname)
        elif field.fieldtype in ("Date", "Datetime", "Time"):
            # compared as strings
            changed = str(value) != str(original_value)
        else:
            changed = value != original_value

        if changed:
            frappe.throw(
                _("Value cannot be changed for {0}").format(
                    frappe.bold(self.meta.get_label(field.fieldname))
                ),
                exc=frappe.CannotChangeConstantError,
            )

    return False
|
|
|
|
def is_child_table_same(self, fieldname):
    """Return True if the child table equals the one in the before-save copy.

    Rows are compared pairwise by position as dicts, ignoring the `modified`,
    `modified_by` and `creation` keys. Tables of different length are never
    the same.
    """
    current_rows = self.get(fieldname)
    original_rows = self._doc_before_save.get(fieldname)

    if len(original_rows) != len(current_rows):
        return False

    ignored_keys = ("modified", "modified_by", "creation")
    for original_row, current_row in zip(original_rows, current_rows):
        current_values = current_row.as_dict(convert_dates_to_str=True)
        original_values = original_row.as_dict(convert_dates_to_str=True)

        # all fields must be same other than modified and modified_by
        for key in ignored_keys:
            del current_values[key]
            del original_values[key]

        if original_values != current_values:
            return False

    return True
|
|
|
|
def apply_fieldlevel_read_permissions(self):
    """Remove values the user is not allowed to read (called when loading in desk).

    Does nothing for the "Administrator" user, or when neither the doctype nor
    any of its child doctypes has a field with `permlevel` above 0. Otherwise
    every attribute whose field permlevel is not in the user's readable
    permlevels is deleted from the document and from its child rows.
    """
    if frappe.session.user == "Administrator":
        return

    all_fields = self.meta.fields.copy()
    for table_field in self.meta.get_table_fields():
        all_fields += frappe.get_meta(table_field.options).fields or []

    if not any(df.permlevel > 0 for df in all_fields):
        return

    has_access_to = self.get_permlevel_access("read")

    for df in self.meta.fields:
        if df.permlevel and hasattr(self, df.fieldname) and df.permlevel not in has_access_to:
            try:
                delattr(self, df.fieldname)
            except AttributeError:
                # hasattr might return True for class attribute which can't be delattr-ed.
                continue

    for table_field in self.meta.get_table_fields():
        for df in frappe.get_meta(table_field.options).fields or []:
            if df.permlevel and df.permlevel not in has_access_to:
                for child in self.get(table_field.fieldname) or []:
                    if hasattr(child, df.fieldname):
                        try:
                            delattr(child, df.fieldname)
                        except AttributeError:
                            # same class-attribute caveat as for the parent fields
                            continue
|
|
|
|
def validate_higher_perm_levels(self):
    """Reset values of permlevel > 0 fields that the user may not write.

    Skipped when permissions are ignored, during install, and for the
    "Administrator" user. Child rows are only processed for existing documents.
    """
    if (
        self.flags.ignore_permissions
        or frappe.flags.in_install
        or frappe.session.user == "Administrator"
    ):
        return

    has_access_to = self.get_permlevel_access()
    parent_fields = self.meta.get_high_permlevel_fields()

    if parent_fields:
        self.reset_values_if_no_permlevel_access(has_access_to, parent_fields)

    # If new record then don't reset the values for child table
    if self.is_new():
        return

    # check for child tables
    for table_df in self.meta.get_table_fields():
        child_fields = frappe.get_meta(table_df.options).get_high_permlevel_fields()
        if not child_fields:
            continue

        for row in self.get(table_df.fieldname):
            row.reset_values_if_no_permlevel_access(has_access_to, child_fields)
|
|
|
|
def get_permlevel_access(self, permission_type="write"):
    """Return the permlevels for which the user's roles grant `permission_type`.

    :param permission_type: permission key looked up on each permission rule.
    :return: list of distinct permlevels, in the order the rules are listed.
    """
    user_roles = frappe.get_roles()
    allowed_permlevels = []

    for perm in self.get_permissions():
        if perm.role not in user_roles or not perm.get(permission_type):
            continue

        if perm.permlevel not in allowed_permlevels:
            allowed_permlevels.append(perm.permlevel)

    return allowed_permlevels
|
|
|
|
def has_permlevel_access_to(self, fieldname, df=None, permission_type="read"):
    """Return whether the field's permlevel is among the accessible permlevels.

    :param fieldname: field to check; used to look up `df` when it is not given.
    :param df: field definition, if already at hand.
    :param permission_type: permission type passed to `get_permlevel_access`.
    """
    field = df if df else self.meta.get_field(fieldname)
    accessible_levels = self.get_permlevel_access(permission_type)
    return field.permlevel in accessible_levels
|
|
|
|
def get_permissions(self):
    """Return the permission rules that apply to this document.

    Child-table doctypes use the permissions of their `parenttype`.
    """
    if self.meta.istable:
        # use parent permissions
        return frappe.get_meta(self.parenttype).permissions

    return self.meta.permissions
|
|
|
|
def _set_defaults(self):
    """Fill missing values on the document and its child rows from new-doc defaults.

    Does nothing while `frappe.flags.in_import` is set.
    """
    if frappe.flags.in_import:
        return

    parent_defaults = frappe.new_doc(self.doctype, as_dict=True)
    self.update_if_missing(parent_defaults)

    # children
    for table_df in self.meta.get_table_fields():
        child_defaults = frappe.new_doc(table_df.options, as_dict=True)
        rows = self.get(table_df.fieldname)
        if not isinstance(rows, list):
            continue

        for row in rows:
            row.update_if_missing(child_defaults)
|
|
|
|
def check_if_latest(self):
    """Ensure the copy being saved is based on the latest version in the database.

    Compares the `modified` timestamp of the stored document with the one this
    copy had before it was re-stamped (`_original_modified`). A mismatch means
    the document changed in the database after this copy was read, and raises
    `frappe.TimestampMismatchError`.

    Also validates the docstatus transition (Save > Submit > Cancel) through
    `self.check_docstatus_transition`; for Single doctypes this is only done
    when there is no stored document.
    """
    self.load_doc_before_save(raise_exception=True)

    self._action = "save"
    stored = self._doc_before_save

    # previous is None for new document insert
    if not stored:
        self.check_docstatus_transition(0)
        return

    if cstr(stored.modified) != cstr(self._original_modified):
        frappe.msgprint(
            _("Error: Document has been modified after you have opened it")
            + (f" ({stored.modified}, {self.modified}). ")
            + _("Please refresh to get the latest document."),
            raise_exception=frappe.TimestampMismatchError,
        )

    if self.meta.issingle:
        return

    self.check_docstatus_transition(stored.docstatus)
|
|
|
|
def check_docstatus_transition(self, to_docstatus):
    """Ensure a valid `docstatus` transition and set `self._action` accordingly.

    `to_docstatus` is compared against the document's current `docstatus`;
    callers in this file pass the stored (previous) docstatus, or 0 for new
    documents. Valid transitions are (number in brackets is `docstatus`):

    - Save (0) > Save (0)
    - Save (0) > Submit (1)
    - Submit (1) > Submit (1)
    - Submit (1) > Cancel (2)

    Submitting and cancelling additionally check the matching permission.
    """
    if not self.docstatus:
        self.docstatus = DocStatus.draft()

    current = self.docstatus

    if to_docstatus == DocStatus.draft():
        if current.is_draft():
            self._action = "save"
        elif current.is_submitted():
            self._action = "submit"
            self.check_permission("submit")
        elif current.is_cancelled():
            raise frappe.DocstatusTransitionError(
                _("Cannot change docstatus from 0 (Draft) to 2 (Cancelled)")
            )
        else:
            raise frappe.ValidationError(_("Invalid docstatus"), self.docstatus)

    elif to_docstatus == DocStatus.submitted():
        if current.is_submitted():
            self._action = "update_after_submit"
            self.check_permission("submit")
        elif current.is_cancelled():
            self._action = "cancel"
            self.check_permission("cancel")
        elif current.is_draft():
            raise frappe.DocstatusTransitionError(
                _("Cannot change docstatus from 1 (Submitted) to 0 (Draft)")
            )
        else:
            raise frappe.ValidationError(_("Invalid docstatus"), self.docstatus)

    elif to_docstatus == DocStatus.cancelled():
        raise frappe.ValidationError(_("Cannot edit cancelled document"))
|
|
|
|
def set_parent_in_children(self):
    """Point `parent` and `parenttype` of every child row at this document."""
    for child in self.get_all_children():
        child.parent, child.parenttype = self.name, self.doctype
|
|
|
|
def set_name_in_children(self):
    """Give a name to every child row that does not have one yet."""
    for child in self.get_all_children():
        if child.name:
            continue

        set_new_name(child)
|
|
|
|
def validate_update_after_submit(self):
    """Run the update-after-submit validation on the document and its child rows.

    Skipped when `flags.ignore_validate_update_after_submit` is set.
    """
    if self.flags.ignore_validate_update_after_submit:
        return

    self._validate_update_after_submit()

    for child in self.get_all_children():
        # in case of a new row, don't validate allow on submit, if table is allow on submit
        is_allowed_new_row = (
            child.is_new() and self.meta.get_field(child.parentfield).allow_on_submit
        )
        if not is_allowed_new_row:
            child._validate_update_after_submit()

    # TODO check only allowed values are updated
|
|
|
|
def _validate_mandatory(self):
    """Raise `frappe.MandatoryError` if mandatory fields are missing.

    Collects missing fields from the document and all child rows, shows one
    message per missing field, then raises with the list of field names.
    Skipped when `flags.ignore_mandatory` is set.
    """
    if self.flags.ignore_mandatory:
        return

    missing = self._get_missing_mandatory_fields()
    for child in self.get_all_children():
        missing.extend(child._get_missing_mandatory_fields())

    if not missing:
        return

    for _fieldname, message in missing:
        msgprint(message)

    if frappe.flags.print_messages:
        print(self.as_json().encode("utf-8"))

    missing_fieldnames = ", ".join(entry[0] for entry in missing)
    raise frappe.MandatoryError(
        "[{doctype}, {name}]: {fields}".format(
            fields=missing_fieldnames, doctype=self.doctype, name=self.name
        )
    )
|
|
|
|
def _validate_links(self):
    """Throw if the document or a child row links to a missing or cancelled document.

    Skipped when `flags.ignore_links` is set or the current action is "cancel".
    Invalid links are reported (as `frappe.LinkValidationError`) before
    cancelled links (`frappe.CancelledLinkError`).
    """
    if self.flags.ignore_links or self._action == "cancel":
        return

    invalid_links, cancelled_links = self.get_invalid_links()

    for child in self.get_all_children():
        child_result = child.get_invalid_links(is_submittable=self.meta.is_submittable)
        invalid_links.extend(child_result[0])
        cancelled_links.extend(child_result[1])

    if invalid_links:
        # the third element of each entry is the text shown to the user
        names = ", ".join(entry[2] for entry in invalid_links)
        frappe.throw(_("Could not find {0}").format(names), frappe.LinkValidationError)

    if cancelled_links:
        names = ", ".join(entry[2] for entry in cancelled_links)
        frappe.throw(_("Cannot link cancelled document: {0}").format(names), frappe.CancelledLinkError)
|
|
|
|
def get_all_children(self, parenttype=None) -> list["Document"]:
    """Return the rows of all **Table** type fields as one flat list.

    :param parenttype: if given, only table fields whose `options` equal this
        value are included.
    """
    rows = []

    for table_df in self.meta.get_table_fields():
        if parenttype and table_df.options != parenttype:
            continue

        value = self.get(table_df.fieldname)
        if value:
            rows.extend(value)

    return rows
|
|
|
|
def run_method(self, method, *args, **kwargs):
    """Run a standard trigger on this document, plus whatever is hooked to it.

    After the (hook-wrapped) method has run, notifications, webhooks and
    server scripts are triggered for the same method name.

    :param method: name of the controller method / event to run.
    :return: the value returned by the hook-wrapped call.
    """

    def call_own_method(self, *args, **kwargs):
        target = getattr(self, method, None)

        # Cannot have a field with same name as method
        # If method found in __dict__, expect it to be callable
        if method in self.__dict__ or callable(target):
            return target(*args, **kwargs)

    # the wrapper carries the event name as its function name
    call_own_method.__name__ = str(method)
    hooked = Document.hook(call_own_method)
    result = hooked(self, *args, **kwargs)

    self.run_notifications(method)
    run_webhooks(self, method)
    run_server_script_for_doc_event(self, method)

    return result
|
|
|
|
def run_trigger(self, method, *args, **kwargs):
    """Alias of `run_method`: forwards all arguments and returns its result."""
    result = self.run_method(method, *args, **kwargs)
    return result
|
|
|
|
def run_notifications(self, method):
    """Evaluate the enabled Notifications of this doctype that match `method`.

    Nothing runs during patches or installs, nor during imports with muted
    emails. Each notification is evaluated at most once per
    `flags.notifications_executed` list.

    :param method: the document event / method name that was just run.
    """
    muted = (
        (frappe.flags.in_import and frappe.flags.mute_emails)
        or frappe.flags.in_patch
        or frappe.flags.in_install
    )
    if muted:
        return

    if self.flags.notifications_executed is None:
        self.flags.notifications_executed = []

    from frappe.email.doctype.notification.notification import evaluate_alert

    if self.flags.notifications is None:

        def _get_notifications():
            """returns enabled notifications for the current doctype"""

            return frappe.get_all(
                "Notification",
                fields=["name", "event", "method"],
                filters={"enabled": 1, "document_type": self.doctype},
            )

        self.flags.notifications = frappe.cache.hget("notifications", self.doctype, _get_notifications)

    if not self.flags.notifications:
        return

    def _evaluate_alert(alert):
        # each notification runs only once per executed-list
        if alert.name in self.flags.notifications_executed:
            return

        evaluate_alert(self, alert.name, alert.event)
        self.flags.notifications_executed.append(alert.name)

    event_map = {
        "on_update": "Save",
        "after_insert": "New",
        "on_submit": "Submit",
        "on_cancel": "Cancel",
    }

    if not self.flags.in_insert:
        # value change is not applicable in insert
        event_map["on_change"] = "Value Change"

    event = event_map.get(method, None)

    for alert in self.flags.notifications:
        matches_event = event and alert.event == event
        if matches_event or (alert.event == "Method" and method == alert.method):
            _evaluate_alert(alert)
|
|
|
|
@whitelist.__func__
def _submit(self):
    """Mark the document as submitted (`docstatus` = 1) and save it."""
    self.docstatus = DocStatus.submitted()
    saved = self.save()
    return saved
|
|
|
|
@whitelist.__func__
def _cancel(self):
    """Mark the document as cancelled (`docstatus` = 2) and save it.

    Returns whatever `save()` returns.
    """
    self.docstatus = DocStatus.cancelled()
    saved = self.save()
    return saved
|
|
|
|
@whitelist.__func__
def _rename(self, name: str, merge: bool = False, force: bool = False, validate_rename: bool = True):
    """Rename the document through `frappe.model.rename_doc.rename_doc`, then reload it.

    `self.name` is set to the name returned by `rename_doc` before reloading.
    """
    from frappe.model.rename_doc import rename_doc

    renamed_to = rename_doc(doc=self, new=name, merge=merge, force=force, validate=validate_rename)
    self.name = renamed_to
    self.reload()
|
|
|
|
@whitelist.__func__
def submit(self):
    """Submit the document by delegating to `_submit` (sets `docstatus` = 1, then saves)."""
    outcome = self._submit()
    return outcome
|
|
|
|
@whitelist.__func__
def cancel(self):
    """Cancel the document by delegating to `_cancel` (sets `docstatus` = 2, then saves)."""
    outcome = self._cancel()
    return outcome
|
|
|
|
@whitelist.__func__
def rename(self, name: str, merge: bool = False, force: bool = False, validate_rename: bool = True):
    """Rename the document to `name` by delegating to `_rename`. This transforms the current object."""
    rename_options = {"merge": merge, "force": force, "validate_rename": validate_rename}
    return self._rename(name=name, **rename_options)
|
|
|
|
def delete(self, ignore_permissions=False, force=False, *, delete_permanently=False):
    """Delete this document via `frappe.delete_doc`, passing along the document's flags."""
    delete_options = {
        "ignore_permissions": ignore_permissions,
        "flags": self.flags,
        "force": force,
        "delete_permanently": delete_permanently,
    }
    return frappe.delete_doc(self.doctype, self.name, **delete_options)
|
|
|
|
def run_before_save_methods(self):
    """Run the standard controller methods that precede an `INSERT` or `UPDATE`.

    - `before_validate` always runs first for **Save** and **Submit**,
      even when validations are ignored.
    - `validate`, `before_save` for **Save**.
    - `validate`, `before_submit` for **Submit**.
    - `before_cancel` for **Cancel**.
    - `before_update_after_submit` for **Update after Submit**.

    When `flags.ignore_validate` is set, everything after `before_validate`
    is skipped, including the title field update done at the end.
    """
    self.reset_seen()

    # before_validate must be executed before validations are (possibly) ignored
    if self._action in ("save", "submit"):
        self.run_method("before_validate")

    if self.flags.ignore_validate:
        return

    methods_by_action = {
        "save": ("validate", "before_save"),
        "submit": ("validate", "before_submit"),
        "cancel": ("before_cancel",),
        "update_after_submit": ("before_update_after_submit",),
    }
    for method_name in methods_by_action.get(self._action, ()):
        self.run_method(method_name)

    self.set_title_field()
|
|
|
|
def load_doc_before_save(self, *, raise_exception: bool = False):
    """Load the stored version of this document into `_doc_before_save`.

    `_doc_before_save` is reset to None first and stays None for new
    documents. If the document cannot be found, the error is re-raised when
    `raise_exception` is set; otherwise the last message is cleared and
    `_doc_before_save` remains None.
    """
    self._doc_before_save = None

    if not self.is_new():
        try:
            self._doc_before_save = frappe.get_doc(self.doctype, self.name, for_update=True)
        except frappe.DoesNotExistError:
            if raise_exception:
                raise

            frappe.clear_last_message()
|
|
|
|
def run_post_save_methods(self):
    """Run the standard controller methods that follow an `INSERT` or `UPDATE`.

    - `on_update` for **Save**.
    - `on_update`, `on_submit` for **Submit**.
    - `on_cancel` for **Cancel**, followed by the back-link check.
    - `on_update_after_submit` for **Update after Submit**.

    Afterwards the document cache is cleared, the realtime update is
    published (unless `flags.notify_update` is falsy), global search is
    updated, a version is saved, `on_change` is run and the document is
    removed from `frappe.flags.currently_saving`.
    """
    action = self._action
    methods_by_action = {
        "save": ("on_update",),
        "submit": ("on_update", "on_submit"),
        "cancel": ("on_cancel",),
        "update_after_submit": ("on_update_after_submit",),
    }
    for method_name in methods_by_action.get(action, ()):
        self.run_method(method_name)

    if action == "cancel":
        self.check_no_back_links_exist()

    self.clear_cache()

    if self.flags.get("notify_update", True):
        self.notify_update()

    update_global_search(self)

    self.save_version()

    self.run_method("on_change")

    saving_key = (self.doctype, self.name)
    if saving_key in frappe.flags.currently_saving:
        frappe.flags.currently_saving.remove(saving_key)
|
|
|
|
def clear_cache(self):
    """Clear the cached copy of this document via `frappe.clear_document_cache`."""
    doctype, name = self.doctype, self.name
    frappe.clear_document_cache(doctype, name)
|
|
|
|
def reset_seen(self):
    """Reset the `_seen` property so that it lists only the current user.

    Applies only to saved, non-single documents whose doctype tracks "seen".
    """
    meta = self.meta
    if not getattr(meta, "track_seen", False):
        return
    if getattr(meta, "issingle", False) or self.is_new():
        return

    seen_by = json.dumps([frappe.session.user])
    frappe.db.set_value(self.doctype, self.name, "_seen", seen_by, update_modified=False)
|
|
|
|
def notify_update(self):
    """Publish realtime events announcing that this document was modified.

    A `doc_update` event is always published (except while patching); a
    `list_update` event follows unless the doctype is read-only, single or
    a child table. Both are published after commit.
    """
    if frappe.flags.in_patch:
        return

    doc_payload = {"modified": self.modified, "doctype": self.doctype, "name": self.name}
    frappe.publish_realtime(
        "doc_update",
        doc_payload,
        doctype=self.doctype,
        docname=self.name,
        after_commit=True,
    )

    meta = self.meta
    if meta.get("read_only") or meta.get("issingle") or meta.get("istable"):
        return

    data = {"doctype": self.doctype, "name": self.name, "user": frappe.session.user}
    frappe.publish_realtime("list_update", data, after_commit=True)
|
|
|
|
def db_set(self, fieldname, value=None, update_modified=True, notify=False, commit=False):
    """Set a value on the document object, update the timestamp and write it to the database.

    WARNING: This method does not trigger controller validations and should
    be used very carefully.

    `before_change` runs before the database write and `on_change` after it.
    If the document has no name, nothing is written and `on_change`,
    `notify` and `commit` are skipped.

    :param fieldname: fieldname of the property to be updated, or a {"field":"value"} dictionary
    :param value: value of the property to be updated
    :param update_modified: default True. updates the `modified` and `modified_by` properties
    :param notify: default False. run doc.notify_update() to send updates via socketio
    :param commit: default False. run frappe.db.commit()
    """
    if isinstance(fieldname, dict):
        self.update(fieldname)
    else:
        self.set(fieldname, value)

    if update_modified:
        # don't touch the modified timestamp when called from post save
        # methods like on_update or on_submit
        if (self.doctype, self.name) not in frappe.flags.currently_saving:
            self.set("modified", now())
            self.set("modified_by", frappe.session.user)

    # load, but never reload, doc_before_save: before_change / on_change might expect it
    if not self.get_doc_before_save():
        self.load_doc_before_save()

    # to trigger notification on value change
    self.run_method("before_change")

    if self.name is None:
        return

    if self.meta.issingle:
        frappe.db.set_single_value(
            self.doctype,
            fieldname,
            value,
            modified=self.modified,
            modified_by=self.modified_by,
            update_modified=update_modified,
        )
    else:
        frappe.db.set_value(
            self.doctype,
            self.name,
            fieldname,
            value,
            self.modified,
            self.modified_by,
            update_modified=update_modified,
        )

    self.run_method("on_change")

    if notify:
        self.notify_update()

    if commit:
        frappe.db.commit()
|
|
|
|
def db_get(self, fieldname):
    """Return the value currently stored in the database for `fieldname` of this document."""
    stored_value = frappe.db.get_value(self.doctype, self.name, fieldname)
    return stored_value
|
|
|
|
def check_no_back_links_exist(self):
    """Run the linked / dynamically-linked document checks for a Cancel.

    Skipped entirely when `flags.ignore_links` is set.
    """
    from frappe.model.delete_doc import check_if_doc_is_dynamically_linked, check_if_doc_is_linked

    if self.flags.ignore_links:
        return

    check_if_doc_is_linked(self, method="Cancel")
    check_if_doc_is_dynamically_linked(self, method="Cancel")
|
|
|
|
def save_version(self):
    """Save a Version record describing the changes made by this save.

    The document is compared against `_doc_before_save`, or against the
    `amended_from` document when there is no previous version. A Version is
    inserted only if `update_version_info` reports something to store.
    """
    # don't track versions under the following conditions
    if not getattr(self.meta, "track_changes", False):
        return
    if self.doctype == "Version" or self.flags.ignore_version or frappe.flags.in_install:
        return
    if not self._doc_before_save and frappe.flags.in_patch:
        return

    doc_to_compare = self._doc_before_save
    if not doc_to_compare:
        amended_from = self.get("amended_from")
        if amended_from:
            doc_to_compare = frappe.get_doc(self.doctype, amended_from)

    version = frappe.new_doc("Version")
    if not version.update_version_info(doc_to_compare, self):
        return

    version.insert(ignore_permissions=True)

    if frappe.flags.in_migrate:
        return

    # follow since you made a change?
    if frappe.get_cached_value("User", frappe.session.user, "follow_created_documents"):
        follow_document(self.doctype, self.name, frappe.session.user)
|
|
|
|
@staticmethod
def hook(f):
    """Decorator: make method `f` hookable (i.e. extensible by another app).

    The returned wrapper runs `f` first and then every handler registered
    for `f.__name__` in `frappe.get_doc_hooks()`: handlers for the
    document's own doctype, followed by handlers registered under "*".

    Note: dict return values are merged into one dict; any other non-None
    return value replaces what was collected so far. Ideally, don't return
    values from hookable methods, set properties on the document instead.
    """

    def add_to_return_value(self, new_return_value):
        # None keeps whatever has been accumulated so far
        if new_return_value is None:
            self._return_value = self.get("_return_value")
            return

        if not isinstance(new_return_value, dict):
            self._return_value = new_return_value
            return

        if not self.get("_return_value"):
            self._return_value = {}
        self._return_value.update(new_return_value)

    def compose(fn, *hooks):
        def runner(self, method, *args, **kwargs):
            # the decorated method itself is called without `method`
            add_to_return_value(self, fn(self, *args, **kwargs))
            for hook_fn in hooks:
                add_to_return_value(self, hook_fn(self, method, *args, **kwargs))

            # hand back the collected value and remove it from the document
            return self.__dict__.pop("_return_value", None)

        return runner

    def composer(self, *args, **kwargs):
        method = f.__name__
        doc_events = frappe.get_doc_hooks()
        handlers = doc_events.get(self.doctype, {}).get(method, []) + doc_events.get("*", {}).get(
            method, []
        )
        hooks = [frappe.get_attr(handler) for handler in handlers]

        return compose(f, *hooks)(self, method, *args, **kwargs)

    return composer
|
|
|
|
def is_whitelisted(self, method_name):
    """Run the whitelist check for the method `method_name` of this document.

    Raises `NotFound` if the document has no (truthy) attribute of that name.
    """
    method = getattr(self, method_name, None)
    if not method:
        raise NotFound(f"Method {method_name} not found")

    # check the underlying function of a bound method, or the attribute itself
    target = getattr(method, "__func__", method)
    is_whitelisted(target)
|
|
|
|
def validate_value(self, fieldname, condition, val2, doc=None, raise_exception=None):
    """Check that the value of `fieldname` satisfies `condition` against `val2`.

    `val2` is cast using the field's definition before comparing. On
    failure a message is raised through `msgprint`, using `raise_exception`
    if given and `True` otherwise. `doc` defaults to this document; for
    child rows the message includes the row index.
    """
    error_condition_map = {
        "in": _("one of"),
        "not in": _("none of"),
        "^": _("beginning with"),
    }

    doc = doc or self

    val1 = doc.get_value(fieldname)

    df = doc.meta.get_field(fieldname)
    val2 = doc.cast(val2, df)

    if compare(val1, condition, val2):
        return

    label = doc.meta.get_label(fieldname)
    condition_str = error_condition_map.get(condition, condition)
    if doc.get("parentfield"):
        msg = _("Incorrect value in row {0}: {1} must be {2} {3}").format(
            doc.idx, label, condition_str, val2
        )
    else:
        msg = _("Incorrect value: {0} must be {1} {2}").format(label, condition_str, val2)

    # raise passed exception or True
    msgprint(msg, raise_exception=raise_exception or True)
|
|
|
|
def validate_table_has_rows(self, parentfield, raise_exception=None):
    """Raise if the Table field `parentfield` is not a non-empty list.

    Throws `raise_exception` if given, `frappe.EmptyTableError` otherwise.
    """
    rows = self.get(parentfield)
    if isinstance(rows, list) and rows:
        return

    label = self.meta.get_label(parentfield)
    frappe.throw(
        _("Table {0} cannot be empty").format(label), raise_exception or frappe.EmptyTableError
    )
|
|
|
|
def round_floats_in(self, doc, fieldnames=None):
    """Round floats for all `Currency`, `Float`, `Percent` fields of the given doc.

    :param doc: Document whose numeric properties are to be rounded.
    :param fieldnames: [Optional] List of fields to be rounded."""
    if not fieldnames:
        numeric_fields = doc.meta.get(
            "fields", {"fieldtype": ["in", ["Currency", "Float", "Percent"]]}
        )
        fieldnames = (df.fieldname for df in numeric_fields)

    for fieldname in fieldnames:
        current_value = doc.get(fieldname)
        precision = self.precision(fieldname, doc.get("parentfield"))
        doc.set(fieldname, flt(current_value, precision))
|
|
|
|
def get_url(self):
    """Return the absolute URL of this document, as built by `get_absolute_url`."""
    url = get_absolute_url(self.doctype, self.name)
    return url
|
|
|
|
def add_comment(
    self,
    comment_type="Comment",
    text=None,
    comment_email=None,
    comment_by=None,
):
    """Add a comment to this document and return the inserted Comment.

    The comment is inserted ignoring permissions. `comment_email` falls
    back to the session user and the content falls back to `comment_type`.

    :param comment_type: e.g. `Comment`. See Communication for more info."""
    comment_values = {
        "doctype": "Comment",
        "comment_type": comment_type,
        "comment_email": comment_email or frappe.session.user,
        "comment_by": comment_by,
        "reference_doctype": self.doctype,
        "reference_name": self.name,
        "content": text or comment_type,
    }
    comment = frappe.get_doc(comment_values)
    return comment.insert(ignore_permissions=True)
|
|
|
|
def add_seen(self, user=None):
    """Add the given/current user to the list of users who have seen this document (`_seen`).

    Does nothing unless the doctype tracks "seen", is not single and the
    request is not read-only. When the list changes it is written to the
    database without touching `modified`, and a commit is flagged.
    """
    user = user or frappe.session.user

    if not self.meta.track_seen or frappe.flags.read_only or self.meta.issingle:
        return

    seen_by = frappe.parse_json(self.get("_seen") or [])
    if user in seen_by:
        return

    seen_by.append(user)
    frappe.db.set_value(self.doctype, self.name, "_seen", json.dumps(seen_by), update_modified=False)
    frappe.local.flags.commit = True
|
|
|
|
def add_viewed(self, user=None, force=False, unique_views=False):
    """Create a View Log entry for this document and return it.

    Returns None without logging when `unique_views` is set and the user
    already has a View Log for this document, or when the doctype does not
    track views and `force` is not set. In read-only mode the log is
    inserted in deferred fashion; otherwise it is inserted directly and a
    commit is flagged.
    """
    user = user or frappe.session.user

    if unique_views:
        already_viewed = frappe.db.exists(
            "View Log", {"reference_doctype": self.doctype, "reference_name": self.name, "viewed_by": user}
        )
        if already_viewed:
            return

    if not (getattr(self.meta, "track_views", False) or force):
        return

    view_log = frappe.get_doc(
        {
            "doctype": "View Log",
            "viewed_by": user,
            "reference_doctype": self.doctype,
            "reference_name": self.name,
        }
    )
    if frappe.flags.read_only:
        view_log.deferred_insert()
    else:
        view_log.insert(ignore_permissions=True)
        frappe.local.flags.commit = True

    return view_log
|
|
|
|
def log_error(self, title=None, message=None):
    """Create an Error Log that references this document (via `frappe.log_error`)."""
    reference = {"reference_doctype": self.doctype, "reference_name": self.name}
    return frappe.log_error(message=message, title=title, **reference)
|
|
|
|
def get_signature(self):
    """Return the signature for private URLs: the SHA-224 hex digest of the creation timestamp string."""
    creation_str = get_datetime_str(self.creation)
    digest = hashlib.sha224(creation_str.encode())
    return digest.hexdigest()
|
|
|
|
def get_document_share_key(self, expires_on=None, no_expiry=False):
    """Return the key of a Document Share Key for this document.

    An existing share key with the same reference and `expires_on` is
    reused; otherwise a new one is inserted (ignoring permissions).
    `no_expiry` forces `expires_on` to None.
    """
    if no_expiry:
        expires_on = None

    key_filters = {
        "reference_doctype": self.doctype,
        "reference_docname": self.name,
        "expires_on": expires_on,
    }
    existing_key = frappe.db.exists("Document Share Key", key_filters)
    if existing_key:
        return frappe.get_doc("Document Share Key", existing_key).key

    share_key = frappe.new_doc("Document Share Key")
    share_key.reference_doctype = self.doctype
    share_key.reference_docname = self.name
    share_key.expires_on = expires_on
    share_key.flags.no_expiry = no_expiry
    share_key.insert(ignore_permissions=True)

    return share_key.key
|
|
|
|
def get_liked_by(self):
    """Return the JSON-decoded `_liked_by` value, or an empty list when it is unset or empty."""
    raw_liked_by = getattr(self, "_liked_by", None)
    return json.loads(raw_liked_by) if raw_liked_by else []
|
|
|
|
def set_onload(self, key, value):
    """Store `value` under `key` in the document's `__onload` mapping, creating the mapping if needed."""
    if not self.get("__onload"):
        self.set("__onload", frappe._dict())

    onload = self.get("__onload")
    onload[key] = value
|
|
|
|
def get_onload(self, key=None):
    """Return the `__onload` entry for `key`, or the whole mapping when no key is given.

    Without a key, an empty `frappe._dict` is the fallback when `__onload` is missing.
    """
    if key:
        onload = self.get("__onload")
        return onload[key]

    return self.get("__onload", frappe._dict())
|
|
|
|
def queue_action(self, action, **kwargs):
    """Run an action in the background and return the result of `enqueue`.

    If the document has an inner method for the action (like `_submit` for
    `submit`), that one is queued instead, so `submit` itself can be
    overridden to queue based on some condition. See: Stock Reconciliation.

    The document is locked before queuing; if it is already locked, a
    "Document Queued" error is thrown.
    """
    from frappe.utils.background_jobs import enqueue

    inner_action = f"_{action}"
    if hasattr(self, inner_action):
        action = inner_action

    try:
        self.lock()
    except frappe.DocumentLockedError:
        frappe.throw(
            _("This document is currently queued for execution. Please try again"),
            title=_("Document Queued"),
        )

    job_arguments = {"__doctype": self.doctype, "__name": self.name, "__action": action}
    return enqueue("frappe.model.document.execute_action", **job_arguments, **kwargs)
|
|
|
|
def lock(self, timeout=None):
    """Create a lock file for this document.

    If a lock already exists and `timeout` is set, the lock is re-checked
    once per second, up to `timeout` times. If it still exists afterwards
    (or no timeout was given), `frappe.DocumentLockedError` is raised.

    :param timeout: Number of one-second retries, default None (no retry)"""
    signature = self.get_signature()

    if file_lock.lock_exists(signature):
        still_locked = True
        if timeout:
            for _attempt in range(timeout):
                time.sleep(1)
                if not file_lock.lock_exists(signature):
                    still_locked = False
                    break

        if still_locked:
            raise frappe.DocumentLockedError

    file_lock.create_lock(signature)
    frappe.local.locked_documents.append(self)
|
|
|
|
def unlock(self):
    """Delete the lock file of this document and drop it from `frappe.local.locked_documents`."""
    signature = self.get_signature()
    file_lock.delete_lock(signature)

    if self in frappe.local.locked_documents:
        frappe.local.locked_documents.remove(self)
|
|
|
|
def validate_from_to_dates(self, from_date_field: str, to_date_field: str) -> None:
    """Validate that the value of `from_date_field` is not later than the value of `to_date_field`.

    Nothing is checked when either value is missing. Throws
    `frappe.exceptions.InvalidDates` when the dates are out of order.
    """
    from_date = self.get(from_date_field)
    to_date = self.get(to_date_field)
    if not from_date or not to_date:
        return

    if date_diff(to_date, from_date) >= 0:
        return

    to_label = frappe.bold(_(self.meta.get_label(to_date_field)))
    from_label = frappe.bold(_(self.meta.get_label(from_date_field)))
    frappe.throw(
        _("{0} must be after {1}").format(to_label, from_label),
        frappe.exceptions.InvalidDates,
    )
|
|
|
|
def get_assigned_users(self):
    """Return the set of `allocated_to` values of this document's ToDos that are not Cancelled."""
    todo_filters = {
        "reference_type": self.doctype,
        "reference_name": self.name,
        "status": ("!=", "Cancelled"),
    }
    allocated_to = frappe.get_all(
        "ToDo",
        fields=["allocated_to"],
        filters=todo_filters,
        pluck="allocated_to",
    )
    return set(allocated_to)
|
|
|
|
def add_tag(self, tag):
    """Add a Tag to this document"""
    from frappe.desk.doctype.tag.tag import DocTags

    doctype_tags = DocTags(self.doctype)
    doctype_tags.add(self.name, tag)
|
|
|
|
def get_tags(self):
    """Return a list of Tags attached to this document"""
    from frappe.desk.doctype.tag.tag import DocTags

    tag_string = DocTags(self.doctype).get_tags(self.name)
    # the first comma-separated item is dropped
    return tag_string.split(",")[1:]
|
|
|
|
def deferred_insert(self) -> None:
    """Push the document to redis temporarily and insert it later.

    WARN: This doesn't guarantee insertion as redis can be restarted
    before data is flushed to database.
    """
    from frappe.deferred_insert import deferred_insert

    self.set_user_and_timestamp()

    record = self.get_valid_dict(convert_dates_to_str=True, ignore_virtual=True)
    deferred_insert(doctype=self.doctype, records=record)
|
|
|
|
def __repr__(self):
    """Return `<ClassName: name docstatus=… parent=…>`; falsy docstatus/parent are left out."""
    pieces = [f"<{self.__class__.__name__}: {self.name or 'unsaved'}"]

    if self.docstatus:
        pieces.append(f" docstatus={self.docstatus}")
    if getattr(self, "parent", None):
        pieces.append(f" parent={self.parent}")

    pieces.append(">")
    return "".join(pieces)
|
|
|
|
def __str__(self):
    """Return `ClassName(name)`, using "unsaved" when the document has no name."""
    display_name = self.name or "unsaved"
    return f"{self.__class__.__name__}({display_name})"
|
|
|
|
|
|
def execute_action(__doctype, __name, __action, **kwargs):
    """Execute an action on a document (called by background worker).

    The document is loaded and unlocked, then the method named `__action`
    is called with `kwargs`. If it raises, the transaction is rolled back
    and an "Action Failed" comment is added to the document, carrying the
    last logged message or, when there is none, the traceback. Finally the
    document update is published.

    :param __doctype: doctype of the document to act on
    :param __name: name of the document to act on
    :param __action: name of the document method to call
    """
    doc = frappe.get_doc(__doctype, __name)
    doc.unlock()
    try:
        getattr(doc, __action)(**kwargs)
    except Exception:
        frappe.db.rollback()

        # Prefer the last user-facing message. A log entry without a
        # "message" value must not break the string concatenation below,
        # so fall back to the traceback in that case as well.
        msg = None
        if frappe.local.message_log:
            msg = json.loads(frappe.local.message_log[-1]).get("message")
        if not msg:
            # tags are closed in reverse order of opening to keep the HTML well-formed
            msg = "<pre><code>" + frappe.get_traceback() + "</code></pre>"

        doc.add_comment("Comment", _("Action Failed") + "<br><br>" + msg)
    doc.notify_update()
|
|
|
|
|
|
def bulk_insert(
    doctype: str,
    documents: Iterable["Document"],
    ignore_duplicates: bool = False,
    chunk_size=10_000,
):
    """Insert simple Document objects into the database in bulk.

    Warning/Info:
            - All documents are inserted without triggering ANY hooks.
            - This function assumes you've done the due diligence and inserts in similar fashion as db_insert
            - Documents can be any iterable / generator containing Document objects

    Rows of every Table field of `doctype` are inserted into their child
    doctype as well. Table fields that share a child doctype are inserted
    together.

    :param doctype: doctype of the parent documents
    :param documents: documents to insert; consumed into a list up front
    :param ignore_duplicates: passed through to `frappe.db.bulk_insert`
    :param chunk_size: passed through to `frappe.db.bulk_insert`
    """

    doctype_meta = frappe.get_meta(doctype)
    documents = list(documents)

    valid_column_map = {
        doctype: doctype_meta.get_valid_columns(),
    }
    values_map = {
        doctype: _document_values_generator(documents, valid_column_map[doctype]),
    }

    # Group table fieldnames by child doctype so that two table fields using
    # the same child doctype do not overwrite each other's rows.
    fieldnames_by_child_doctype: dict[str, list[str]] = {}
    for child_table in doctype_meta.get_table_fields():
        fieldnames_by_child_doctype.setdefault(child_table.options, []).append(child_table.fieldname)

    for child_doctype, fieldnames in fieldnames_by_child_doctype.items():
        valid_column_map[child_doctype] = frappe.get_meta(child_doctype).get_valid_columns()
        # The generators are only consumed in the insert loop below, after
        # this loop has finished. The fieldnames are therefore passed as
        # arguments (bound now) instead of being read from the loop variable
        # lazily, which would make every child generator use the last field.
        values_map[child_doctype] = _document_values_generator(
            _iter_child_rows(documents, fieldnames),
            valid_column_map[child_doctype],
        )

    for dt, docs in values_map.items():
        frappe.db.bulk_insert(
            dt, valid_column_map[dt], docs, ignore_duplicates=ignore_duplicates, chunk_size=chunk_size
        )


def _iter_child_rows(documents, fieldnames):
    """Yield the child rows stored under each of `fieldnames`, document by document."""
    for doc in documents:
        for fieldname in fieldnames:
            # a table field without rows may be empty or unset
            yield from doc.get(fieldname) or ()
|
|
|
|
|
|
def _document_values_generator(
    documents: Iterable["Document"],
    columns: list[str],
) -> Generator[tuple[Any], None, None]:
    """Yield one tuple of column values per document, ordered like `columns`.

    Before its values are read, each document gets `creation` and
    `modified` set to the current time and `modified_by` set to the session
    user. `owner` is set to the session user when the document has none.
    Columns without a value in the document's valid dict yield None.

    :param documents: documents to convert
    :param columns: column names that define the order of each tuple
    """
    for doc in documents:
        session_user = frappe.session.user
        doc.creation = doc.modified = now()
        # `created_by` is kept for backward compatibility with the previous behaviour
        doc.created_by = doc.modified_by = session_user
        # NOTE(review): `owner` is the standard column recording the creator,
        # `created_by` is not a standard column — confirm against frappe.model
        if not getattr(doc, "owner", None):
            doc.owner = session_user

        doc_values = doc.get_valid_dict(
            convert_dates_to_str=True,
            ignore_nulls=True,
            ignore_virtual=True,
        )
        yield tuple(doc_values.get(col) for col in columns)
|