Merge branch 'develop' into feat-webform-placeholder-support

This commit is contained in:
UmakanthKaspa 2025-10-15 22:31:12 +05:30 committed by GitHub
commit ea9c86ecf4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
76 changed files with 10237 additions and 7491 deletions

View file

@ -227,17 +227,17 @@ context("Control Link", () => {
field_name: "assigned_by",
property: "default",
property_type: "Text",
value: "Administrator",
value: cy.config("testUser"),
},
true
);
cy.reload();
cy.new_form("ToDo");
cy.fill_field("description", "new", "Text Editor").wait(200);
cy.fill_field("description", "new", "Text Editor").blur().wait(200);
cy.save();
cy.get(".frappe-control[data-fieldname=assigned_by_full_name] .control-value").should(
"contain",
"Administrator"
"Frappe"
);
// if user clears default value explicitly, system should not reset default again
cy.get_field("assigned_by").clear().blur();

View file

@ -437,14 +437,6 @@ def validate_link(doctype: str, docname: str, fields=None):
if not values.name:
return values
if not frappe.has_permission(doctype, "read", doc=values.name):
frappe.throw(
_("You do not have permission to access {0} {1}").format(
frappe.bold(doctype), frappe.bold(docname)
),
frappe.PermissionError,
)
if not fields:
frappe.local.response_headers.set("Cache-Control", "private,max-age=1800,stale-while-revalidate=7200")
return values

View file

@ -1027,6 +1027,13 @@ def list_sites(context: CliCtxObj, output_json=False):
click.echo("No sites found")
@click.command("setup-chrome")
def setup_chrome():
	"""Bench CLI command: install Chromium via `frappe.utils.print_utils.setup_chromium`."""
	from frappe.utils.print_utils import setup_chromium

	setup_chromium()
commands = [
build,
clear_cache,
@ -1059,4 +1066,5 @@ commands = [
add_to_email_queue,
rebuild_global_search,
list_sites,
setup_chrome,
]

View file

@ -1642,15 +1642,22 @@ def validate_fields(meta: Meta):
title=_("Invalid Option"),
)
if meta.is_virtual != child_doctype_meta.is_virtual:
error_msg = " should be virtual." if meta.is_virtual else " cannot be virtual."
if meta.is_virtual and not child_doctype_meta.is_virtual:
frappe.throw(
_("Child Table {0} for field {1}" + error_msg).format(
_("Child Table {0} for field {1} must be virtual").format(
frappe.bold(doctype), frappe.bold(docfield.fieldname)
),
title=_("Invalid Option"),
)
if not meta.is_virtual and child_doctype_meta.is_virtual and not docfield.is_virtual:
frappe.throw(
_("Field {0} must be a virtual field to support virtual doctype.").format(
frappe.bold(docfield.fieldname)
),
title=_("Virtual tables must be virtual fields"),
)
def check_max_height(docfield):
	"""Validate that a docfield's `max_height`, when set, ends in a supported CSS unit.

	Only the trailing two characters are checked, so "px", "em" and "rem"
	(which also ends in "em") all pass.

	Raises via `frappe.throw` when the value uses any other unit.
	"""
	max_height = getattr(docfield, "max_height", None)
	if max_height and max_height[-2:] not in ("px", "em"):
		# message fixed: original read "Max for <field> height must be in px, em, rem"
		frappe.throw(f"Max height for {frappe.bold(docfield.fieldname)} must be in px, em or rem")

View file

@ -122,16 +122,19 @@ class SystemSettings(Document):
if len(parts) != 2 or not (cint(parts[0]) or cint(parts[1])):
frappe.throw(_("Session Expiry must be in format {0}").format("hh:mm"))
if self.enable_two_factor_auth:
if self.two_factor_method == "SMS":
if not frappe.db.get_single_value("SMS Settings", "sms_gateway_url"):
frappe.throw(
_("Please setup SMS before setting it as an authentication method, via SMS Settings")
)
toggle_two_factor_auth(True, roles=["All"])
else:
self.bypass_2fa_for_retricted_ip_users = 0
self.bypass_restrict_ip_check_if_2fa_enabled = 0
if self.has_value_changed("enable_two_factor_auth"):
if self.enable_two_factor_auth:
if self.two_factor_method == "SMS":
if not frappe.db.get_single_value("SMS Settings", "sms_gateway_url"):
frappe.throw(
_(
"Please setup SMS before setting it as an authentication method, via SMS Settings"
)
)
toggle_two_factor_auth(True, roles=["All"])
else:
self.bypass_2fa_for_retricted_ip_users = 0
self.bypass_restrict_ip_check_if_2fa_enabled = 0
frappe.flags.update_last_reset_password_date = False
if self.force_user_to_reset_password and not cint(

View file

@ -499,3 +499,11 @@ function show_api_key_dialog(api_key, api_secret) {
1
);
}
frappe.ui.form.on("User Session Display", {
	// "Sign Out" button on an Active Sessions row: ask the server to clear
	// that session — the row name is the sha256 hash of the sid — then
	// reload the form so the sessions table is recomputed.
	sign_out(frm, doctype, name) {
		frappe
			.xcall("frappe.core.doctype.user.user.clear_session", { sid_hash: name })
			.then(() => frm.reload_doc());
	},
});

View file

@ -111,7 +111,9 @@
"column_break_65",
"api_secret",
"onboarding_status",
"connections_tab"
"connections_tab",
"sessions_tab",
"active_sessions"
],
"fields": [
{
@ -835,6 +837,19 @@
"fieldname": "show_absolute_datetime_in_timeline",
"fieldtype": "Check",
"label": "Show Absolute Datetime in Timeline"
},
{
"fieldname": "sessions_tab",
"fieldtype": "Tab Break",
"label": "Sessions"
},
{
"depends_on": "eval:doc.name == frappe.session.user",
"fieldname": "active_sessions",
"fieldtype": "Table",
"is_virtual": 1,
"label": "Active Sessions",
"options": "User Session Display"
}
],
"icon": "fa fa-user",
@ -888,7 +903,7 @@
}
],
"make_attachments_public": 1,
"modified": "2025-06-05 23:55:27.884061",
"modified": "2025-10-15 18:07:21.606511",
"modified_by": "Administrator",
"module": "Core",
"name": "User",

View file

@ -3,6 +3,7 @@
from collections.abc import Iterable
from datetime import timedelta
from functools import cached_property
import frappe
import frappe.defaults
@ -63,9 +64,11 @@ class User(Document):
from frappe.core.doctype.has_role.has_role import HasRole
from frappe.core.doctype.user_email.user_email import UserEmail
from frappe.core.doctype.user_role_profile.user_role_profile import UserRoleProfile
from frappe.core.doctype.user_session_display.user_session_display import UserSessionDisplay
from frappe.core.doctype.user_social_login.user_social_login import UserSocialLogin
from frappe.types import DF
active_sessions: DF.Table[UserSessionDisplay]
allowed_in_mentions: DF.Check
api_key: DF.Data | None
api_secret: DF.Password | None
@ -143,6 +146,41 @@ class User(Document):
__new_password = None
@cached_property
def active_sessions(self):
	"""Compute rows for the virtual "Active Sessions" child table.

	Returns the logged-in user's own sessions only; any other user's
	record yields an empty list. Raw sids are never exposed — each row's
	`name` is the sha256 hash of the sid and `id` is a masked preview.
	"""
	sessions = frappe.qb.DocType("Sessions")

	if self.name != frappe.session.user:
		# sec: only allow users to see their sessions.
		return []

	sessions_data = (
		frappe.qb.from_(sessions)
		.select(sessions.user, sessions.sessiondata, sessions.sid)
		.where(sessions.user == self.name)
	).run(as_dict=True)

	def mask(sid: str):
		# show only the first 4 chars of the hashed id, pad with asterisks
		return sid[:4] + "*" * 10

	session_docs = []
	for session in sessions_data:
		data = frappe.parse_json(session.sessiondata)
		sid_hash = sha256_hash(session.sid)
		session_docs.append(
			{
				"name": sid_hash,
				"id": mask(sid_hash),
				"owner": session.user,
				"modified_by": session.user,
				"ip_address": data.session_ip,
				"last_updated": data.last_updated,
				# flag the session the request itself is running on
				"is_current": session.sid == frappe.session.sid,
				"session_created": data.creation,
				"user_agent": data.user_agent,
			}
		)
	return session_docs
def __setup__(self):
# because it is handled separately
self.flags.ignore_save_passwords = ["new_password"]
@ -1395,3 +1433,20 @@ def impersonate(user: str, reason: str):
notification.set("type", "Alert")
notification.insert(ignore_permissions=True)
frappe.local.login_manager.impersonate(user)
@frappe.whitelist()
@rate_limit(limit=10, seconds=60 * 60, methods="POST")
def clear_session(sid_hash: str):
	"""Force-log-out one of the current user's own sessions.

	`sid_hash` is the sha256 hash of the raw session id (raw sids are never
	sent to the client). Only sessions belonging to the calling user are
	searched, so a user cannot terminate someone else's session. Rate
	limited to 10 calls per hour.
	"""
	from frappe.sessions import delete_session

	sessions = frappe.qb.DocType("Sessions")
	sessions_data = (
		frappe.qb.from_(sessions).select(sessions.sid).where(sessions.user == frappe.session.user)
	).run(pluck=True)

	for session in sessions_data:
		# compare hashes rather than decoding the client-supplied value
		if sha256_hash(session) == sid_hash:
			delete_session(sid=session, reason="Force Logged out by the user", user=frappe.session.user)
			frappe.toast(_("Successfully signed out"))
			return

View file

@ -0,0 +1,86 @@
{
"actions": [],
"allow_rename": 1,
"creation": "2025-06-23 14:19:48.353900",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"id",
"is_current",
"ip_address",
"user_agent",
"column_break_cjgl",
"session_created",
"last_updated",
"actions_section",
"sign_out"
],
"fields": [
{
"fieldname": "id",
"fieldtype": "Data",
"in_list_view": 1,
"label": "ID"
},
{
"fieldname": "ip_address",
"fieldtype": "Data",
"in_list_view": 1,
"label": "IP Address"
},
{
"fieldname": "session_created",
"fieldtype": "Datetime",
"in_list_view": 1,
"label": "Session Created"
},
{
"fieldname": "last_updated",
"fieldtype": "Datetime",
"in_list_view": 1,
"label": "Last Updated"
},
{
"depends_on": "eval:!doc.is_current",
"fieldname": "sign_out",
"fieldtype": "Button",
"label": "Sign Out"
},
{
"fieldname": "column_break_cjgl",
"fieldtype": "Column Break"
},
{
"fieldname": "actions_section",
"fieldtype": "Section Break",
"label": "Actions"
},
{
"fieldname": "user_agent",
"fieldtype": "Small Text",
"label": "User Agent"
},
{
"default": "0",
"fieldname": "is_current",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Is Current"
}
],
"grid_page_length": 50,
"index_web_pages_for_search": 1,
"is_virtual": 1,
"istable": 1,
"links": [],
"modified": "2025-09-10 11:43:47.325045",
"modified_by": "Administrator",
"module": "Core",
"name": "User Session Display",
"owner": "Administrator",
"permissions": [],
"row_format": "Dynamic",
"sort_field": "creation",
"sort_order": "DESC",
"states": []
}

View file

@ -0,0 +1,38 @@
# Copyright (c) 2025, Frappe Technologies and contributors
# For license information, please see license.txt
# import frappe
from frappe.model.document import Document
class UserSessionDisplay(Document):
	"""Virtual child DocType: one read-only row describing an active user session.

	Rows are computed on the fly (see `User.active_sessions`); nothing is
	persisted, so every DB hook below is intentionally unimplemented.
	"""

	# begin: auto-generated types
	# This code is auto-generated. Do not modify anything in this block.

	from typing import TYPE_CHECKING

	if TYPE_CHECKING:
		from frappe.types import DF

		id: DF.Data | None
		ip_address: DF.Data | None
		is_current: DF.Check
		last_updated: DF.Datetime | None
		parent: DF.Data
		parentfield: DF.Data
		parenttype: DF.Data
		session_created: DF.Datetime | None
		user_agent: DF.SmallText | None
	# end: auto-generated types

	def db_insert(self, *args, **kwargs):
		raise NotImplementedError

	def load_from_db(self, *args, **kwargs):
		raise NotImplementedError

	def db_update(self, *args, **kwargs):
		raise NotImplementedError

	def delete(self, *args, **kwargs):
		raise NotImplementedError

View file

@ -447,7 +447,7 @@ def get_title_values_for_table_and_multiselect_fields(doc, table_fields=None):
if not table_fields:
meta = frappe.get_meta(doc.doctype)
table_fields = meta.get_table_fields()
table_fields = meta.get_table_fields(include_computed=True)
for field in table_fields:
if not doc.get(field.fieldname):

View file

@ -89,7 +89,7 @@ on_logout = "frappe.core.doctype.session_default_settings.session_default_settin
pdf_header_html = "frappe.utils.pdf.pdf_header_html"
pdf_body_html = "frappe.utils.pdf.pdf_body_html"
pdf_footer_html = "frappe.utils.pdf.pdf_footer_html"
pdf_generator = "frappe.utils.pdf.get_chrome_pdf"
# permissions
permission_query_conditions = {

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -61,14 +61,6 @@ TABLE_DOCTYPES_FOR_CHILD_TABLES = MappingProxyType({})
DOCTYPES_FOR_DOCTYPE = {"DocType", *TABLE_DOCTYPES_FOR_DOCTYPE.values()}
UNPICKLABLE_KEYS = (
"meta",
"permitted_fieldnames",
"_parent_doc",
"_weakref",
"_table_fieldnames",
)
def _reduce_extended_instance(doc):
"""Make extended class instances pickle-able.
@ -145,7 +137,44 @@ def import_controller(doctype):
if not issubclass(class_, BaseDocument):
raise ImportError(f"{doctype}: {classname} is not a subclass of BaseDocument")
return _get_extended_class(class_, doctype)
class_ = _get_extended_class(class_, doctype)
return _update_computed_ct_props(class_, doctype)
def _update_computed_ct_props(class_, doctype):
	"""Install caching `property` descriptors for computed (virtual) child-table fields.

	Skipped for the meta doctypes themselves (DOCTYPES_FOR_DOCTYPE) and when
	the class has already been patched (guard attribute set below).
	"""
	if doctype in DOCTYPES_FOR_DOCTYPE or getattr(class_, "_computed_ct_props_updated", False):
		return class_

	meta = frappe.get_meta(doctype)
	for df in meta.get_table_fields(include_computed=True):
		if df.is_virtual:
			_update_computed_ct_prop(class_, df)

	# mark the class so repeated controller imports do not re-wrap the properties
	class_._computed_ct_props_updated = True
	return class_
def _update_computed_ct_prop(class_, df):
	"""Wrap one virtual child-table field of `class_` in a caching `property`.

	Value precedence on access: an existing property defined on the class,
	then the docfield's `options` expression, then an empty list.
	"""
	fieldname = df.fieldname
	# capture any property the controller already defines for this field
	original_prop = getattr(class_, fieldname, None)

	def computed_ct_prop(self):
		if original_prop and is_a_property(original_prop):
			value = original_prop.__get__(self, type(self))
		elif options := getattr(df, "options", None):
			value = self._evaluate_virtual_field_options(options)
		else:
			# no property or options found
			# to compare, default value is None for non-child table virtual fields
			value = []

		# converting to document objects + caching
		self.set(fieldname, value)
		return self.__dict__[fieldname]

	setattr(class_, fieldname, property(computed_ct_prop))
def _get_extended_class(base_class, doctype):
@ -187,22 +216,6 @@ def _get_extended_class(base_class, doctype):
)
RESERVED_KEYWORDS = frozenset(
(
"doctype",
"meta",
"flags",
"_weakref",
"_parent_doc",
"_table_fields",
"_doc_before_save",
"_table_fieldnames",
"permitted_fieldnames",
"dont_update_if_missing",
)
)
class BaseDocument:
def __init__(self, d):
if d.get("doctype"):
@ -421,6 +434,7 @@ class BaseDocument:
controller = get_controller(doctype)
child = controller.__new__(controller)
child._table_fieldnames = TABLE_DOCTYPES_FOR_CHILD_TABLES
child._non_computed_table_fieldnames = TABLE_DOCTYPES_FOR_CHILD_TABLES
child.__init__(value)
__dict = child.__dict__
@ -447,7 +461,14 @@ class BaseDocument:
return self.meta._table_doctypes
def _get_table_fields(self):
@cached_property
def _non_computed_table_fieldnames(self) -> dict:
	"""fieldname -> child doctype map, excluding computed (virtual) tables."""
	if self.doctype in DOCTYPES_FOR_DOCTYPE:
		# special doctypes: meta lookup recurses for these, use the static map
		return self._table_fieldnames
	return self.meta._non_computed_table_doctypes
def _get_table_fields(self, include_computed=False):
"""
To get table fields during Document init
Meta.get_table_fields goes into recursion for special doctypes
@ -460,7 +481,16 @@ class BaseDocument:
if self.doctype in DOCTYPES_FOR_DOCTYPE:
return ()
return self.meta.get_table_fields()
return self.meta.get_table_fields(include_computed=include_computed)
def _evaluate_virtual_field_options(self, options):
	"""Evaluate a virtual field's `options` expression with `doc` bound to this document.

	Runs under frappe's safe-eval sandbox globals.
	"""
	from frappe.utils.safe_exec import get_safe_globals

	return frappe.safe_eval(
		code=options,
		eval_globals=get_safe_globals(),
		eval_locals={"doc": self},
	)
def get_valid_dict(
self, sanitize=True, convert_dates_to_str=False, ignore_nulls=False, ignore_virtual=False
@ -489,13 +519,7 @@ class BaseDocument:
value = getattr(self, fieldname)
elif options := getattr(df, "options", None):
from frappe.utils.safe_exec import get_safe_globals
value = frappe.safe_eval(
code=options,
eval_globals=get_safe_globals(),
eval_locals={"doc": self},
)
value = self._evaluate_virtual_field_options(options)
fieldtype = df.fieldtype
if isinstance(value, list) and fieldtype not in table_fields:
@ -541,12 +565,12 @@ class BaseDocument:
without worrying about whether or not they have values
"""
if not self._table_fieldnames:
if not self._non_computed_table_fieldnames:
return
__dict = self.__dict__
for fieldname in self._table_fieldnames:
for fieldname in self._non_computed_table_fieldnames:
if __dict.get(fieldname) is None:
__dict[fieldname] = []
@ -605,12 +629,17 @@ class BaseDocument:
convert_dates_to_str=False,
no_child_table_fields=False,
no_private_properties=False,
*,
ignore_computed_child_tables=False,
) -> dict:
doc = self.get_valid_dict(convert_dates_to_str=convert_dates_to_str, ignore_nulls=no_nulls)
doc["doctype"] = self.doctype
for fieldname in self._table_fieldnames:
children = self.get(fieldname) or []
table_fieldnames = (
self._non_computed_table_fieldnames if ignore_computed_child_tables else self._table_fieldnames
)
for fieldname in table_fieldnames:
children = getattr(self, fieldname, None) or []
doc[fieldname] = [
d.as_dict(
convert_dates_to_str=convert_dates_to_str,
@ -766,7 +795,7 @@ class BaseDocument:
"""Raw update parent + children
DOES NOT VALIDATE AND CALL TRIGGERS"""
self.db_update()
for fieldname in self._table_fieldnames:
for fieldname in self._non_computed_table_fieldnames:
for doc in self.get(fieldname):
doc.db_update()
@ -1500,3 +1529,24 @@ def _filter(data, filters, limit=None):
break
return out
CACHED_PROPERTIES = (prop for prop, value in vars(BaseDocument).items() if isinstance(value, cached_property))
UNPICKLABLE_KEYS = frozenset(
(
"_parent_doc",
*CACHED_PROPERTIES,
)
)
RESERVED_KEYWORDS = frozenset(
(
"doctype",
"flags",
"_parent_doc",
"_doc_before_save",
"dont_update_if_missing",
*CACHED_PROPERTIES,
)
)

View file

@ -8,7 +8,11 @@ import json
import re
from collections import Counter
from collections.abc import Mapping, Sequence
from functools import cached_property
from functools import cached_property, lru_cache
import sqlparse
from sqlparse import tokens
from sqlparse.sql import Function, Parenthesis, Statement
import frappe
import frappe.defaults
@ -33,6 +37,22 @@ from frappe.utils import (
)
from frappe.utils.data import DateTimeLikeObject, get_datetime, getdate, sbool
@lru_cache(maxsize=128)
def _parse_sql(field: str) -> Statement | None:
	"""Parse an SQL fragment with `sqlparse` and return its first statement.

	Args:
		field (str): The SQL statement string to parse.

	Returns:
		Statement | None: the first parsed `sqlparse.sql.Statement`, or None
		when sqlparse produces no statements. Results are memoized since the
		same field expressions are parsed repeatedly.
	"""
	statements = sqlparse.parse(field)
	return statements[0] if statements else None
LOCATE_PATTERN = re.compile(r"locate\([^,]+,\s*[`\"]?name[`\"]?\s*\)", flags=re.IGNORECASE)
LOCATE_CAST_PATTERN = re.compile(r"locate\(([^,]+),\s*([`\"]?name[`\"]?)\s*\)", flags=re.IGNORECASE)
FUNC_IFNULL_PATTERN = re.compile(r"(strpos|ifnull|coalesce)\(\s*[`\"]?name[`\"]?\s*,", flags=re.IGNORECASE)
@ -456,6 +476,46 @@ from {tables}
"sleep",
]
def _find_subqueries(parsed: Statement) -> list:
	"""Recursively collect every parenthesized subquery in a parsed statement.

	A `Parenthesis` group counts as a subquery when it directly contains a
	DML token (e.g. SELECT); nested groups are always searched as well.
	"""
	found = []
	for tok in parsed.tokens:
		if isinstance(tok, Parenthesis):
			# Check for DML token for subquery check
			if any(inner.ttype is tokens.DML for inner in tok.tokens):
				found.append(tok)
			# a parenthesis may itself wrap deeper subqueries
			found.extend(_find_subqueries(tok))
		elif tok.is_group:
			found.extend(_find_subqueries(tok))
	return found
def _check_sql_token(statement: Statement) -> None:
	"""
	Checks the output of `sqlparse.parse()` to detect blocked functions and subqueries.

	Raises (via `_raise_exception`) when the statement contains any subquery,
	a blacklisted function call, or a blacklisted keyword; recurses into
	grouped tokens so nested expressions are also inspected.
	"""
	if _find_subqueries(statement):
		_raise_exception()

	for token in statement.tokens:
		if isinstance(token, Function):
			if (name := (token.get_name())) and name.lower() in blacklisted_functions:
				_raise_exception()
		if token.ttype == tokens.Keyword:
			if token.value.lower() in blacklisted_keywords:
				_raise_exception()
		if token.is_group:
			_check_sql_token(token)
def _raise_exception():
	# single choke-point for the "restricted SQL" error so callers stay terse
	frappe.throw(_("Use of sub-query or function is restricted"), frappe.DataError)
@ -470,21 +530,8 @@ from {tables}
lower_field = field.lower().strip()
if SUB_QUERY_PATTERN.match(field):
# Check for subquery anywhere in the field, not just at the beginning
if "(" in lower_field:
# Check all parentheses pairs, not just the first one
paren_start = 0
while True:
location = lower_field.find("(", paren_start)
if location == -1:
break
token = lower_field[location + 1 :].lstrip().split(" ", 1)[0]
if any(
re.search(r"\b" + re.escape(keyword) + r"\b", token)
for keyword in blacklisted_keywords + blacklisted_functions
):
_raise_exception()
paren_start = location + 1
# Check all tokens for subquery detection
_check_sql_token(_parse_sql(field))
if "@" in lower_field:
# prevent access to global variables

View file

@ -288,7 +288,8 @@ class Document(BaseDocument):
for fieldname, child_doctype in self._table_fieldnames.items():
# Make sure not to query the DB for a child table, if it is a virtual one.
if not is_doctype and is_virtual_doctype(child_doctype):
self.set(fieldname, [])
# Remove cache so that the virtual field loads again
self.__dict__.pop(fieldname, None)
continue
if is_doctype:
@ -442,6 +443,7 @@ class Document(BaseDocument):
for d in self.get_all_children():
d.db_insert()
self.reset_computed_child_tables()
self.run_method("after_insert")
self.flags.in_insert = True
@ -547,6 +549,7 @@ class Document(BaseDocument):
self.db_update()
self.update_children()
self.reset_computed_child_tables()
self.run_post_save_methods()
# clear unsaved flag
@ -594,6 +597,8 @@ class Document(BaseDocument):
def update_child_table(self, fieldname: str, df: Optional["DocField"] = None):
"""sync child table for given fieldname"""
df: DocField = df or self.meta.get_field(fieldname)
if df.is_virtual:
return
all_rows = self.get(df.fieldname)
# delete rows that do not match the ones in the document
@ -623,6 +628,12 @@ class Document(BaseDocument):
d: Document
d.db_update()
def reset_computed_child_tables(self):
	"""Reset computed child tables so that they are reloaded next time"""
	# computed tables cache their rows in __dict__ via the property wrapper;
	# popping the entry forces recomputation on next attribute access
	for df in self.meta.get_table_fields(include_computed=True):
		if df.is_virtual:
			self.__dict__.pop(df.fieldname, None)
def get_doc_before_save(self) -> "Self":
return getattr(self, "_doc_before_save", None)
@ -887,7 +898,7 @@ class Document(BaseDocument):
return
all_fields = self.meta.fields.copy()
for table_field in self.meta.get_table_fields():
for table_field in self.meta.get_table_fields(include_computed=True):
all_fields += frappe.get_meta(table_field.options).fields or []
if all(df.permlevel == 0 for df in all_fields):
@ -903,7 +914,7 @@ class Document(BaseDocument):
# hasattr might return True for class attribute which can't be delattr-ed.
continue
for table_field in self.meta.get_table_fields():
for table_field in self.meta.get_table_fields(include_computed=True):
for df in frappe.get_meta(table_field.options).fields or []:
if df.permlevel and df.permlevel not in has_access_to:
for child in self.get(table_field.fieldname) or []:
@ -1118,12 +1129,16 @@ class Document(BaseDocument):
msg = ", ".join(each[2] for each in cancelled_links)
frappe.throw(_("Cannot link cancelled document: {0}").format(msg), frappe.CancelledLinkError)
def get_all_children(self, parenttype=None) -> list["Document"]:
"""Return all children documents from **Table** type fields in a list."""
def get_all_children(self, parenttype=None, *, include_computed=False) -> list["Document"]:
"""
Return all child documents from **Table** type fields in a list.
Excludes computed tables by default, unless `include_computed` is set to True.
"""
children = []
table_fieldnames = self._table_fieldnames if include_computed else self._non_computed_table_fieldnames
for fieldname, child_doctype in self._table_fieldnames.items():
for fieldname, child_doctype in table_fieldnames.items():
if parenttype and child_doctype != parenttype:
continue
@ -1323,7 +1338,7 @@ class Document(BaseDocument):
return frappe.clear_last_message()
for fieldname in self._table_fieldnames:
for fieldname in self._non_computed_table_fieldnames:
for row in self.get(fieldname):
row._doc_before_save = next(
(d for d in (self._doc_before_save.get(fieldname) or []) if d.name == row.name), None
@ -1985,8 +2000,8 @@ def get_lazy_controller(doctype):
# Dynamically construct a class that subclasses LazyDocument and original controller.
lazy_controller = type(f"Lazy{original_controller.__name__}", (LazyDocument, original_controller), {})
for fieldname, child_doctype in meta._table_doctypes.items():
setattr(lazy_controller, fieldname, LazyChildTable(fieldname, child_doctype))
for df in meta.get_table_fields():
setattr(lazy_controller, df.fieldname, LazyChildTable(df.fieldname, df.options))
lazy_controllers[doctype] = lazy_controller
return lazy_controllers[doctype]

View file

@ -244,8 +244,8 @@ class Meta(Document):
return set_only_once_fields
def get_table_fields(self):
return self._table_fields
def get_table_fields(self, include_computed=False):
	"""Return table docfields; include computed (virtual) tables only when `include_computed`."""
	return self._table_fields if include_computed else self._non_computed_table_fields
def get_global_search_fields(self):
"""Return list of fields with `in_global_search` set and `name` if set."""
@ -505,18 +505,38 @@ class Meta(Document):
if get_datetime(recent_change) > add_to_date(None, days=-1 * LARGE_TABLE_RECENCY_THRESHOLD):
self.is_large_table = True
def init_field_caches(self):
# field map
self._fields = {field.fieldname: field for field in self.fields}
@cached_property
def _fields(self):
	# fieldname -> docfield map, built lazily once per Meta instance
	return {field.fieldname: field for field in self.fields}
# table fields
@cached_property
def _table_fields(self):
if self.name == "DocType":
self._table_fields = DOCTYPE_TABLE_FIELDS
else:
self._table_fields = self.get("fields", {"fieldtype": ["in", table_fields]})
return DOCTYPE_TABLE_FIELDS
# table fieldname: doctype map
self._table_doctypes = {field.fieldname: field.options for field in self._table_fields}
return self.get("fields", {"fieldtype": ["in", table_fields]})
@cached_property
def _non_computed_table_fields(self):
	"""Table docfields excluding computed (is_virtual) ones."""
	if self.name == "DocType":
		# DocType itself uses the static field list (same as _table_fields)
		return self._table_fields
	return self.get("fields", {"fieldtype": ["in", table_fields], "is_virtual": 0})
@cached_property
def _table_doctypes(self):
	# table fieldname -> child doctype (docfield options) map
	return {field.fieldname: field.options for field in self._table_fields}
@cached_property
def _non_computed_table_doctypes(self):
	# fieldname -> child doctype map, restricted to non-computed tables
	return {field.fieldname: field.options for field in self._non_computed_table_fields}
def init_field_caches(self):
	# touch each cached_property so its value is materialized eagerly
	self._fields
	self._table_fields
	self._non_computed_table_fields
	self._table_doctypes
	self._non_computed_table_doctypes
def sort_fields(self):
"""
@ -979,14 +999,7 @@ def _update_field_order_based_on_insert_after(field_order, insert_after_map):
field_order.extend(fields)
CACHE_PROPERTIES = frozenset(
(
"_fields",
"_table_fields",
"_table_doctypes",
*(prop for prop, value in vars(Meta).items() if isinstance(value, cached_property)),
)
)
CACHE_PROPERTIES = frozenset(prop for prop, value in vars(Meta).items() if isinstance(value, cached_property))
def _serialize(doc, no_nulls=False, *, is_child=False):

View file

@ -33,7 +33,7 @@ def export_to_files(record_list=None, record_module=None, verbose=0, create_init
def write_document_file(doc, record_module=None, create_init=True, folder_name=None):
doc_export = doc.as_dict(no_nulls=True)
doc_export = doc.as_dict(no_nulls=True, ignore_computed_child_tables=True)
doc.run_method("before_export", doc_export)
doc_export = strip_default_fields(doc, doc_export)

View file

@ -309,21 +309,22 @@ def make_boilerplate(
base_class_import = "from frappe.utils.nestedset import NestedSet"
if doc.get("is_virtual"):
controller_body = indent(
dedent(
"""
controller_body = """
def db_insert(self, *args, **kwargs):
raise NotImplementedError
def load_from_db(self):
def load_from_db(self, *args, **kwargs):
raise NotImplementedError
def db_update(self):
def db_update(self, *args, **kwargs):
raise NotImplementedError
def delete(self):
def delete(self, *args, **kwargs):
raise NotImplementedError
"""
if not doc.get("istable"):
controller_body += """
@staticmethod
def get_list(filters=None, page_length=20, **kwargs):
pass
@ -336,9 +337,8 @@ def make_boilerplate(
def get_stats(**kwargs):
pass
"""
),
"\t",
)
controller_body = indent(dedent(controller_body), "\t")
with open(target_file_path, "w") as target, open(template_file_path) as source:
template = source.read()

View file

@ -802,7 +802,9 @@ def has_child_permission(
if parent_meta.istable or not (
valid_parentfields := [
df.fieldname for df in parent_meta.get_table_fields() if df.options == child_doctype
df.fieldname
for df in parent_meta.get_table_fields(include_computed=True)
if df.options == child_doctype
]
):
push_perm_check_log(

View file

@ -268,7 +268,7 @@
"fieldname": "pdf_generator",
"fieldtype": "Select",
"label": "PDF Generator",
"options": "wkhtmltopdf"
"options": "wkhtmltopdf\nchrome"
},
{
"default": "DocType",
@ -292,7 +292,7 @@
"icon": "fa fa-print",
"idx": 1,
"links": [],
"modified": "2025-09-16 11:20:20.151669",
"modified": "2025-09-23 10:39:51.123539",
"modified_by": "Administrator",
"module": "Printing",
"name": "Print Format",

View file

@ -40,7 +40,7 @@ class PrintFormat(Document):
page_number: DF.Literal[
"Hide", "Top Left", "Top Center", "Top Right", "Bottom Left", "Bottom Center", "Bottom Right"
]
pdf_generator: DF.Literal["wkhtmltopdf"]
pdf_generator: DF.Literal["wkhtmltopdf", "chrome"]
print_format_builder: DF.Check
print_format_builder_beta: DF.Check
print_format_for: DF.Literal["DocType", "Report"]

View file

@ -259,12 +259,28 @@ frappe.ui.form.PrintView = class {
print_format.name &&
(print_format.print_format_builder || print_format.print_format_builder_beta) &&
print_format.standard === "No";
let is_standard_but_editable = print_format.name && print_format.custom_format;
if (is_standard_but_editable) {
let is_standard_jinja_custom =
print_format.standard === "Yes" &&
print_format.custom_format &&
print_format.print_format_type === "Jinja";
if (is_standard_jinja_custom) {
let doc = frappe.get_doc("Print Format", print_format.name);
frappe.model.with_doctype("Print Format", () => {
let newdoc = frappe.model.copy_doc(doc);
frappe.set_route("Form", "Print Format", newdoc.name);
});
return;
}
let is_editable = print_format.name && print_format.custom_format;
if (is_editable) {
frappe.set_route("Form", "Print Format", print_format.name);
return;
}
if (is_custom_format) {
if (print_format.print_format_builder_beta) {
frappe.set_route("print-format-builder-beta", print_format.name);
@ -680,11 +696,15 @@ frappe.ui.form.PrintView = class {
}
} else {
this.is_wkhtmltopdf_valid();
this.render_page("/api/method/frappe.utils.print_format.download_pdf?");
this.render_page(
"/api/method/frappe.utils.print_format.download_pdf?",
false,
print_format?.pdf_generator
);
}
}
render_page(method, printit = false) {
render_page(method, printit = false, pdf_generator = "wkhtmltopdf") {
let w = window.open(
frappe.urllib.get_full_url(
method +
@ -701,7 +721,9 @@ frappe.ui.form.PrintView = class {
encodeURIComponent(this.get_letterhead()) +
"&settings=" +
encodeURIComponent(JSON.stringify(this.additional_settings)) +
(this.lang_code ? "&_lang=" + this.lang_code : "")
(this.lang_code ? "&_lang=" + this.lang_code : "") +
"&pdf_generator=" +
encodeURIComponent(pdf_generator)
)
);
if (!w) {

View file

@ -1879,7 +1879,7 @@ frappe.ui.form.Form = class FrappeForm {
// returns list of children that are selected. returns [parentfield, name] for each
var selected = {},
me = this;
frappe.meta.get_table_fields(this.doctype).forEach(function (df) {
frappe.meta.get_table_fields(this.doctype, true).forEach(function (df) {
// handle TableMultiselect child fields
let _selected = [];
@ -1900,7 +1900,7 @@ frappe.ui.form.Form = class FrappeForm {
if (frappe.meta.docfield_map[this.doctype][fieldname]) {
doctype = this.doctype;
} else {
frappe.meta.get_table_fields(this.doctype).every(function (df) {
frappe.meta.get_table_fields(this.doctype, true).every(function (df) {
if (frappe.meta.docfield_map[df.options][fieldname]) {
doctype = df.options;
return false;

View file

@ -332,7 +332,7 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
this.list_view_settings = list_view_settings;
this.setup_columns();
this.refresh(true);
this.refresh();
}
refresh(refresh_header = false) {

View file

@ -149,9 +149,17 @@ $.extend(frappe.meta, {
return docfield_map && docfield_map[fn];
},
get_table_fields: function (dt) {
get_table_fields: function (dt, include_computed = false) {
return $.map(frappe.meta.docfield_list[dt], function (d) {
return frappe.model.table_fields.includes(d.fieldtype) ? d : null;
if (!frappe.model.table_fields.includes(d.fieldtype)) {
return null;
}
if (!include_computed && d.is_virtual) {
return null;
}
return d;
});
},
@ -164,7 +172,7 @@ $.extend(frappe.meta, {
// found in parent
out = doctype;
} else {
frappe.meta.get_table_fields(doctype).every(function (d) {
frappe.meta.get_table_fields(doctype, true).every(function (d) {
if (
frappe.meta.has_field(d.options, key) ||
frappe.model.child_table_field_list.includes(key)

View file

@ -196,12 +196,12 @@ frappe.ui.SortSelector = class SortSelector {
}
}
get_sql_string() {
// build string like: `tabSales Invoice`.subject, `tabSales Invoice`.name desc
// build string like: `tabSales Invoice`.`subject`, `tabSales Invoice`.`name` desc
const table_name = "`tab" + this.doctype + "`";
const sort_by = `${table_name}.${this.sort_by}`;
const sort_by = `${table_name}.\`${this.sort_by}\``;
if (!["name", "creation", "modified"].includes(this.sort_by)) {
// add name column for deterministic ordering
return `${sort_by} ${this.sort_order}, ${table_name}.name ${this.sort_order}`;
return `${sort_by} ${this.sort_order}, ${table_name}.\`name\` ${this.sort_order}`;
} else {
return `${sort_by} ${this.sort_order}`;
}

View file

@ -1585,7 +1585,10 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
fields: this.get_dialog_fields(),
primary_action: (values) => {
// doctype fields
let fields = values[this.doctype].map((f) => [f, this.doctype]);
let fields = (values[this.doctype] || []).map((f) => [
f,
this.doctype,
]);
delete values[this.doctype];
// child table fields
@ -1610,6 +1613,18 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
},
});
const $bulk = $(`
<div class="mb-3">
<button class="btn btn-default btn-xs" data-action="select_all">${__("Select All")}</button>
<button class="btn btn-default btn-xs" data-action="unselect_all">${__("Unselect All")}</button>
</div>
`);
const toggleAll = (checked) =>
d.$wrapper.find(":checkbox").prop("checked", checked).trigger("change");
$bulk.on("click", "[data-action=select_all]", () => toggleAll(true));
$bulk.on("click", "[data-action=unselect_all]", () => toggleAll(false));
d.$body.prepend($bulk);
d.$body.prepend(`
<div class="columns-search">
<input type="text" placeholder="${__(

View file

@ -310,6 +310,7 @@ textarea.form-control {
}
.barcode-wrapper {
margin-top: var(--margin-xs);
text-align: center;
background-color: var(--control-bg);
border-radius: var(--border-radius);

View file

@ -677,8 +677,7 @@
}
.column-limit-reached {
background-color: var(--subtle-accent);
border-top-left-radius: var(--border-radius-md);
border-top-right-radius: var(--border-radius-md);
border-radius: var(--border-radius-md);
border: 1px solid var(--table-border-color);
overflow-y: hidden;
.form-grid {
@ -688,6 +687,9 @@
&:focus-visible {
@include grid-focus();
}
.grid-static-col {
height: 38px;
}
.grid-static-col.col-xs-1 {
flex: 1 0 60px;
width: 60px;

View file

@ -182,7 +182,7 @@ a.card-link:hover {
}
.text-extra-muted {
color: #d1d8dd !important;
color: var(--gray-500) !important;
}
.no-underline {

View file

@ -257,6 +257,9 @@ class Session:
self.data.data.user = self.user
self.data.data.session_ip = frappe.local.request_ip
if frappe.request:
self.data.data.user_agent = frappe.request.headers.get("User-Agent")
if session_end:
self.data.data.session_end = session_end
@ -420,6 +423,7 @@ class Session:
) and not frappe.flags.read_only:
self.data.data.last_updated = now
self.data.data.lang = str(frappe.lang)
self.data.data.session_ip = frappe.local.request_ip
Sessions = frappe.qb.DocType("Sessions")
# update sessions table

View file

@ -0,0 +1,49 @@
{#- Minimal HTML shell used by the Chrome-based PDF generator to render a
    print format's header/footer (and re-rendered body content) in a
    headless page. `head`, `styles` and `content` are lists of tags
    extracted from the original print HTML by Browser.get_rendered_header_footer. -#}
<!DOCTYPE html>
<html lang={{ lang }} dir={{ layout_direction }}>
<head>
	<meta charset="utf-8">
	{% for tag in head -%}
		{{ tag | string }}
	{%- endfor %}
	<style>
		body {
			margin: 0 !important;
			border: 0 !important;
			padding-top: 1mm !important;
		}
		.letter-head,
		.letter-head-footer {
			margin-top: -12mm !important;
		}
		/* Don't show explicit links for <a> tags */
		@media print {
			/* padding is added to simulate old wkhtmltopdf format prints */
			.wrapper {
				box-sizing: border-box;
				padding: 1mm 0 1mm !important;
				page-break-after: always !important;
			}
			[document-status] {
				margin-bottom: 0 !important;
			}
			a[href]:after {
				content: none;
			}
		}
	</style>
	{% for tag in styles -%}
		{{ tag | string }}
	{%- endfor %}
</head>
<body>
	<div class="print-format">
		<div class="wrapper">
			{% for tag in content -%}
				{{ tag | string }}
			{%- endfor %}
		</div>
	</div>
</body>
</html>

View file

@ -489,6 +489,23 @@ class TestDBQuery(IntegrationTestCase):
)
self.assertTrue("_relevance" in data[0])
# Test that fields with keywords in strings are allowed
data = DatabaseQuery("DocType").execute(
fields=["name", "locate('select', name)"],
limit_start=0,
limit_page_length=1,
)
self.assertTrue(data)
# Test that subqueries with other DML are blocked
self.assertRaises(
frappe.DataError,
DatabaseQuery("DocType").execute,
fields=["name", "issingle", "(insert into tabUser values (1))"],
limit_start=0,
limit_page_length=1,
)
data = DatabaseQuery("DocType").execute(
fields=["name", "issingle", "date(creation) as creation"],
limit_start=0,
@ -554,6 +571,16 @@ class TestDBQuery(IntegrationTestCase):
limit_page_length=1,
)
# Ensure search terms aren't blocked as functions
from frappe.desk.search import search_link
search_terms = ("global", "user")
for term in search_terms:
with self.subTest(term=term):
result = search_link("ToDo", term)
self.assertIsInstance(result, list)
def test_nested_permission(self):
frappe.set_user("Administrator")
create_nested_doctype()

View file

@ -2176,7 +2176,7 @@ def get_filter(doctype: str, filters: FilterSignature, filters_config=None) -> "
meta = frappe.get_meta(f.doctype)
if not meta.has_field(f.fieldname):
# try and match the doctype name from child tables
for df in meta.get_table_fields():
for df in meta.get_table_fields(include_computed=True):
if frappe.get_meta(df.options).has_field(f.fieldname):
f.doctype = df.options
break

View file

@ -131,6 +131,37 @@ def get_pdf(html, options=None, output: PdfWriter | None = None):
return filedata
def measure_time(func):
	"""Decorator that prints the wall-clock duration of each call to *func*.

	Used for ad-hoc profiling of PDF generation. The wrapped function's
	return value is passed through unchanged. ``functools.wraps`` preserves
	the original ``__name__``/docstring so the printed message (and any
	introspection by callers) reports the real function, not ``wrapper``.
	"""
	import functools
	import time

	@functools.wraps(func)
	def wrapper(*args, **kwargs):
		# perf_counter is monotonic and higher-resolution than time.time(),
		# so short durations are measured reliably.
		start_time = time.perf_counter()
		result = func(*args, **kwargs)
		elapsed = time.perf_counter() - start_time
		print(f"Function {func.__name__} took {elapsed:.4f} seconds")
		return result

	return wrapper
@measure_time
def get_chrome_pdf(print_format, html, options, output, pdf_generator=None):
	"""Render *html* to PDF with headless Chrome and return the merged result.

	Args:
		print_format: name of the Print Format being rendered.
		html: fully rendered print HTML (may contain ``#header-html`` /
			``#footer-html`` elements that become repeated page headers/footers).
		options: wkhtmltopdf-style options dict (margins, page size, ...).
		output: optional ``PdfWriter`` the transformed pages are merged into.
		pdf_generator: only ``"chrome"`` is handled here; any other value
			returns ``None`` so the caller falls back to the default
			(wkhtmltopdf) generator.
	"""
	from frappe.utils.pdf_generator.browser import Browser
	from frappe.utils.pdf_generator.chrome_pdf_generator import ChromePDFGenerator
	from frappe.utils.pdf_generator.pdf_merge import PDFTransformer

	if pdf_generator != "chrome":
		# Use the default pdf generator
		return

	# scrubbing url to expand url is not required as we have set url.
	# also, planning to remove network requests anyway 🤞
	generator = ChromePDFGenerator()
	browser = Browser(generator, print_format, html, options)
	transformer = PDFTransformer(browser)

	# transforms and merges header, footer into body pdf and returns merged pdf
	return transformer.transform_pdf(output=output)
def get_file_data_from_writer(writer_obj):
# https://docs.python.org/3/library/io.html
stream = io.BytesIO()

View file

@ -0,0 +1,465 @@
from typing import ClassVar
from bs4 import BeautifulSoup
import frappe
from frappe.utils.print_utils import convert_uom, parse_float_and_unit
class Browser:
	"""Drive one headless-Chrome session (over the Chrome DevTools Protocol)
	to render a print HTML document into body/header/footer PDFs.

	The constructor runs the whole lifecycle: parse the HTML, open CDP pages
	for body/header/footer, translate wkhtmltopdf-style options into Chrome
	``Page.printToPDF`` options, generate the PDFs and close the session.
	Results are left on ``self.body_pdf`` / ``self.header_pdf`` /
	``self.footer_pdf`` for the caller (PDFTransformer) to merge.
	"""

	def __init__(self, generator, print_format, html, options):
		self.is_print_designer = frappe.get_cached_value("Print Format", print_format, "print_designer")
		self.browserID = frappe.utils.random_string(10)
		generator.add_browser(self.browserID)
		# sets soup from html
		self.set_html(html)
		# sets wkhtmltopdf options
		self.set_options(options)
		# start cdp connection and create browser context ( kind of like new window / incognito mode)
		self.open(generator)
		# opens header and footer pages and sets content ( not waiting for it to load)
		self.prepare_header_footer()
		# opens body page and starts setting content (load completion is awaited further below)
		self.setup_body_page()
		# prepare options as per chrome for pdf
		self.prepare_options_for_pdf()
		# generate header and footer pages if they are not dynamic ( first, odd, even, last)
		self.update_header_footer_page_pd()
		# if header and footer are not dynamic start generating pdf for them (non-blocking)
		self.try_async_header_footer_pdf()
		# now wait for page to load as we need DOM to generate pdf
		self.body_page.wait_for_set_content()
		self.body_pdf = self.body_page.generate_pdf(raw=not self.header_page and not self.footer_page)
		self.body_page.close()
		self.update_header_footer_page()
		if self.header_page:
			if not self.is_header_dynamic:
				# PDF generation was already started in try_async_header_footer_pdf;
				# just collect the finished stream here.
				self.header_pdf = self.header_page.get_pdf_from_stream(self.header_page.get_pdf_stream_id())
			else:
				self.header_pdf = self.header_page.generate_pdf()
			self.header_page.close()
		if self.footer_page:
			if not self.is_footer_dynamic:
				self.footer_pdf = self.footer_page.get_pdf_from_stream(self.footer_page.get_pdf_stream_id())
			else:
				self.footer_pdf = self.footer_page.generate_pdf()
			self.footer_page.close()
		self.close()
		generator.remove_browser(self.browserID)

	def open(self, generator):
		"""Connect to the browser's DevTools websocket and create an isolated browser context."""
		from frappe.utils.pdf_generator.cdp_connection import CDPSocketClient

		# checking because if we share browser across requests, _devtools_url will already be set for subsequent requests.
		if not generator._devtools_url:
			generator._set_devtools_url()
		# start the CDP websocket connection to browser
		self.session = CDPSocketClient(generator._devtools_url)
		self.session.connect()
		self.create_browser_context()

	def create_browser_context(self):
		# create browser context (disposed automatically when the session detaches)
		result, error = self.session.send("Target.createBrowserContext", {"disposeOnDetach": True})
		if error:
			frappe.log_error(title="Error creating browser context:", message=f"{error}")
		self.browser_context_id = result["browserContextId"]

	def set_html(self, html):
		"""Parse *html* into a BeautifulSoup tree (html5lib for browser-like parsing)."""
		self.soup = BeautifulSoup(html, "html5lib")

	def set_options(self, options):
		"""Store the wkhtmltopdf-style options dict; mutated in place later."""
		self.options = options

	def new_page(self, page_type):
		"""
		# create a new page in the browser inside browser context
		----
		TODO: Implement Deterministic rendering for headless-chrome via DevTools Protocol ( waiting for macos support )
		https://docs.google.com/document/d/1PppegrpXhOzKKAuNlP6XOEnviXFGUiX2hop00Cxcv4o/edit?tab=t.0#bookmark=id.dukbomwxpb3j
		NOTE: In theory this will make it faster but more importantly use less cpu, ram etc.
		"""
		from frappe.utils.pdf_generator.page import Page

		page = Page(self.session, self.browser_context_id, page_type)
		page.is_print_designer = self.is_print_designer
		return page

	def setup_body_page(self):
		"""Open the body tab, navigate to the site and start loading the body HTML."""
		self.body_page = self.new_page("body")
		self.body_page.set_tab_url(frappe.request.host_url)
		self.body_page.wait_for_navigate()
		self.body_page.set_content(str(self.soup))

	def close_page(self, type):
		"""Close the ``header``/``footer``/``body`` page named by *type*."""
		page = getattr(self, f"{type}_page")
		page.close()

	def is_page_no_used(self, soup):
		"""Return True if *soup* contains any page-number placeholder class.

		A header/footer that shows page numbers must be re-rendered per page
		("dynamic") once the total page count of the body PDF is known.
		"""
		# Check if any of the classes exist
		classes_to_check = [
			"page",
			"frompage",
			"topage",
			"page_info_page",
			"page_info_frompage",
			"page_info_topage",
		]
		# Loop through the classes to check
		for class_name in classes_to_check:
			if soup.find(class_=class_name):  # Check if any element with the class is found
				return True  # Return True if class is found
		return False

	def prepare_header_footer(self):
		"""Extract #header-html/#footer-html from the soup and start rendering them
		in their own tabs; measure their heights for margin computation."""
		# code is structured like this to improve performance by running commands in chrome as soon as possible.
		soup = self.soup
		options = self.options
		# open header and footer pages
		self._open_header_footer_pages()
		# get tags to pass to header template.
		head = soup.find("head").contents
		styles = soup.find_all("style")
		# set header and footer content ( not waiting for it to load yet).
		if self.header_page:
			self.header_page.wait_for_navigate()
			self.header_page.set_content(
				self.get_rendered_header_footer(self.header_content, "header", head, styles, css=[])
			)
		if self.footer_page:
			self.footer_page.wait_for_navigate()
			self.footer_page.set_content(
				self.get_rendered_header_footer(self.footer_content, "footer", head, styles, css=[])
			)
		if self.header_page:
			self.header_page.wait_for_set_content()
			self.header_height = self.header_page.get_element_height()
			self.is_header_dynamic = self.is_page_no_used(self.header_content)
			del self.header_content
		else:
			# bad implicit setting of margin #backwards-compatibility
			options["margin-top"] = "15mm"
		if self.footer_page:
			self.footer_page.wait_for_set_content()
			self.footer_height = self.footer_page.get_element_height()
			self.is_footer_dynamic = self.is_page_no_used(self.footer_content)
			del self.footer_content
		else:
			# bad implicit setting of margin #backwards-compatibility
			options["margin-bottom"] = "15mm"
		# Remove instances of them from main content for render_template
		for html_id in ["header-html", "footer-html"]:
			for tag in soup.find_all(id=html_id):
				tag.extract()

	def try_async_header_footer_pdf(self):
		"""Kick off non-blocking PDF generation for static (non page-numbered)
		headers/footers so they render while the body is still loading."""
		if self.header_page and not self.is_header_dynamic:
			self.header_page.generate_pdf(wait_for_pdf=False)
		if self.footer_page and not self.is_footer_dynamic:
			self.footer_page.generate_pdf(wait_for_pdf=False)

	def _get_converted_num(self, num_str, unit="px"):
		"""Parse a CSS length like ``"15mm"`` and convert it to *unit*.

		Returns None when *num_str* cannot be parsed.
		"""
		parsed = parse_float_and_unit(num_str)
		if parsed:
			return convert_uom(parsed["value"], parsed["unit"], unit, only_number=True)

	def _parse_pdf_options_from_html(self):
		"""Pull page/margin options declared as CSS in the print format's
		styles into ``self.options`` (they override nothing already parsed here)."""
		from frappe.utils.pdf import get_print_format_styles

		soup: BeautifulSoup = self.soup
		options = {}
		print_format_css = get_print_format_styles(soup)
		attrs = (
			"margin-top",
			"margin-bottom",
			"margin-left",
			"margin-right",
			"page-size",
			"header-spacing",
			"orientation",
			"page-width",
			"page-height",
		)
		options |= {style.name: style.value for style in print_format_css if style.name in attrs}
		self.options.update(options)

	def _set_default_page_size(self):
		"""Fill in page size from Print Settings when the format didn't set one."""
		options = self.options
		pdf_page_size = (
			options.get("page-size") or frappe.db.get_single_value("Print Settings", "pdf_page_size") or "A4"
		)
		if pdf_page_size == "Custom":
			options["page-height"] = options.get("page-height") or frappe.db.get_single_value(
				"Print Settings", "pdf_page_height"
			)
			options["page-width"] = options.get("page-width") or frappe.db.get_single_value(
				"Print Settings", "pdf_page_width"
			)
		else:
			options["page-size"] = pdf_page_size

	def prepare_options_for_pdf(self):
		"""Translate wkhtmltopdf-style options into Chrome ``Page.printToPDF``
		options for the body, header and footer pages.

		Header/footer are printed as separate single-band PDFs, so their paper
		height is their measured element height (+ margins), and the body's
		paper height is reduced by the space they occupy.
		"""
		self._parse_pdf_options_from_html()
		self._set_default_page_size()
		options = self.options
		updated_options = {
			"scale": 1,
			"printBackground": True,
			"transferMode": "ReturnAsStream",
			"marginTop": 0,
			"marginBottom": 0,
			"marginLeft": 0,
			"marginRight": 0,
			"landscape": options.get("orientation", "Portrait") == "Landscape",
			"preferCSSPageSize": False,
			"pageRanges": options.get("page-ranges", ""),
			# Experimental
			"generateTaggedPDF": options.get("generate-tagged-pdf", False),
			"generateOutline": options.get("generate-outline", False),
		}
		# bad implicit setting of margin #backwards-compatibility
		if not self.is_print_designer:
			if not options.get("margin-right"):
				options["margin-right"] = "15mm"
			if not options.get("margin-left"):
				options["margin-left"] = "15mm"
		if not options.get("page-height") or not options.get("page-width"):
			if not (page_size := self.options.get("page-size")):
				raise frappe.ValidationError("Page size is required")
			if page_size == "CUSTOM":
				raise frappe.ValidationError("Custom page size requires page-height and page-width")
			size = PageSize.get(page_size)
			if not size:
				raise frappe.ValidationError("Invalid page size")
			options["page-height"] = convert_uom(size["height"], "mm", "px", only_number=True)
			options["page-width"] = convert_uom(size["width"], "mm", "px", only_number=True)
		if isinstance(options["page-height"], str):
			options["page-height"] = self._get_converted_num(options["page-height"])
		if isinstance(options["page-width"], str):
			options["page-width"] = self._get_converted_num(options["page-width"])
		updated_options["paperWidth"] = convert_uom(options["page-width"], "px", "in", only_number=True)
		if options.get("margin-left"):
			updated_options["marginLeft"] = convert_uom(
				self._get_converted_num(options["margin-left"]), "px", "in", only_number=True
			)
		if options.get("margin-right"):
			updated_options["marginRight"] = convert_uom(
				self._get_converted_num(options["margin-right"]), "px", "in", only_number=True
			)
		# make copy of options to update them in header, body, footer.
		self.body_page.options = updated_options.copy()
		if self.header_page:
			self.header_page.options = updated_options.copy()
		if self.footer_page:
			self.footer_page.options = updated_options.copy()
		margin_top = self._get_converted_num(options.get("margin-top", 0))
		margin_bottom = self._get_converted_num(options.get("margin-bottom", 0))
		header_with_top_margin = 0
		header_with_spacing_top_margin = 0
		footer_with_bottom_margin = 0
		footer_height = 0
		if self.header_page:
			header_with_top_margin = self.header_height + margin_top
			header_spacing = options.get("header-spacing", 0)
			header_with_spacing_top_margin = header_with_top_margin + header_spacing
			self.header_page.options["paperHeight"] = (
				convert_uom(header_with_spacing_top_margin, "px", "in", only_number=True)
				if header_with_spacing_top_margin
				else 0
			)
		# the top margin belongs to whichever band is printed first
		margin_top = convert_uom(margin_top, "px", "in", only_number=True)
		if self.header_page:
			self.header_page.options["marginTop"] = margin_top
		else:
			self.body_page.options["marginTop"] = margin_top
		if self.footer_page:
			footer_height = self.footer_height
			self.footer_page.options["paperHeight"] = (
				convert_uom(footer_height, "px", "in", only_number=True) if footer_height else 0
			)
			footer_with_bottom_margin = self.footer_height + margin_bottom
		margin_bottom = convert_uom(margin_bottom, "px", "in", only_number=True)
		if self.footer_page:
			self.footer_page.options["marginBottom"] = margin_bottom
		else:
			self.body_page.options["marginBottom"] = margin_bottom
		body_height = options.get("page-height") - (
			header_with_spacing_top_margin + footer_with_bottom_margin
		)
		"""
		matching scale for some old formats is 1.46 #backwards-compatibility ( scale 1 is better in my opinion)
		If we face issues in custom formats then only we should enable this.
		"""
		self.body_page.options["paperHeight"] = convert_uom(body_height, "px", "in", only_number=True)

	def get_rendered_header_footer(self, content, type, head, styles, css):
		"""Render the extracted header/footer fragment through the
		``pdf_header_html``/``pdf_footer_html`` hook using the Chrome template."""
		from frappe.utils.pdf import toggle_visible_pdf

		html_id = f"{type}-html"
		content = content.extract()
		toggle_visible_pdf(content)
		id_map = {"header": "pdf_header_html", "footer": "pdf_footer_html"}
		hook_func = frappe.get_hooks(id_map.get(type))
		return frappe.call(
			hook_func[-1],
			soup=self.soup,
			head=head,
			content=content,
			styles=styles,
			html_id=html_id,
			css=css,
			path="templates/print_formats/chrome_pdf_header_footer.html",
		)

	def update_header_footer_page(self):
		"""Once the body PDF (and thus the total page count) is known, clone the
		dynamic header/footer once per page with the page numbers filled in."""
		if not self.header_page and not self.footer_page:
			return
		total_pages = len(self.body_pdf.pages)
		# function is added to html from update_page_no.js
		if self.header_page:
			if self.is_header_dynamic:
				self.header_page.evaluate(
					f"clone_and_update('{'#header-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Header', 1);",
					await_promise=True,
				)
		if self.footer_page:
			if self.is_footer_dynamic:
				self.footer_page.evaluate(
					f"clone_and_update('{'#footer-render-container' if self.is_print_designer else '.wrapper'}', {total_pages}, {1 if self.is_print_designer else 0}, 'Footer', 1);",
					await_promise=True,
				)

	def update_header_footer_page_pd(self):
		"""Print Designer only: prepare static headers/footers before the total
		page count is known (page-number placeholders are not used)."""
		if not self.is_print_designer:
			return
		if not self.header_page and not self.footer_page:
			return
		# function is added to html from update_page_no.js
		if self.header_page and not self.is_header_dynamic:
			self.header_page.evaluate(
				"clone_and_update('#header-render-container', 0, 1, 'Header', 0);",
				await_promise=True,
			)
		if self.footer_page and not self.is_footer_dynamic:
			self.footer_page.evaluate(
				"clone_and_update('#footer-render-container', 0, 1, 'Footer', 0);",
				await_promise=True,
			)

	def _open_header_footer_pages(self):
		"""Open tabs for header/footer iff the soup contains the matching elements;
		stash the extracted fragments on self for later rendering."""
		self.header_page = None
		self.footer_page = None
		# open new page for header/footer if they exist.
		# It sends CDP command to the browser to open a new tab.
		if header_content := self.soup.find(id="header-html"):
			self.header_page = self.new_page("header")
			self.header_page.set_tab_url(frappe.request.host_url)
		if footer_content := self.soup.find(id="footer-html"):
			self.footer_page = self.new_page("footer")
			self.footer_page.set_tab_url(frappe.request.host_url)
		self.header_content = header_content
		self.footer_content = footer_content

	def close(self):
		"""Tear down the CDP websocket session (browser context disposes on detach)."""
		self.session.disconnect()
class PageSize:
	"""Lookup table of standard paper sizes, expressed in millimetres."""

	# Mapping of size name -> (width_mm, height_mm).
	page_sizes: ClassVar[dict[str, tuple[int, int]]] = {
		"A10": (26, 37),
		"A1": (594, 841),
		"A0": (841, 1189),
		"A3": (297, 420),
		"A2": (420, 594),
		"A5": (148, 210),
		"A4": (210, 297),
		"A7": (74, 105),
		"A6": (105, 148),
		"A9": (37, 52),
		"A8": (52, 74),
		"B10": (44, 31),
		"B1+": (1020, 720),
		"B4": (353, 250),
		"B5": (250, 176),
		"B6": (176, 125),
		"B7": (125, 88),
		"B0": (1414, 1000),
		"B1": (1000, 707),
		"B2": (707, 500),
		"B3": (500, 353),
		"B2+": (720, 520),
		"B8": (88, 62),
		"B9": (62, 44),
		"C10": (40, 28),
		"C9": (57, 40),
		"C8": (81, 57),
		"C3": (458, 324),
		"C2": (648, 458),
		"C1": (917, 648),
		"C0": (1297, 917),
		"C7": (114, 81),
		"C6": (162, 114),
		"C5": (229, 162),
		"C4": (324, 229),
		"Legal": (216, 356),
		"Junior Legal": (127, 203),
		"Letter": (216, 279),
		"Tabloid": (279, 432),
		"Ledger": (432, 279),
		"ANSI C": (432, 559),
		"ANSI A (letter)": (216, 279),
		"ANSI B (ledger & tabloid)": (279, 432),
		"ANSI E": (864, 1118),
		"ANSI D": (559, 864),
	}

	@classmethod
	def get(cls, name):
		"""Return ``{"width": mm, "height": mm}`` for *name*, or None if the
		size is not in the table."""
		dimensions = cls.page_sizes.get(name)
		if dimensions is None:
			return None
		width, height = dimensions
		return {"width": width, "height": height}

View file

@ -0,0 +1,179 @@
import asyncio
import websockets
import frappe
class CDPSocketClient:
	"""
	Manages WebSocket communications with Chrome DevTools Protocol.
	Ensures robust error handling and consistent logging.

	Each instance owns a dedicated asyncio event loop that is driven
	synchronously (``run_until_complete``) from the calling worker thread.
	Responses are matched to requests either by CDP message ``id`` or by a
	composite (method, sessionId, targetId, frameId) key for id-less events.
	"""

	def __init__(self, websocket_url):
		self.websocket_url = websocket_url
		self.connection = None
		self.message_id = 0  # monotonically increasing CDP message id
		self.pending_messages = {}  # id / composite-key -> Future awaiting the reply
		self.listeners = {}  # CDP method -> list of (callback, future, filters)
		self.listen_task = None
		self.loop = asyncio.new_event_loop()
		asyncio.set_event_loop(self.loop)

	def connect(self):
		"""Open the WebSocket connection and start listening for messages."""
		self.loop.run_until_complete(self._connect())
		self.listen_task = self.loop.create_task(self._listen())

	async def _connect(self):
		try:
			self.connection = await websockets.connect(self.websocket_url)
		except Exception:
			frappe.log_error(title="Failed to connect to WebSocket:", message=f"{frappe.get_traceback()}")
			raise

	async def _listen(self):
		# Background task: dispatch every incoming frame until the socket closes.
		try:
			async for message in self.connection:
				self._handle_message(frappe.json.loads(message))
		except Exception:
			frappe.log_error(title="WebSocket listening error:", message=f"{frappe.get_traceback()}")

	def _handle_message(self, response):
		"""Route one decoded CDP frame to the waiting future and/or listeners."""
		method = response.get("method")
		params = response.get("params", {})
		session_id = response.get("sessionId")
		target_id = params.get("targetId")
		frame_id = params.get("frameId")
		message_id = response.get("id")
		composite_key = (method, session_id, target_id, frame_id)
		# Handle responses with `id`
		if message_id and message_id in self.pending_messages:
			future = self.pending_messages.pop(message_id)
			# the same future may also be registered under its composite key; drop it
			if composite_key in self.pending_messages:
				self.pending_messages.pop(composite_key)
			future.set_result(response)
		# Handle responses without `id` using a composite key
		elif method:
			if composite_key in self.pending_messages:
				# print("matched using composite_key", composite_key)
				future = self.pending_messages.pop(composite_key)
				future.set_result(response)
			if method in self.listeners:
				for callback, future, filters in self.listeners[method]:
					# added not filters["key"] might cause cross talk between different sessions
					if (
						(not session_id or not filters["sessionId"] or filters["sessionId"] == session_id)
						and (not target_id or not filters["targetId"] or filters["targetId"] == target_id)
						and (not frame_id or not filters["frameId"] or filters["frameId"] == frame_id)
					):
						callback(future, response)

	def disconnect(self):
		"""Close the socket and drain the event loop's outstanding tasks."""
		try:
			if self.listen_task:
				self.listen_task.cancel()
			self.loop.run_until_complete(self._disconnect())
			# Cancel all pending tasks before stopping the loop; not cancelling them
			# properly was degrading performance over time.
			pending_tasks = [task for task in asyncio.all_tasks(self.loop) if not task.done()]
			for task in pending_tasks:
				task.cancel()
				try:
					self.loop.run_until_complete(task)  # Ensure tasks finish before loop stops
				except asyncio.CancelledError:
					pass  # Ignore cancellation errors
		except Exception:
			frappe.log_error(title="Error while disconnecting:", message=f"{frappe.get_traceback()}")
			raise

	async def _disconnect(self):
		try:
			if self.connection and not self.connection.closed:
				await self.connection.close()
				self.connection = None
		except Exception:
			frappe.log_error(
				title="Error during WebSocket disconnection:", message=f"{frappe.get_traceback()}"
			)

	def send(self, method, params=None, session_id=None, return_future=False):
		"""Send a CDP command.

		With ``return_future=True`` the message is scheduled and an awaitable
		future is returned immediately; otherwise this blocks until the reply
		arrives and returns ``(result, error)``.
		"""
		if return_future:
			return asyncio.ensure_future(
				self._send(method, params, session_id, wait_future_fulfill=False), loop=self.loop
			)
		future = self.loop.run_until_complete(self._send(method, params, session_id))
		return self._destructure_response(future.result())

	async def _send(self, method, params=None, session_id=None, wait_future_fulfill=True):
		"""Serialize and transmit one CDP message; register its reply future."""
		self.message_id += 1
		message_id = self.message_id
		message = {
			"id": message_id,
			"method": method,
			"params": params or {},
		}
		if session_id:
			message["sessionId"] = session_id
		if self.connection is None:
			raise RuntimeError("WebSocket connection is not open.")
		future = asyncio.Future()
		self.pending_messages[message_id] = future
		# Dynamically create the composite key
		if any(
			[
				method,
				session_id,
				params.get("targetId") if params else None,
				params.get("frameId") if params else None,
			]
		):
			composite_key = (
				method,
				session_id,
				params.get("targetId") if params else None,
				params.get("frameId") if params else None,
			)
			self.pending_messages[composite_key] = future
		await self.connection.send(frappe.json.dumps(message))
		if wait_future_fulfill:
			await future
		return future

	def _destructure_response(self, response):
		"""Destructure the response to extract useful information."""
		result = response.get("result", None)
		error = response.get("error", None)
		return result, error

	def start_listener(self, method, callback, session_id=None, target_id=None, frame_id=None):
		"""Register a listener for a specific CDP event with optional filtering."""
		if method not in self.listeners:
			self.listeners[method] = []
		future = self.loop.create_future()
		event = (callback, future, {"sessionId": session_id, "targetId": target_id, "frameId": frame_id})
		if event not in self.listeners[method]:
			self.listeners[method].append(event)
		return event

	def wait_for_event(self, event, timeout=3):
		"""Block until the listener's future (or a bare future) resolves, or *timeout* seconds pass."""
		if type(event) is tuple:
			event = event[1]  # (callback, future, filters) -> future
		try:
			self.loop.run_until_complete(asyncio.wait_for(event, timeout))
		except asyncio.TimeoutError:
			frappe.log_error(title="Timeout waiting for event", message=f"{frappe.get_traceback()}")

	def remove_listener(self, method, event):
		"""Remove a listener for a specific CDP event."""
		self.listeners[method].remove(event)

View file

@ -0,0 +1,284 @@
import os
import platform
import subprocess
import time
from pathlib import Path
from typing import ClassVar
import requests
import frappe
from frappe import _
# TODO: close browser when worker is killed.
class ChromePDFGenerator:
EXECUTABLE_PATHS: ClassVar[dict[str, list[str]]] = {
"linux": ["chrome-linux", "headless_shell"],
"darwin": ["chrome-mac", "headless_shell"],
"windows": ["chrome-win", "headless_shell.exe"],
}
_instance = None
_browsers: ClassVar[list] = []
def add_browser(self, browser):
self._browsers.append(browser)
def remove_browser(self, browser):
self._browsers.remove(browser)
def __new__(cls):
# if instance or _chromium_process is not available create object else return current instance stored in cls._instance
if cls._instance is None or not cls._instance._chromium_process:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Initialize only once."""
if hasattr(self, "_initialized"): # Prevent multiple initializations
return
self._initialized = True # Mark as initialized
self._chromium_process = None
self._chromium_path = None
self._devtools_url = None
self._initialize_chromium()
def _initialize_chromium(self):
# Ideally the browser is initialized from the before-request hook.
# if _chromium_process is not available then initialize it.
if self._chromium_process:
return
# get site config and load chromium settings.
site_config = frappe.get_common_site_config()
# only when we want to chromium on separate docker / server ( not implemented/tested yet )
self.CHROMIUM_WEBSOCKET_URL = site_config.get("chromium_websocket_url", "")
if self.CHROMIUM_WEBSOCKET_URL:
frappe.warn("Using external chromium websocket url. Make sure it is accessible.")
self._devtools_url = self.CHROMIUM_WEBSOCKET_URL
return
# only when we want to use chromium from a specific path ( incase we don't have chromium in bench folder )
self.CHROMIUM_BINARY_PATH = site_config.get("chromium_binary_path", "")
"""
Number of allowed open websocket connections to chromium.
This number will basically define how many concurrent requests can be handled by one chromium instance.
#TODO: Implement/Modify logic to handle multiple chromium instance in one class / per worker. currently we are starting one chromium.
"""
self.CHROME_OPEN_CONNECTIONS = site_config.get("chromium_max_concurrent", 1)
# if we want to use a persistent ( long running ) chromium for all sites.
# current approach starts chrome per worker process.
# TODO: Better implement logic to support a persistent chrome process.
self.USE_PERSISTENT_CHROMIUM = site_config.get("use_persistent_chromium", False)
# time to wait for chromium to start and provide dev tools url used in _set_devtools_url.
self.START_TIMEOUT = site_config.get("chromium_start_timeout", 3)
self._chromium_path = (
self._find_chromium_executable() if not self.CHROMIUM_BINARY_PATH else self.CHROMIUM_BINARY_PATH
)
if self._verify_chromium_installation():
if not self._devtools_url:
self.start_chromium_process()
def _find_chromium_executable(self):
"""Finds the Chromium executable or raises an error if not found."""
bench_path = frappe.utils.get_bench_path()
"""Determine the path to the Chromium executable. chromium is downloaded by download_chromium in print_designer/install.py"""
chromium_dir = os.path.join(bench_path, "chromium")
if not os.path.exists(chromium_dir):
frappe.throw(_("Chromium is not downloaded. Please run the setup first."))
platform_name = platform.system().lower()
if platform_name not in ["linux", "darwin", "windows"]:
frappe.throw(f"Unsupported platform: {platform_name}")
executable_name = self.EXECUTABLE_PATHS.get(platform_name)
# Construct the full path to the executable
exec_path = Path(chromium_dir).joinpath(*executable_name)
if not exec_path.exists():
frappe.throw(
f"Chromium executable not found: {exec_path}. please run bench setup-new-pdf-backend"
)
return str(exec_path)
def _verify_chromium_installation(self):
"""Ensures Chromium is available and executable, raising clearer errors if not."""
if not os.path.exists(self._chromium_path):
frappe.throw(
f"Chromium not available at the specified path. Please check the path: {self._chromium_path}"
)
if not os.access(self._chromium_path, os.X_OK):
frappe.throw(f"Chromium not executable at {self._chromium_path}")
return True
def start_chromium_process(self, debug=False):
	"""
	Launches Chromium in headless mode with robust logging and error handling.

	Args:
		debug: when True, launch a locally installed Chrome (macOS path
			below) with a minimal flag set for interactive debugging
			instead of the bundled headless shell.

	chrome switches
	https://peter.sh/experiments/chromium-command-line-switches/

	NOTE: dbus issue in docker
	https://source.chromium.org/chromium/chromium/src/+/main:content/app/content_main.cc;l=229-241?q=DBUS_SESSION_BUS_ADDRESS&ss=chromium
	"""
	try:
		if debug:
			command_args = [
				"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",  # path to locally installed chrome browser for debugging.
				"--remote-debugging-port=0",
				# Unique profile dir per site + random suffix so parallel runs don't clash.
				"--user-data-dir=/tmp/chromium-{}-user-data".format(
					frappe.local.site + frappe.utils.random_string(10)
				),
				"--disable-gpu",
				"--no-sandbox",
				"--no-first-run",
				# NOTE(review): trailing empty-string argument looks unintentional — confirm.
				"",
			]
		else:
			command_args = [
				self._chromium_path,
				# 0 will automatically select a random open port from the ephemeral port range.
				"--remote-debugging-port=0",
				"--disable-gpu",  # GPU is not available in production environment.
				"--disable-field-trial-config",
				"--disable-background-networking",
				"--disable-background-timer-throttling",
				"--disable-backgrounding-occluded-windows",
				"--disable-back-forward-cache",
				"--disable-breakpad",
				"--disable-client-side-phishing-detection",
				"--disable-component-extensions-with-background-pages",
				"--disable-component-update",
				"--no-default-browser-check",
				"--disable-default-apps",
				"--disable-dev-shm-usage",
				"--disable-extensions",
				"--disable-features=ImprovedCookieControls,LazyFrameLoading,GlobalMediaControls,DestroyProfileOnBrowserClose,MediaRouter,DialMediaRouteProvider,AcceptCHFrame,AutoExpandDetailsElement,CertificateTransparencyComponentUpdater,AvoidUnnecessaryBeforeUnloadCheckSync,Translate,HttpsUpgrades,PaintHolding,ThirdPartyStoragePartitioning,LensOverlay,PlzDedicatedWorker",
				"--allow-pre-commit-input",
				"--disable-hang-monitor",
				"--disable-ipc-flooding-protection",
				"--disable-popup-blocking",
				"--disable-prompt-on-repost",
				"--disable-renderer-backgrounding",
				"--force-color-profile=srgb",
				"--metrics-recording-only",
				"--no-first-run",
				"--password-store=basic",
				"--use-mock-keychain",
				"--no-service-autorun",
				"--export-tagged-pdf",
				"--disable-search-engine-choice-screen",
				"--unsafely-disable-devtools-self-xss-warnings",
				"--enable-use-zoom-for-dsf=false",
				"--use-angle",
				"--headless",
				"--hide-scrollbars",
				"--mute-audio",
				"--blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4",
				"--no-sandbox",
				"--no-startup-window",
				# related to HeadlessExperimental flag enable when Implement Deterministic rendering. check page class for more info.
				# "--enable-surface-synchronization",
				# "--run-all-compositor-stages-before-draw",
				# "--disable-threaded-animation",
				# "--disable-threaded-scrolling",
				# "--disable-checker-imaging",
			]
		self._start_chromium_process(command_args)
	except Exception as e:
		# Log the full error server-side; surface a generic message to the user.
		frappe.log_error(f"Error starting Chromium: {e}")
		frappe.throw(_("Could not start Chromium. Check logs for details."))
# Apply the decorator below to monitor Chromium subprocess usage for development/debugging.
# It will print and write usage data to a file (defaults to chrome_process_usage.json).
# from print_designer.pdf_generator.monitor_subprocess import monitor_subprocess_usage
# @monitor_subprocess_usage(interval=0.1)
def _start_chromium_process(self, command_args):
	"""Spawn the Chromium subprocess and remember its handle on the instance."""
	popen_kwargs = {
		"stdout": subprocess.PIPE,
		"stderr": subprocess.PIPE,
		"text": True,
	}
	if platform.system().lower() == "windows":
		# Suppress the console window that would otherwise flash up on Windows.
		startupinfo = subprocess.STARTUPINFO()
		startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
		startupinfo.wShowWindow = subprocess.SW_HIDE
		popen_kwargs["startupinfo"] = startupinfo
	self._chromium_process = subprocess.Popen(command_args, **popen_kwargs)
	return self._chromium_process
def _set_devtools_url(self):
	"""
	Monitor Chromium's stderr for the DevTools WebSocket URL.
	----------------
	Other approach: if we choose the port using find_available_port we can avoid
	this entirely and use the fetch_devtools_url() method instead.
	NOTE: 1) in the current approach, output to stderr is pretty consistent.
	      2) the other approach may seem reliable but it is slow compared to this in testing.
	TODO:
	The final approach can be decided later after testing in production.
	"""
	stderr = self._chromium_process.stderr
	start_time = time.time()
	# Poll line-by-line until the banner appears or START_TIMEOUT elapses.
	while time.time() - start_time < self.START_TIMEOUT:
		# Read a single line from stderr and check if it contains the DevTools URL.
		# Not using select() because it is not supported on Windows for non-socket file descriptors.
		line = stderr.readline()
		# not sure if "DevTools listening on" is consistent in all chromium versions.
		if "DevTools listening on" in line:
			url_start = line.find("ws://")
			if url_start != -1:
				self._devtools_url = line[url_start:].strip()
				break
	# Timed out without finding the URL: kill the half-started process.
	if not self._devtools_url:
		self._chromium_process.terminate()
		raise TimeoutError("Chromium took too long to start.")
def _close_browser(self):
	"""Shut down the shared headless Chromium process when it is idle."""
	if self._browsers:
		# Live Browser instances still depend on this process; leave it running.
		frappe.log("Cannot close Chromium as there are active browser instances.")
		return
	if not self._chromium_process:
		return
	self._chromium_process.terminate()
	# Drop the singleton and cached state so the next use starts fresh.
	ChromePDFGenerator._instance = None
	self._chromium_process = None
	self._devtools_url = None
	frappe.log("Headless Chromium closed successfully.")
# Not used anywhere in the code; read _set_devtools_url for more info. Useful in case we want to take a different approach to fetching the DevTools URL.
def fetch_devtools_url(self, port):
	"""Fetch the DevTools WebSocket URL from a running Chromium instance.

	Alternative to parsing stderr in ``_set_devtools_url``; queries the
	``/json/version`` endpoint of the remote-debugging port.

	Args:
		port: the --remote-debugging-port Chromium was started with.

	Returns:
		The ``webSocketDebuggerUrl`` string, or None when *port* is falsy
		or the request fails (failures are logged).
	"""
	if not port:
		return None
	url = f"http://127.0.0.1:{port}/json/version"
	try:
		# Bounded timeout so a wedged Chromium cannot hang the worker forever.
		response = requests.get(url, timeout=5)
		response.raise_for_status()  # Raise an exception for HTTP errors
		response_data = response.json()
		return response_data["webSocketDebuggerUrl"].strip()
	except requests.ConnectionError:
		frappe.log_error(
			f"Failed to connect to the Chrome DevTools Protocol. Is Chrome running with --remote-debugging-port={port}"
		)
	except requests.RequestException as e:
		frappe.log_error(f"An error occurred: {e}")
	return None

View file

@ -0,0 +1,356 @@
import base64
import time
import urllib
import frappe
"""
CDP commands documentation can be found here.
https://chromedevtools.github.io/devtools-protocol/
"""
class Page:
	"""A single Chromium tab (CDP target) used to render HTML and print it to PDF.

	Wraps a DevTools-protocol session: creates a target inside the given
	browser context, attaches to it, enables Page events, emulates print
	media and copies the current Frappe session cookie so authenticated
	assets load during rendering.

	NOTE(review): some methods read attributes that are never assigned in
	this class (``self.options``, ``self.is_print_designer``) — presumably
	set by the owning browser object before use; confirm against the caller.
	"""

	def __init__(self, session, browser_context_id, page_type):
		self.session = session
		# Create a fresh blank target inside the isolated browser context.
		result, error = self.session.send(
			"Target.createTarget", {"url": "", "browserContextId": browser_context_id}
		)
		if error:
			frappe.log_error(title="Error creating new page:", message=f"{error}")
		self.target_id = result["targetId"]
		self.type = page_type
		# Attach with flatten=True so subsequent commands can be addressed by sessionId.
		result, error = self.session.send(
			"Target.attachToTarget", {"targetId": self.target_id, "flatten": True}
		)
		if error:
			raise RuntimeError(f"Error attaching to target: {error}")
		self.session_id = result["sessionId"]
		self.send("Page.enable")
		self.frame_id = None
		self.get_frame_id_on_demand()
		# Render with print stylesheets and the current user's session cookie.
		self.set_media_emulation("print")
		self.set_cookies()

	# TODO: make send to return future and don't wait for it by default.
	def send(self, method, params=None, return_future=False):
		"""Send a CDP command scoped to this page's session."""
		if params is None:
			params = {}
		return self.session.send(method, params, self.session_id, return_future)

	def get_frame_id_on_demand(self):
		"""Fetch and cache the main frame id (required by DOM/CSS commands)."""
		if self.frame_id:
			return self.frame_id
		try:
			result, error = self.send("Page.getFrameTree")
			if error:
				raise RuntimeError(f"Error fetching frameId: {error}")
			frame_tree = result["frameTree"]
			frame = frame_tree["frame"]
			self.frame_id = frame["id"]
			return self.frame_id
		except Exception:
			frappe.log_error(title="Error fetching frameId:", message=f"{frappe.get_traceback()}")
			raise

	def _ensure_frame_id(self):
		# Lazy accessor used by commands that need a frameId.
		if not self.frame_id:
			self.get_frame_id_on_demand()
		return self.frame_id

	def set_media_emulation(self, media_type: str = "print"):
		"""Set media emulation for the page."""
		return self.send("Emulation.setEmulatedMedia", {"media": media_type})

	def set_cookies(self):
		"""Copy the Frappe ``sid`` cookie into the tab so requests made while
		rendering are authenticated as the current user."""
		if frappe.session and frappe.session.sid and hasattr(frappe.local, "request"):
			# Cookie domain must not include the port.
			domain = frappe.utils.get_host_name().split(":", 1)[0]
			cookie = {
				"name": "sid",
				"value": frappe.session.sid,
				"domain": domain,
				"sameSite": "Strict",
			}
			_result, error = self.send("Network.enable")
			if error:
				raise RuntimeError(f"Error enabling network: {error}")
			_result, error = self.send("Network.setCookie", cookie)
			if error:
				raise RuntimeError(f"Error setting cookie: {error}")
			_result, error = self.send("Network.disable")
			if error:
				raise RuntimeError(f"Error disabling network: {error}")

	def intercept_request_and_fulfill(self, url_pattern):
		"""Starts intercepting network requests for the given target_id and URL pattern.

		Returns a callable that blocks until a matching request is paused,
		fulfills it with an empty 200 response, then removes the listener.
		"""
		data = {}

		def on_request_paused_event(future, response):
			"""Callback for when a request is paused (intercepted)."""
			params = response.get("params")
			if params and params.get("requestId"):
				data["request_id"] = params["requestId"]
				if not future.done():
					future.set_result(data["request_id"])

		# Start listening for requestPaused event
		event = self.session.start_listener(
			"Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
		)
		# Enable request interception for the specified URL pattern
		self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})

		def intercept_and_fulfill():
			self.session.wait_for_event(event)
			self.session.send(
				"Fetch.fulfillRequest",
				{"requestId": event[1].result(), "responseCode": 200},
				return_future=True,
			)
			self.session.remove_listener("Fetch.requestPaused", event)

		return intercept_and_fulfill

	def intercept_request_for_local_resources(self, url_pattern="*"):
		"""Starts intercepting network requests for the given target_id and URL pattern.

		Requests for same-host ``assets/`` and ``files/`` resources are served
		directly from disk; everything else is allowed to continue normally.
		"""
		data = {}

		def on_request_paused_event(future, response):
			"""Callback for when a request is paused (intercepted)."""
			params = response.get("params")
			if params and params.get("requestId"):
				data["request_id"] = params["requestId"]
				url = params["request"]["url"]
				if url.startswith(frappe.request.host_url):
					# Strip the host and any cache-busting "?v..." query suffix.
					path = url.replace(frappe.request.host_url, "").split("?v", 1)[0]
					if path.startswith("assets/") or path.startswith("files/"):
						path = urllib.parse.unquote(path)
						if path.startswith("files/"):
							path = frappe.utils.get_site_path("public", path)
						content = frappe.read_file(path, as_base64=True)
						response_headers = []
						# TODO: write logic to handle all file types as required
						if path.endswith(".svg"):
							response_headers.append({"name": "Content-Type", "value": "image/svg+xml"})
						if content:
							self.session.send(
								"Fetch.fulfillRequest",
								{
									"requestId": data["request_id"],
									"responseCode": 200,  # TODO: actually handle the response code from the request
									"responseHeaders": response_headers,
									"body": content,
								},
								return_future=True,
							)
							return
				# Not a local asset we can serve — let the request proceed over the network.
				self.session.send(
					"Fetch.continueRequest",
					{"requestId": data["request_id"]},
					return_future=True,
				)

		# Start listening for requestPaused event
		self.session.start_listener(
			"Fetch.requestPaused", on_request_paused_event, self.session_id, self.target_id, self.frame_id
		)
		# Enable request interception for the specified URL pattern
		self.session.send("Fetch.enable", {"patterns": [{"urlPattern": url_pattern}]})

	def set_tab_url(self, url):
		"""Navigate to a URL and fulfill the request with status code 200."""
		# Intercept and fulfill request with 200 status code
		wait_and_fulfill = self.intercept_request_and_fulfill(url)
		# Now, navigate after intercepting the request
		wait_start = self.wait_for_load(wait_for="load")
		page_navigate = self.send("Page.navigate", {"url": url}, return_future=True)
		wait_and_fulfill()

		def wait_for_navigate():
			# Wait (max 3s) for the navigate command, then for the load event.
			self.session.wait_for_event(page_navigate, 3)
			wait_start()

		self.wait_for_navigate = wait_for_navigate

	def evaluate(self, expression, await_promise=False):
		"""Evaluate a JavaScript expression in the page and return the CDP result."""
		self.send("Runtime.enable")
		result, error = self.send(
			"Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
		)
		if error:
			# retry if error in 500ms for 3 times (just safe guard as i had few edge cases where it failed).
			# waiting for network is still slower than this.
			for _i in range(3):
				print(f"Error evaluating expression: {error}. Retrying in 500ms")
				time.sleep(0.5)
				result, error = self.send(
					"Runtime.evaluate", {"expression": expression, "awaitPromise": await_promise}
				)
				if not error:
					break
			# NOTE(review): this raise also runs when a retry succeeded (error is
			# then falsy, producing "…: None") — confirm whether it should be
			# guarded by `if error:`.
			raise RuntimeError(f"Error evaluating expression: {error}")
		self.send("Runtime.disable")
		return result

	# set wait_for to networkIdle if pdf is not rendering correctly.
	# if you face header Height to be incorrect as some external script is changing elements.
	# networkIdle is most stable option but make it a lot slower so avoiding for now. enable if not stable
	def set_content(self, html, wait_for=None):
		"""Set the page HTML directly (no navigation) and arrange to wait for load events.

		The returned waiter is stored on ``self.wait_for_set_content`` so the
		caller can overlap other work before blocking.
		"""
		if not wait_for:
			wait_for = ["load", "DOMContentLoaded"]
		self.intercept_request_for_local_resources()
		wait_start = self.wait_for_load(wait_for=wait_for)
		self.send("Page.setDocumentContent", {"frameId": self._ensure_frame_id(), "html": html})
		self.wait_for_set_content = wait_start

	def wait_for_load(self, wait_for, timeout=60):
		"""Subscribe to page lifecycle events and return a callable that blocks
		until every event named in *wait_for* (str or list) has fired, then
		unsubscribes."""
		self.send("Page.setLifecycleEventsEnabled", {"enabled": True})
		# Track which of the requested lifecycle events have been seen.
		status = {}
		if isinstance(wait_for, str):
			status[wait_for] = False
		if isinstance(wait_for, list):
			for event in wait_for:
				status[event] = False

		def on_lifecycle_event(future, response):
			params = response.get("params", {})
			if params.get("name") in status.keys():
				status[params.get("name")] = True
				if all(status.values()):
					if not future.done():
						future.set_result(response)

		event = self.session.start_listener(
			"Page.lifecycleEvent", on_lifecycle_event, self.session_id, self.target_id, self.frame_id
		)

		def start_wait():
			self.session.wait_for_event(event, timeout)
			self.session.remove_listener("Page.lifecycleEvent", event)

		return start_wait

	def get_element_height(self, selector="body"):
		"""Return the rendered height (from the DOM box model) of *selector*.

		NOTE(review): reads ``self.is_print_designer``, which is not assigned in
		this class — presumably set externally before this is called; confirm.
		"""
		try:
			if not self.is_print_designer:
				selector = ".wrapper"
			self.send("DOM.enable")
			doc_result, doc_error = self.send("DOM.getDocument")
			if doc_error:
				raise RuntimeError(f"Error getting document node: {doc_error}")
			doc_node_id = doc_result["root"]["nodeId"]
			result, error = self.send("DOM.querySelector", {"nodeId": doc_node_id, "selector": selector})
			if error:
				raise RuntimeError(f"Error querying selector: {error}")
			node_id = result["nodeId"]
			result, error = self.send("DOM.getBoxModel", {"nodeId": node_id})
			if error:
				raise RuntimeError(f"Error getting computed style: {error}")
			height = result["model"]["height"]
		finally:
			# Always disable the DOM agent, even when a step above raised.
			self.send("DOM.disable")
		return height

	def add_page_size_css(self):
		"""Inject an ``@page`` rule built from ``self.options`` (inches) so the
		printed PDF uses the requested paper size and margins."""
		width = str(self.options["paperWidth"]) + "in"
		height = str(self.options["paperHeight"]) + "in"
		marginLeft = str(self.options["marginLeft"]) + "in"
		marginRight = str(self.options["marginRight"]) + "in"
		marginTop = str(self.options["marginTop"]) + "in"
		marginBottom = str(self.options["marginBottom"]) + "in"
		# Enable DOM and CSS agents
		result, error = self.send("DOM.enable")
		if error:
			raise RuntimeError(f"Error enabling DOM: {error}")
		result, error = self.send("CSS.enable")
		if error:
			raise RuntimeError(f"Error enabling CSS: {error}")
		# Create a new stylesheet
		result, error = self.send("CSS.createStyleSheet", {"frameId": self._ensure_frame_id()})
		if error:
			raise RuntimeError(f"Error creating stylesheet: {error}")
		style_sheet_id = result["styleSheetId"]
		# Define the CSS rule for the page size
		css_rule = f"""
		@page {{
			size: {width} {height};
			margin: {marginTop} {marginRight} {marginBottom} {marginLeft};
		}}
		"""
		# Apply the CSS rule to the created stylesheet
		result, error = self.send("CSS.setStyleSheetText", {"styleSheetId": style_sheet_id, "text": css_rule})
		if error:
			raise RuntimeError(f"Error setting stylesheet text: {error}")
		self.send("CSS.disable")
		self.send("DOM.disable")

	def generate_pdf(self, wait_for_pdf=True, raw=False):
		"""Print the page to PDF using ``self.options``.

		When *wait_for_pdf* is False the command is dispatched asynchronously
		(collect later via ``get_pdf_stream_id``); otherwise the resulting
		stream is read and returned (raw bytes when *raw*, else a PdfReader).
		NOTE(review): assumes ``self.options`` is set externally and that
		printToPDF returns a stream handle — confirm transferMode in options.
		"""
		self.add_page_size_css()
		if not wait_for_pdf:
			self.wait_for_pdf = self.send("Page.printToPDF", self.options, return_future=True)
			return
		result, error = self.send("Page.printToPDF", self.options)
		if error:
			raise RuntimeError(f"Error generating PDF: {error}")
		if "stream" not in result:
			raise ValueError("Stream handle not returned from Page.printToPDF")
		return self.get_pdf_from_stream(result["stream"], raw)

	def get_pdf_stream_id(self):
		"""Block until the async printToPDF (from generate_pdf) finishes and
		return its IO stream handle."""
		# wait for task to complete
		self.session.wait_for_event(self.wait_for_pdf)
		# wait for event to complete
		task = self.wait_for_pdf.result()
		future = task.result()
		stream_id = future["result"]["stream"]
		return stream_id

	def get_pdf_from_stream(self, stream_id, raw=False):
		"""Read the whole PDF from a CDP IO stream.

		Returns raw bytes when *raw* is True, otherwise a ``pypdf.PdfReader``.
		"""
		from io import BytesIO

		from pypdf import PdfReader

		pdf_data = b""
		offset = 0
		# Pull the stream in 4 KiB chunks until Chromium reports EOF.
		while True:
			chunk_result, error = self.send("IO.read", {"handle": stream_id, "offset": offset, "size": 4096})
			if error:
				raise RuntimeError(f"Error reading PDF chunk: {error}")
			chunk_data = chunk_result["data"]
			# we don't use base64Encode option but added check anyway as it is one of the valid options.
			if chunk_result.get("base64Encoded", False):
				chunk_data = base64.b64decode(chunk_data)
			pdf_data += chunk_data
			offset += len(chunk_data)
			if chunk_result.get("eof", False):
				break
		_result, error = self.send("IO.close", {"handle": stream_id})
		if error:
			raise RuntimeError(f"Error closing PDF stream: {error}")
		if raw:
			return pdf_data
		return PdfReader(BytesIO(pdf_data))

	def close(self):
		"""Disable request interception and dispose of this tab's target."""
		self.session.send("Fetch.disable")
		_result, error = self.send("Target.closeTarget", {"targetId": self.target_id})
		if error:
			raise RuntimeError(f"Error closing target: {error}")

View file

@ -0,0 +1,117 @@
class PDFTransformer:
	"""Merge separately rendered header/footer PDFs onto a body PDF.

	The browser renders body, header and footer as separate PDF documents.
	This class vertically translates header/footer pages onto each body
	page, handling static, per-page ("dynamic") and print-designer
	(first/odd/even/last) header/footer variants.
	"""

	def __init__(self, browser):
		self.browser = browser
		self.body_pdf = browser.body_pdf
		self.is_print_designer = browser.is_print_designer
		self._set_header_pdf()
		self._set_footer_pdf()
		# if not header / footer then transform_pdf simply returns the body pdf,
		# so page count and password are only needed when one of them exists.
		if not self.header_pdf and not self.footer_pdf:
			return
		self.no_of_pages = len(self.body_pdf.pages)
		self.encrypt_password = self.browser.options.get("password", None)

	def _set_header_pdf(self):
		# Header PDF (and its dynamic flag) exist only when the browser rendered one.
		self.header_pdf = None
		if hasattr(self.browser, "header_pdf"):
			self.header_pdf = self.browser.header_pdf
			self.is_header_dynamic = self.browser.is_header_dynamic

	def _set_footer_pdf(self):
		# Same contract as _set_header_pdf, for the footer.
		self.footer_pdf = None
		if hasattr(self.browser, "footer_pdf"):
			self.footer_pdf = self.browser.footer_pdf
			self.is_footer_dynamic = self.browser.is_footer_dynamic

	def transform_pdf(self, output=None):
		"""Stack footer + body + header onto each body page and return the result.

		When *output* (a PdfWriter) is given, pages are appended to it and it
		is returned; otherwise a new writer is built, optionally encrypted,
		and the raw PDF bytes are returned.
		NOTE(review): the encrypt_password is not applied on the *output* path
		— confirm whether that is deliberate.
		"""
		from pypdf import PdfWriter

		header = self.header_pdf
		body = self.body_pdf
		footer = self.footer_pdf
		if not header and not footer:
			return body
		# mediabox.top is used as the page height throughout.
		body_height = body.pages[0].mediabox.top
		body_transform = header_height = footer_height = header_body_top = 0
		if footer:
			footer_height = footer.pages[0].mediabox.top
			# Body content is shifted up by the footer height.
			body_transform = footer_height
		if header:
			header_height = header.pages[0].mediabox.top
			# Header sits above body and footer; total page height below.
			header_transform = body_height + footer_height
			header_body_top = header_height + body_height + footer_height
		# Static headers are translated once up-front; dynamic ones per body page.
		if header and not self.is_header_dynamic:
			for h in header.pages:
				self._transform(h, header_body_top, header_transform)
		for p in body.pages:
			if header_body_top:
				self._transform(p, header_body_top, body_transform)
			if header:
				if self.is_header_dynamic:
					p.merge_page(
						self._transform(header.pages[p.page_number], header_body_top, header_transform)
					)
				elif self.is_print_designer:
					# Print designer renders 4 header pages: first / odd / even / last.
					if p.page_number == 0:
						p.merge_page(header.pages[0])
					elif p.page_number == self.no_of_pages - 1:
						p.merge_page(header.pages[3])
					elif p.page_number % 2 == 0:
						p.merge_page(header.pages[2])
					else:
						p.merge_page(header.pages[1])
				else:
					p.merge_page(header.pages[0])
			if footer:
				if self.is_footer_dynamic:
					p.merge_page(footer.pages[p.page_number])
				elif self.is_print_designer:
					# Same first / odd / even / last layout as the header.
					if p.page_number == 0:
						p.merge_page(footer.pages[0])
					elif p.page_number == self.no_of_pages - 1:
						p.merge_page(footer.pages[3])
					elif p.page_number % 2 == 0:
						p.merge_page(footer.pages[2])
					else:
						p.merge_page(footer.pages[1])
				else:
					p.merge_page(footer.pages[0])
		if output:
			output.append_pages_from_reader(body)
			return output
		writer = PdfWriter()
		writer.append_pages_from_reader(body)
		if self.encrypt_password:
			writer.encrypt(self.encrypt_password)
		return self.get_file_data_from_writer(writer)

	def _transform(self, page, page_top, ty):
		"""Shift page content up by *ty* and grow its media box to *page_top*."""
		# NOTE(review): PdfWriter is imported but unused here.
		from pypdf import PdfWriter, Transformation

		transform = Transformation().translate(ty=ty)
		page.mediabox.upper_right = (page.mediabox.right, page_top)
		page.add_transformation(transform)
		return page

	def get_file_data_from_writer(self, writer_obj):
		"""Serialize a PdfWriter to bytes."""
		from io import BytesIO

		# https://docs.python.org/3/library/io.html
		stream = BytesIO()
		writer_obj.write(stream)
		# Change the stream position to start of the stream
		stream.seek(0)
		# Read up to size bytes from the object and return them
		return stream.read()

View file

@ -229,6 +229,9 @@ def download_pdf(
letterhead=None,
pdf_generator: Literal["wkhtmltopdf", "chrome"] | None = None,
):
if pdf_generator is None:
pdf_generator = "wkhtmltopdf"
doc = doc or frappe.get_doc(doctype, name)
validate_print_permission(doc)

View file

@ -1,8 +1,18 @@
import os
import re
from typing import Literal
import click
import frappe
from frappe.utils.data import cint, cstr
# Relative path components (under <bench>/chromium) of the headless Chromium
# binary for each supported platform; used by the setup/download helpers below.
EXECUTABLE_PATHS = {
	"linux": ["chrome-linux", "headless_shell"],
	"darwin": ["chrome-mac", "headless_shell"],
	"windows": ["chrome-win", "headless_shell.exe"],
}
def get_print(
doctype=None,
@ -144,3 +154,336 @@ def attach_print(
file_name = cstr(file_name).replace(" ", "").replace("/", "-") + ext
return {"fname": file_name, "fcontent": content}
def setup_chromium():
	"""Set up Chromium at the bench level, downloading it when missing.

	Returns:
		str: path to the Chromium executable.

	Raises:
		RuntimeError: when the download or verification fails.
	"""
	# Load Chromium version from common_site_config.json or use default
	try:
		executable = find_or_download_chromium_executable()
		click.echo(f"Chromium is already set up at {executable}")
	except Exception as e:
		click.echo(f"Failed to setup Chromium: {e}")
		# Chain the original exception so the root cause stays visible in tracebacks.
		raise RuntimeError(f"Failed to setup Chromium: {e}") from e
	return executable
def find_or_download_chromium_executable():
	"""Return the path to the bench-level Chromium executable, downloading it if missing.

	Raises:
		RuntimeError: on an unsupported platform, or when the download
			completed but the executable is still missing.
	"""
	import platform
	from pathlib import Path

	bench_path = frappe.utils.get_bench_path()
	chromium_dir = os.path.join(bench_path, "chromium")
	platform_name = platform.system().lower()
	if platform_name not in ["linux", "darwin", "windows"]:
		# Fail fast: EXECUTABLE_PATHS has no entry for this platform, so
		# continuing would crash below with an opaque TypeError on joinpath(*None).
		click.echo(f"Unsupported platform: {platform_name}")
		raise RuntimeError(f"Unsupported platform: {platform_name}")
	executable_name = EXECUTABLE_PATHS.get(platform_name)
	# Construct the full path to the executable
	exec_path = Path(chromium_dir).joinpath(*executable_name)
	if not exec_path.exists():
		click.echo("Chromium is not available. downloading...")
		download_chromium()
		if not exec_path.exists():
			# Don't return a path that does not exist — callers would fail later.
			click.echo("Error while downloading chrome")
			raise RuntimeError("Chromium download finished but the executable is still missing.")
	return str(exec_path)
def download_chromium():
	"""Download and extract the pinned Chromium headless build into ``<bench>/chromium``.

	Replaces any previous download, normalizes the extracted folder and binary
	names to the layout expected by EXECUTABLE_PATHS, and marks the binary
	executable.

	Raises:
		RuntimeError: on download, extraction or layout-normalization failure.
	"""
	import platform
	import shutil
	import zipfile

	import requests

	bench_path = frappe.utils.get_bench_path()
	chromium_dir = os.path.join(bench_path, "chromium")
	# Remove old Chromium directory if it exists
	if os.path.exists(chromium_dir):
		click.echo("Removing old Chromium directory...")
		shutil.rmtree(chromium_dir, ignore_errors=True)
	os.makedirs(chromium_dir, exist_ok=True)
	download_url = get_chromium_download_url()
	file_name = os.path.basename(download_url)
	zip_path = os.path.join(chromium_dir, file_name)
	try:
		click.echo(f"Downloading Chromium from {download_url}...")
		# playwright's CDN requires a user agent
		headers = {"User-Agent": "Wget/1.21.1"}
		with requests.get(download_url, stream=True, timeout=(10, 60), headers=headers) as r:
			r.raise_for_status()  # Raise an error for bad status codes
			total_size = int(r.headers.get("content-length", 0))  # Get total file size
			# Use the progressbar as a context manager so it renders and finishes cleanly.
			with click.progressbar(length=total_size, label="Downloading Chromium") as bar:
				with open(zip_path, "wb") as f:
					for chunk in r.iter_content(chunk_size=65536):
						f.write(chunk)
						bar.update(len(chunk))
		click.echo("Extracting Chromium...")
		with zipfile.ZipFile(zip_path, "r") as zip_ref:
			zip_ref.extractall(chromium_dir)
		if os.path.exists(zip_path):
			os.remove(zip_path)
		# There should be only one directory after extraction.
		extracted = os.listdir(chromium_dir)[0]
		executable_path = EXECUTABLE_PATHS[platform.system().lower()]
		chrome_folder_name = executable_path[0]
		# BUG FIX: renamed_dir was previously only assigned inside the rename
		# branch, raising NameError when the archive already extracted to the
		# expected folder name. Compute it unconditionally.
		renamed_dir = os.path.join(chromium_dir, chrome_folder_name)
		if extracted != chrome_folder_name:
			extracted_dir = os.path.join(chromium_dir, extracted)
			if os.path.exists(extracted_dir):
				click.echo(f"Renaming {extracted_dir} to {renamed_dir}")
				os.rename(extracted_dir, renamed_dir)
			else:
				raise RuntimeError(f"Failed to rename extracted directory. Expected {chrome_folder_name}.")
		if os.path.exists(renamed_dir):
			executable_shell = os.path.join(renamed_dir, "chrome-headless-shell")
			if os.path.exists(executable_shell):
				os.rename(executable_shell, os.path.join(renamed_dir, "headless_shell"))
			else:
				raise RuntimeError("Failed to rename executable. Expected chrome-headless-shell.")
		# Make the `headless_shell` executable
		exec_path = os.path.join(renamed_dir, executable_path[1])
		make_chromium_executable(exec_path)
		click.echo(f"Chromium is ready to use at: {chromium_dir}")
	except requests.Timeout as e:
		click.echo("Download timed out. Check your internet connection.")
		raise RuntimeError("Download timed out.") from e
	except requests.ConnectionError as e:
		click.echo("Failed to connect to Chromium download server.")
		raise RuntimeError("Connection error.") from e
	except requests.RequestException as e:
		click.echo(f"Failed to download Chromium: {e}")
		raise RuntimeError(f"Failed to download Chromium: {e}") from e
	except zipfile.BadZipFile as e:
		click.echo(f"Failed to extract Chromium: {e}")
		raise RuntimeError(f"Failed to extract Chromium: {e}") from e
def get_chromium_download_url():
	"""Return the download URL of the pinned headless Chromium build for this host.

	Resolution order: explicit ``chromium_download_url`` in common site
	config, else a platform-specific path on Google's chrome-for-testing
	bucket, with Playwright's CDN as the fallback for Linux arm64.
	"""
	# Avoid this unless it is going to run on a single type of platform and you have the correct binary hosted.
	common_config = frappe.get_common_site_config()
	chrome_download_url = common_config.get("chromium_download_url", None)
	if chrome_download_url:
		return chrome_download_url
	"""
	We are going to use chrome-for-testing builds but unfortunately it doesn't have linux arm64 https://github.com/GoogleChromeLabs/chrome-for-testing/issues/1
	so we will use playwright's fallback builds for linux arm64
	TODO: we will also use the fallback builds for windows arm
	https://community.arm.com/arm-community-blogs/b/tools-software-ides-blog/posts/native-chromium-builds-windows-on-arm
	"""
	"""
	To find the CHROME_VERSION AND CHROME_FALLBACK_VERSION, follow these steps:
	1. Visit the GitHub Actions page for Playwright: https://github.com/microsoft/playwright/actions/workflows/roll_browser_into_playwright.yml
	2. Open the latest job run.
	3. Navigate to the "Roll to New Browser Version" step.
	4. In the logs, look for a line similar to:
	Downloading Chromium 133.0.6943.16 (playwright build v1155)
	Here, the first number (e.g., 133.0.6943.16) is the CHROME_VERSION, and the second number (e.g., 1155) is the CHROME_FALLBACK_VERSION.
	"""
	# Using Google's chrome-for-testing-public builds for most platforms. (close to end user experience)
	# For Linux ARM64, we use Playwright's Chromium builds due to the lack of official support.
	download_path = {
		"linux64": "%s/linux64/chrome-headless-shell-linux64.zip",
		"mac-arm64": "%s/mac-arm64/chrome-headless-shell-mac-arm64.zip",
		"mac-x64": "%s/mac-x64/chrome-headless-shell-mac-x64.zip",
		"win32": "%s/win32/chrome-headless-shell-win32.zip",
		"win64": "%s/win64/chrome-headless-shell-win64.zip",
	}
	linux_arm_download_path = {
		"ubuntu20.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu22.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"ubuntu24.04-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian11-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
		"debian12-arm64": "%s/chromium-headless-shell-linux-arm64.zip",
	}
	platform_key = calculate_platform()
	version = "133.0.6943.35"
	playwright_build_version = "1157"
	base_url = "https://storage.googleapis.com/chrome-for-testing-public/"
	playwright_base_url = "https://cdn.playwright.dev/dbazure/download/playwright/builds/chromium/"
	# Overwrite with values from common_site_config.json ( escape hatch )
	version = common_config.get("chromium_version", version)
	playwright_build_version = common_config.get("playwright_chromium_version", playwright_build_version)
	# make sure that you have all required flavours at correct urls
	base_url = common_config.get("chromium_download_base_url", base_url)
	playwright_base_url = common_config.get("playwright_chromium_download_base_url", playwright_base_url)
	if platform_key in download_path:
		relative_path = download_path[platform_key]
	elif platform_key in linux_arm_download_path:
		# Playwright builds are addressed by build number, not Chrome version.
		version = playwright_build_version
		base_url = playwright_base_url
		relative_path = linux_arm_download_path[platform_key]
	else:
		frappe.throw(
			f"No download path configured or Chromium download not available for platform: {platform_key}"
		)
	return f"{base_url}{relative_path % version}"
def make_chromium_executable(executable):
	"""Ensure the downloaded Chromium binary carries execute permissions."""
	if not os.path.exists(executable):
		raise RuntimeError(f"Chromium executable not found: {executable}.")
	# Nothing to do when the file can already be executed.
	if os.access(executable, os.X_OK):
		click.echo(f"Chromium executable is already executable: {executable}")
		return
	click.echo(f"Making Chromium executable: {executable}")
	os.chmod(executable, 0o755)  # rwxr-xr-x
	click.echo(f"Chromium executable permissions set: {executable}")
def calculate_platform():
	"""
	Determines the host platform and returns it as a string.

	Includes logic for Linux ARM, Linux x64, macOS (Intel and ARM), and
	Windows (32-bit and 64-bit).

	Returns:
		str: The detected platform string (e.g., 'linux64', 'mac-arm64', etc.),
		or '<unknown>' when the host is unsupported.
	"""
	import platform

	system = platform.system().lower()
	arch = platform.machine().lower()

	# Linux ARM needs distro/version-specific build keys.
	if system == "linux" and arch == "aarch64":
		distro_info = get_linux_distribution_info()
		distro_id = distro_info.get("id", "")
		version = distro_info.get("version", "")
		major_version = int(version.split(".")[0]) if version else 0
		if distro_id == "ubuntu":
			for ceiling, key in (
				(20, "ubuntu18.04-arm64"),
				(22, "ubuntu20.04-arm64"),
				(24, "ubuntu22.04-arm64"),
				(26, "ubuntu24.04-arm64"),
			):
				if major_version < ceiling:
					return key
			return "<unknown>"
		if distro_id in ["debian", "raspbian"]:
			for ceiling, key in ((11, "debian10-arm64"), (12, "debian11-arm64")):
				if major_version < ceiling:
					return key
			return "debian12-arm64"
		return "<unknown>"

	# Everything else is a straight (system, arch) lookup.
	simple_platforms = {
		("linux", "x86_64"): "linux64",
		("darwin", "arm64"): "mac-arm64",
		("darwin", "x86_64"): "mac-x64",
		("windows", "x86"): "win32",
		("windows", "x86_64"): "win64",
	}
	return simple_platforms.get((system, arch), "<unknown>")
def get_linux_distribution_info():
	# not tested
	"""Retrieve Linux distribution information using the optional `distro` library.

	Returns:
		dict: lowercase ``id`` and ``version`` strings; both empty when the
		``distro`` package is not installed.
	"""
	try:
		import distro
	except ImportError:
		# BUG FIX: the previous `if not distro:` guard was dead code — a missing
		# package raises ImportError before the check could ever run.
		return {"id": "", "version": ""}
	return {"id": distro.id().lower(), "version": distro.version()}
def parse_float_and_unit(input_text, default_unit="px"):
if isinstance(input_text, int | float):
return {"value": input_text, "unit": default_unit}
if not isinstance(input_text, str):
return
number = float(re.search(r"[+-]?([0-9]*[.])?[0-9]+", input_text).group())
valid_units = [r"px", r"mm", r"cm", r"in"]
unit = [match.group() for rx in valid_units if (match := re.search(rx, input_text))]
return {"value": number, "unit": unit[0] if len(unit) == 1 else default_unit}
def convert_uom(
number: float,
from_uom: Literal["px", "mm", "cm", "in"] = "px",
to_uom: Literal["px", "mm", "cm", "in"] = "px",
only_number: bool = False,
) -> float:
unit_values = {
"px": 1,
"mm": 3.7795275591,
"cm": 37.795275591,
"in": 96,
}
from_px = (
{
"to_px": 1,
"to_mm": unit_values["px"] / unit_values["mm"],
"to_cm": unit_values["px"] / unit_values["cm"],
"to_in": unit_values["px"] / unit_values["in"],
},
)
from_mm = (
{
"to_mm": 1,
"to_px": unit_values["mm"] / unit_values["px"],
"to_cm": unit_values["mm"] / unit_values["cm"],
"to_in": unit_values["mm"] / unit_values["in"],
},
)
from_cm = (
{
"to_cm": 1,
"to_px": unit_values["cm"] / unit_values["px"],
"to_mm": unit_values["cm"] / unit_values["mm"],
"to_in": unit_values["cm"] / unit_values["in"],
},
)
from_in = {
"to_in": 1,
"to_px": unit_values["in"] / unit_values["px"],
"to_mm": unit_values["in"] / unit_values["mm"],
"to_cm": unit_values["in"] / unit_values["cm"],
}
converstion_factor = ({"from_px": from_px, "from_mm": from_mm, "from_cm": from_cm, "from_in": from_in},)
if only_number:
return round(number * converstion_factor[0][f"from_{from_uom}"][0][f"to_{to_uom}"], 3)
return f"{round(number * converstion_factor[0][f'from_{from_uom}'][0][f'to_{to_uom}'], 3)}{to_uom}"

View file

@ -120,6 +120,7 @@ frappe.ui.form.on("Web Form", {
depends_on: df.depends_on,
placeholder: df.placeholder,
max_length: df.length,
description: df.description,
mandatory_depends_on: df.mandatory_depends_on,
read_only_depends_on: df.read_only_depends_on,
});
@ -340,6 +341,7 @@ frappe.ui.form.on("Web Form Field", {
doc.read_only = df.read_only;
doc.depends_on = df.depends_on;
doc.placeholder = df.placeholder;
doc.description = df.description;
doc.mandatory_depends_on = df.mandatory_depends_on;
doc.max_length = df.length;
doc.read_only_depends_on = df.read_only_depends_on;

View file

@ -1,5 +1,6 @@
import mimetypes
import os
from pathlib import Path
from werkzeug.wrappers import Response
from werkzeug.wsgi import wrap_file
@ -34,9 +35,15 @@ class StaticPage(BaseRenderer):
if not self.is_valid_file_path():
return
for app in frappe.get_installed_apps():
file_path = frappe.get_app_path(app, "www") + "/" + self.path
if os.path.isfile(file_path) and is_binary_file(file_path):
self.file_path = file_path
app_path = Path(frappe.get_app_path(app, "www"))
requested_path = (app_path / self.path).resolve()
if (
requested_path.is_relative_to(app_path)
and requested_path.is_file()
and is_binary_file(requested_path)
):
self.file_path = requested_path
break
def can_render(self):
return self.is_valid_file_path() and self.file_path

View file

@ -27,7 +27,14 @@ def get_context(context, **dict_params):
@frappe.whitelist(allow_guest=True)
def get(doctype, txt=None, limit_start=0, limit=20, pathname=None, **kwargs):
def get(
doctype: str,
txt: str | None = None,
limit_start: int = 0,
limit: int = 20,
pathname: str | None = None,
**kwargs,
):
"""Return processed HTML page for a standard listing."""
limit_start = cint(limit_start)
raw_result = get_list_data(doctype, txt, limit_start, limit=limit + 1, **kwargs)
@ -76,7 +83,14 @@ def get(doctype, txt=None, limit_start=0, limit=20, pathname=None, **kwargs):
@frappe.whitelist(allow_guest=True)
def get_list_data(
doctype, txt=None, limit_start=0, fields=None, cmd=None, limit=20, web_form_name=None, **kwargs
doctype: str,
txt: str | None = None,
limit_start: int = 0,
fields: list | None = None,
cmd: str | None = None,
limit: int = 20,
web_form_name: str | None = None,
**kwargs,
):
"""Return processed HTML page for a standard listing."""
limit_start = cint(limit_start)

View file

@ -308,7 +308,7 @@ def set_title_values_for_link_and_dynamic_link_fields(
def set_title_values_for_table_and_multiselect_fields(meta: "Meta", doc: "Document") -> None:
for field in meta.get_table_fields():
for field in meta.get_table_fields(include_computed=True):
if not doc.get(field.fieldname):
continue

View file

@ -90,6 +90,8 @@ dependencies = [
"posthog~=5.0.0",
"vobject~=0.9.9",
"pycountry~=24.6.1",
"websockets"
]
[project.urls]