refactor: slim down init.py further (#33284)

* refactor: move document utils

* test: rewrite init.py test

The LOC is 1000 less than what it used to be.
This commit is contained in:
Ankush Menat 2025-07-11 15:34:50 +05:30 committed by GitHub
parent 1ed2447f6b
commit b74b8f16b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 203 additions and 204 deletions

View file

@ -45,7 +45,7 @@ from frappe.utils.translations import _, _lt, set_user_lang
# Local application imports
from .exceptions import *
from .types import FilterSignature, _dict
from .types import _dict
from .utils.jinja import (
get_email_from_template,
get_jenv,
@ -699,30 +699,6 @@ def generate_hash(txt: str | None = None, length: int = 56) -> str:
return secrets.token_hex(math.ceil(length / 2))[:length]
def new_doc(
doctype: str,
*,
parent_doc: Optional["Document"] = None,
parentfield: str | None = None,
as_dict: bool = False,
**kwargs,
) -> "Document":
"""Return a new document of the given DocType with defaults set.
:param doctype: DocType of the new document.
:param parent_doc: [optional] add to parent document.
:param parentfield: [optional] add against this `parentfield`.
:param as_dict: [optional] return as dictionary instead of Document.
:param kwargs: [optional] You can specify fields as field=value pairs in function call.
"""
from frappe.model.create_new import get_new_doc
new_doc = get_new_doc(doctype, parent_doc, parentfield, as_dict=as_dict)
return new_doc.update(kwargs)
def set_value(doctype, docname, fieldname, value=None):
"""Set document value. Calls `frappe.client.set_value`"""
import frappe.client
@ -730,118 +706,6 @@ def set_value(doctype, docname, fieldname, value=None):
return frappe.client.set_value(doctype, docname, fieldname, value)
def get_cached_doc(*args: Any, **kwargs: Any) -> "Document":
"""Identical to `frappe.get_doc`, but return from cache if available."""
if (key := can_cache_doc(args)) and (doc := cache.get_value(key)):
return doc
# Not found in cache, fetch from DB
doc = get_doc(*args, **kwargs)
# Store in cache
if not key:
key = get_document_cache_key(doc.doctype, doc.name)
_set_document_in_cache(key, doc)
return doc
def _set_document_in_cache(key: str, doc: "Document") -> None:
cache.set_value(key, doc, expires_in_sec=3600)
def can_cache_doc(args) -> str | None:
"""
Determine if document should be cached based on get_doc params.
Return cache key if doc can be cached, None otherwise.
"""
if not args:
return
doctype = args[0]
name = doctype if len(args) == 1 or args[1] is None else args[1]
# Only cache if both doctype and name are strings
if isinstance(doctype, str) and isinstance(name, str):
return get_document_cache_key(doctype, name)
def get_document_cache_key(doctype: str, name: str):
return f"document_cache::{doctype}::{name}"
def clear_document_cache(doctype: str, name: str | None = None) -> None:
frappe.db.value_cache.pop(doctype, None)
def clear_in_redis():
if name is not None:
cache.delete_value(get_document_cache_key(doctype, name))
else:
cache.delete_keys(get_document_cache_key(doctype, ""))
clear_in_redis()
if hasattr(db, "after_commit"):
db.after_commit.add(clear_in_redis)
db.after_rollback.add(clear_in_redis)
if doctype == "System Settings" and hasattr(local, "system_settings"):
delattr(local, "system_settings")
if doctype == "Website Settings" and hasattr(local, "website_settings"):
delattr(local, "website_settings")
def get_cached_value(
doctype: str, name: str | dict, fieldname: str | Iterable[str] = "name", as_dict: bool = False
) -> Any:
try:
doc = get_cached_doc(doctype, name)
except DoesNotExistError:
clear_last_message()
return
if isinstance(fieldname, str):
if as_dict:
throw("Cannot make dict for single fieldname")
return doc.get(fieldname)
values = [doc.get(f) for f in fieldname]
if as_dict:
return _dict(zip(fieldname, values, strict=False))
return values
def get_single_value(setting: str, fieldname: str, /, *, as_dict: bool = False):
"""Return the cached value associated with the given fieldname from single DocType.
Usage:
telemetry_enabled = frappe.get_single_value("System Settings", "telemetry_enabled")
"""
return get_cached_value(setting, setting, fieldname=fieldname, as_dict=as_dict)
def get_last_doc(
doctype,
filters: FilterSignature | None = None,
order_by="creation desc",
*,
for_update=False,
):
"""Get last created document of this type."""
d = get_all(doctype, filters=filters, limit_page_length=1, order_by=order_by, pluck="name")
if d:
return get_doc(doctype, d[0], for_update=for_update)
else:
raise DoesNotExistError(doctype=doctype)
def get_single(doctype):
"""Return a `frappe.model.document.Document` object of the given Single doctype."""
return get_doc(doctype, doctype)
def get_meta_module(doctype):
import frappe.modules
@ -885,11 +749,6 @@ def delete_doc(
)
def delete_doc_if_exists(doctype, name, force=0):
"""Delete document if exists."""
delete_doc(doctype, name, force=force, ignore_missing=True)
def reload_doctype(doctype, force=False, reset_permissions=False):
"""Reload DocType from model (`[module]/[doctype]/[name]/[name].json`) files."""
reload_doc(
@ -1373,50 +1232,6 @@ def import_doc(path):
import_doc(path)
def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
"""No_copy fields also get copied."""
import copy
from types import MappingProxyType
from frappe.model.base_document import BaseDocument
def remove_no_copy_fields(d):
for df in d.meta.get("fields", {"no_copy": 1}):
if hasattr(d, df.fieldname):
d.set(df.fieldname, None)
fields_to_clear = ["name", "owner", "creation", "modified", "modified_by"]
if not in_test:
fields_to_clear.append("docstatus")
if isinstance(doc, BaseDocument):
d = doc.as_dict()
elif isinstance(doc, MappingProxyType): # global test record
d = dict(doc)
else:
d = doc
newdoc = get_doc(copy.deepcopy(d))
newdoc.set("__islocal", 1)
for fieldname in [*fields_to_clear, "amended_from", "amendment_date"]:
newdoc.set(fieldname, None)
if not ignore_no_copy:
remove_no_copy_fields(newdoc)
for d in newdoc.get_all_children():
d.set("__islocal", 1)
for fieldname in fields_to_clear:
d.set(fieldname, None)
if not ignore_no_copy:
remove_no_copy_fields(d)
return newdoc
def respond_as_web_page(
title,
html,
@ -1733,7 +1548,21 @@ import frappe._optimizations
from frappe.cache_manager import clear_cache, reset_metadata_version
from frappe.config import get_common_site_config, get_conf, get_site_config
from frappe.core.doctype.system_settings.system_settings import get_system_settings
from frappe.model.document import get_doc, get_lazy_doc
from frappe.model.document import (
get_doc,
get_lazy_doc,
copy_doc,
new_doc,
get_cached_doc,
can_cache_doc,
get_document_cache_key,
clear_document_cache,
get_cached_value,
get_single_value,
get_last_doc,
get_single,
_set_document_in_cache,
)
from frappe.model.meta import get_meta
from frappe.realtime import publish_progress, publish_realtime
from frappe.utils import get_traceback, mock, parse_json, safe_eval, create_folder
@ -1745,6 +1574,7 @@ from frappe.email import sendmail
# for backwards compatibility
format = format_value
delete_doc_if_exists = delete_doc
frappe._optimizations.optimize_all()
frappe._optimizations.register_fault_handler()

View file

@ -224,9 +224,6 @@ def load():
return {"timezones": get_all_timezones(), "defaults": defaults}
cache_key = frappe.get_document_cache_key("System Settings", "System Settings")
def get_system_settings(key: str):
"""Return the value associated with the given `key` from System Settings DocType."""
if not (system_settings := getattr(frappe.local, "system_settings", None)):
@ -241,7 +238,7 @@ def get_system_settings(key: str):
def clear_system_settings_cache():
frappe.client_cache.delete_value(cache_key)
frappe.client_cache.delete_value(frappe.get_document_cache_key("System Settings", "System Settings"))
frappe.cache.delete_value("system_settings")
frappe.cache.delete_value("time_zone")

View file

@ -28,6 +28,7 @@ from frappe.model.naming import set_new_name, validate_name
from frappe.model.utils import is_virtual_doctype, simple_singledispatch
from frappe.model.workflow import set_workflow_state_on_action, validate_workflow
from frappe.types import DF
from frappe.types.filter import FilterSignature
from frappe.utils import compare, cstr, date_diff, file_lock, flt, get_table_name, now
from frappe.utils.data import get_absolute_url, get_datetime, get_timedelta, getdate
from frappe.utils.global_search import update_global_search
@ -2024,3 +2025,183 @@ class LazyChildTable:
return __dict[fieldname]
# Note: Don't implement __set__ method! https://docs.python.org/3/howto/descriptor.html#descriptor-protocol
def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
"""No_copy fields also get copied."""
import copy
from types import MappingProxyType
from frappe.model.base_document import BaseDocument
def remove_no_copy_fields(d):
for df in d.meta.get("fields", {"no_copy": 1}):
if hasattr(d, df.fieldname):
d.set(df.fieldname, None)
fields_to_clear = ["name", "owner", "creation", "modified", "modified_by"]
if not frappe.in_test:
fields_to_clear.append("docstatus")
if isinstance(doc, BaseDocument):
d = doc.as_dict()
elif isinstance(doc, MappingProxyType): # global test record
d = dict(doc)
else:
d = doc
newdoc = get_doc(copy.deepcopy(d))
newdoc.set("__islocal", 1)
for fieldname in [*fields_to_clear, "amended_from", "amendment_date"]:
newdoc.set(fieldname, None)
if not ignore_no_copy:
remove_no_copy_fields(newdoc)
for d in newdoc.get_all_children():
d.set("__islocal", 1)
for fieldname in fields_to_clear:
d.set(fieldname, None)
if not ignore_no_copy:
remove_no_copy_fields(d)
return newdoc
def new_doc(
doctype: str,
*,
parent_doc: Optional["Document"] = None,
parentfield: str | None = None,
as_dict: bool = False,
**kwargs,
) -> "Document":
"""Return a new document of the given DocType with defaults set.
:param doctype: DocType of the new document.
:param parent_doc: [optional] add to parent document.
:param parentfield: [optional] add against this `parentfield`.
:param as_dict: [optional] return as dictionary instead of Document.
:param kwargs: [optional] You can specify fields as field=value pairs in function call.
"""
from frappe.model.create_new import get_new_doc
new_doc = get_new_doc(doctype, parent_doc, parentfield, as_dict=as_dict)
return new_doc.update(kwargs)
def get_cached_doc(*args: Any, **kwargs: Any) -> "Document":
"""Identical to `frappe.get_doc`, but return from cache if available."""
if (key := can_cache_doc(args)) and (doc := frappe.cache.get_value(key)):
return doc
# Not found in cache, fetch from DB
doc = get_doc(*args, **kwargs)
# Store in cache
if not key:
key = get_document_cache_key(doc.doctype, doc.name)
_set_document_in_cache(key, doc)
return doc
def _set_document_in_cache(key: str, doc: "Document") -> None:
frappe.cache.set_value(key, doc, expires_in_sec=3600)
def can_cache_doc(args) -> str | None:
"""
Determine if document should be cached based on get_doc params.
Return cache key if doc can be cached, None otherwise.
"""
if not args:
return
doctype = args[0]
name = doctype if len(args) == 1 or args[1] is None else args[1]
# Only cache if both doctype and name are strings
if isinstance(doctype, str) and isinstance(name, str):
return get_document_cache_key(doctype, name)
def get_document_cache_key(doctype: str, name: str):
return f"document_cache::{doctype}::{name}"
def clear_document_cache(doctype: str, name: str | None = None) -> None:
frappe.db.value_cache.pop(doctype, None)
def clear_in_redis():
if name is not None:
frappe.cache.delete_value(get_document_cache_key(doctype, name))
else:
frappe.cache.delete_keys(get_document_cache_key(doctype, ""))
clear_in_redis()
if hasattr(frappe.db, "after_commit"):
frappe.db.after_commit.add(clear_in_redis)
frappe.db.after_rollback.add(clear_in_redis)
if doctype == "System Settings" and hasattr(frappe.local, "system_settings"):
delattr(frappe.local, "system_settings")
if doctype == "Website Settings" and hasattr(frappe.local, "website_settings"):
delattr(frappe.local, "website_settings")
def get_cached_value(
doctype: str, name: str | dict, fieldname: str | Iterable[str] = "name", as_dict: bool = False
) -> Any:
try:
doc = get_cached_doc(doctype, name)
except frappe.DoesNotExistError:
frappe.clear_last_message()
return
if isinstance(fieldname, str):
if as_dict:
frappe.throw("Cannot make dict for single fieldname")
return doc.get(fieldname)
values = [doc.get(f) for f in fieldname]
if as_dict:
return frappe._dict(zip(fieldname, values, strict=False))
return values
def get_single_value(setting: str, fieldname: str, /, *, as_dict: bool = False):
"""Return the cached value associated with the given fieldname from single DocType.
Usage:
telemetry_enabled = frappe.get_single_value("System Settings", "telemetry_enabled")
"""
return get_cached_value(setting, setting, fieldname=fieldname, as_dict=as_dict)
def get_last_doc(
doctype,
filters: FilterSignature | None = None,
order_by="creation desc",
*,
for_update=False,
):
"""Get last created document of this type."""
d = frappe.get_all(doctype, filters=filters, limit_page_length=1, order_by=order_by, pluck="name")
if d:
return get_doc(doctype, d[0], for_update=for_update)
else:
raise frappe.DoesNotExistError(doctype=doctype)
def get_single(doctype):
"""Return a `frappe.model.document.Document` object of the given Single doctype."""
return get_doc(doctype, doctype)

View file

@ -14,22 +14,13 @@ class TestProjectMeta(IntegrationTestCase):
frappe/__init__.py has grown crazy big and keeps getting bigger. Plot the LOC over time and
you'll see the madness and laziness in action.
This new ~test~ tax will require reducing 3 line per day (~1000 in a year) from 1st Jan
2025 onwards. I am offering a headstart of 50 days in this PR: #28869
Don't try to delete or bypass this.
Don't try to delete or bypass this test.
"""
baseline = 2587 # as of 27th Dec 2025
start_date = datetime.date(2025, 1, 1)
today = datetime.date.today()
tax_to_collect = (today - start_date).days * 3
baseline = 1600 # as of 11th July Dec 2026
init_py_size = len(Path(frappe.__file__).read_text().splitlines())
expected = max(baseline - tax_to_collect, 1500)
zen = """Anzrfcnprf ner bar ubaxvat terng vqrn -- yrg'f qb zber bs gubfr!"""
d = {}
for c in (65, 97):
@ -38,7 +29,7 @@ class TestProjectMeta(IntegrationTestCase):
self.assertLessEqual(
init_py_size,
expected,
"""You have either added too many lines to frappe/__init__.py or that file hasn't shrunk fast enough. Remember the Zen of Python:\n"""
baseline,
"""\nDon't add more code in frappe/__init__.py!\nRemember the Zen of Python:\n"""
+ "".join([d.get(c, c) for c in zen]),
)