Merge branch 'develop' into fix-float-parse-number-format

This commit is contained in:
mergify[bot] 2026-02-22 07:43:45 +00:00 committed by GitHub
commit 4e9212a5a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
298 changed files with 35119 additions and 32279 deletions

View file

@ -182,10 +182,7 @@ if __name__ == "__main__":
only_frontend_code_changed = len(list(filter(is_frontend_code, files_list))) == len(files_list)
updated_py_file_count = len(list(filter(is_server_side_code, files_list)))
only_py_changed = updated_py_file_count == len(files_list)
run_postgres = (
has_label(pr_number, "postgres", repo) or
matches_postgres_filenames(files_list)
)
run_postgres = has_label(pr_number, "postgres", repo)
# Check for Skip CI label and other conditions
if has_skip_ci_label(pr_number, repo):

View file

@ -1,5 +1,7 @@
**/hooks.py,frappe.gettext.extractors.navbar.extract
**/doctype/*/*.json,frappe.gettext.extractors.doctype.extract
**/desktop_icon/*.json,frappe.gettext.extractors.desktop_icon.extract
**/workspace_sidebar/*.json,frappe.gettext.extractors.workspace_sidebar.extract
**/workspace/*/*.json,frappe.gettext.extractors.workspace.extract
**/web_form/*/*.json,frappe.gettext.extractors.web_form.extract
**/onboarding_step/*/*.json,frappe.gettext.extractors.onboarding_step.extract

1 **/hooks.py frappe.gettext.extractors.navbar.extract
2 **/doctype/*/*.json frappe.gettext.extractors.doctype.extract
3 **/desktop_icon/*.json frappe.gettext.extractors.desktop_icon.extract
4 **/workspace_sidebar/*.json frappe.gettext.extractors.workspace_sidebar.extract
5 **/workspace/*/*.json frappe.gettext.extractors.workspace.extract
6 **/web_form/*/*.json frappe.gettext.extractors.web_form.extract
7 **/onboarding_step/*/*.json frappe.gettext.extractors.onboarding_step.extract

View file

@ -16,6 +16,7 @@ import importlib
import inspect
import json
import os
import re
import sys
import threading
import warnings
@ -76,6 +77,7 @@ local = Local()
cache: "RedisWrapper" | None = None
client_cache: "ClientCache" | None = None
STANDARD_USERS = ("Guest", "Administrator")
SITE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9._-]+$")
# this global may be subsequently changed by frappe.tests.utils.toggle_test_mode()
in_test = False
@ -144,6 +146,9 @@ def init(site: str, sites_path: str = ".", new_site: bool = False, force: bool =
if getattr(local, "initialised", None) and not force:
return
if site and not SITE_NAME_PATTERN.match(site):
raise ValueError(f"Invalid site name `{site}`")
local.error_log = []
local.message_log = []
local.debug_log = []
@ -1472,25 +1477,23 @@ def logger(
def get_desk_link(doctype, name, show_title_with_name=False, open_in_new_tab=False):
from urllib.parse import quote
from frappe.desk.utils import slug
from frappe.utils.data import quoted
meta = get_meta(doctype)
title = get_value(doctype, name, meta.get_title_field())
target_attr = ' target="_blank"' if open_in_new_tab else ""
# encode for href
encoded_name = quote(name)
if show_title_with_name and name != title:
html = '<a href="/desk/Form/{doctype}/{encoded_name}"{target} style="font-weight: bold;">{doctype_local} {name}: {title_local}</a>'
html = '<a href="/desk/{doctype}/{encoded_name}"{target} style="font-weight: bold;">{doctype_local} {name}: {title_local}</a>'
else:
html = '<a href="/desk/Form/{doctype}/{encoded_name}"{target} style="font-weight: bold;">{doctype_local} {title_local}</a>'
html = '<a href="/desk/{doctype}/{encoded_name}"{target} style="font-weight: bold;">{doctype_local} {title_local}</a>'
return html.format(
doctype=doctype,
doctype=quoted(slug(doctype)),
name=name,
encoded_name=encoded_name,
encoded_name=quoted(name),
doctype_local=_(doctype),
title_local=_(title),
target=target_attr,

View file

@ -88,7 +88,7 @@ def get_default_path():
@frappe.whitelist()
def set_app_as_default(app_name):
def set_app_as_default(app_name: str):
if frappe.db.get_value("User", frappe.session.user, "default_app") == app_name:
frappe.db.set_value("User", frappe.session.user, "default_app", "")
else:
@ -96,7 +96,7 @@ def set_app_as_default(app_name):
@frappe.whitelist()
def get_incomplete_setup_route(current_app, app_route):
def get_incomplete_setup_route(current_app: str, app_route: str):
pending_apps = get_apps_with_incomplete_dependencies(current_app)
if not pending_apps:

View file

@ -127,7 +127,7 @@ class LoginManager:
self.full_name = None
self.user_type = None
if frappe.local.form_dict.get("cmd") == "login" or frappe.local.request.path == "/api/method/login":
if frappe.local.request.path == "/api/method/login":
if self.login() is False:
return
self.resume = False
@ -176,9 +176,12 @@ class LoginManager:
self.set_user_info()
def get_user_info(self):
self.info = frappe.get_cached_value(
result = frappe.get_cached_value(
"User", self.user, ["user_type", "first_name", "last_name", "user_image"], as_dict=1
)
if result is None:
frappe.throw(_("User does not exist"), frappe.DoesNotExistError)
self.info = result
self.user_type = self.info.user_type
def setup_boot_cache(self):
@ -201,7 +204,7 @@ class LoginManager:
frappe.local.cookie_manager.set_cookie("system_user", "yes", deduplicate=True)
if not resume:
frappe.local.response["message"] = "Logged In"
frappe.local.response["home_page"] = get_default_path() or "/desk"
frappe.local.response["home_page"] = get_home_page() or "/desk"
if not resume:
frappe.response["full_name"] = self.full_name

View file

@ -199,7 +199,7 @@ def get_assignments(doc) -> list[dict]:
@frappe.whitelist()
def bulk_apply(doctype, docnames):
def bulk_apply(doctype: str, docnames: str | list[str]):
docnames = frappe.parse_json(docnames)
background = len(docnames) > 5

View file

@ -1,7 +1,7 @@
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# License: MIT. See LICENSE
from datetime import timedelta
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
@ -556,7 +556,13 @@ def get_auto_repeat_entries(date=None):
@frappe.whitelist()
def make_auto_repeat(doctype, docname, frequency="Daily", start_date=None, end_date=None):
def make_auto_repeat(
doctype: str,
docname: str | int,
frequency: str = "Daily",
start_date: str | datetime | None = None,
end_date: str | datetime | None = None,
):
if not start_date:
start_date = getdate(today())
doc = frappe.new_doc("Auto Repeat")
@ -573,7 +579,9 @@ def make_auto_repeat(doctype, docname, frequency="Daily", start_date=None, end_d
# method for reference_doctype filter
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_auto_repeat_doctypes(doctype, txt, searchfield, start, page_len, filters):
def get_auto_repeat_doctypes(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: str | dict | list
):
res = frappe.get_all(
"Property Setter",
{

View file

@ -19,7 +19,7 @@ from frappe.desk.doctype.form_tour.form_tour import get_onboarding_ui_tours
from frappe.desk.doctype.route_history.route_history import frequently_visited_links
from frappe.desk.form.load import get_meta_bundle
from frappe.email.inbox import get_email_accounts
from frappe.integrations.frappe_providers.frappecloud_billing import is_fc_site
from frappe.integrations.frappe_providers.frappecloud_billing import current_site_info, is_fc_site
from frappe.model.base_document import get_controller
from frappe.permissions import has_permission
from frappe.query_builder import DocType
@ -125,6 +125,8 @@ def get_bootinfo():
bootinfo.setup_wizard_completed_apps = get_setup_wizard_completed_apps() or []
bootinfo.desktop_icon_urls = get_desktop_icon_urls()
bootinfo.desktop_icon_style = get_icon_style() or "Subtle"
if bootinfo.is_fc_site:
bootinfo.site_info = current_site_info()
return bootinfo
@ -537,51 +539,52 @@ def get_sidebar_items(allowed_workspaces):
from frappe import _
from frappe.desk.doctype.workspace_sidebar.workspace_sidebar import auto_generate_sidebar_from_module
sidebars = frappe.get_all("Workspace Sidebar", fields=["name", "header_icon"])
workspace_sidebars = frappe.get_all("Workspace Sidebar", fields=["name", "header_icon"])
module_sidebars = auto_generate_sidebar_from_module()
sidebars.extend(module_sidebars)
workspace_sidebars.extend(module_sidebars)
sidebar_items = {}
for s in sidebars:
sidebar_title = s.get("name")
for sidebar in workspace_sidebars:
sidebar_title = sidebar.get("name")
sidebar_doc = None
if sidebar_title:
w = frappe.get_doc("Workspace Sidebar", sidebar_title)
sidebar_doc = frappe.get_doc("Workspace Sidebar", sidebar_title)
else:
sidebar_title = s.title
w = s
sidebar_title = sidebar.title
sidebar_doc = sidebar
if (
frappe.session.user == "Administrator"
or w.module in w.user.permitted_modules
or sidebar_doc.module in sidebar_doc.user.allow_modules
or sidebar_title == "My Workspaces"
):
sidebar_items[sidebar_title.lower()] = {
"label": sidebar_title,
"items": [],
"header_icon": s.get("header_icon"),
"module": w.module,
"app": w.app,
"header_icon": sidebar.get("header_icon"),
"module": sidebar_doc.module,
"app": sidebar_doc.app,
}
for si in w.items:
for item in sidebar_doc.items:
workspace_sidebar = {
"label": _(si.label),
"link_to": si.link_to,
"link_type": si.link_type,
"type": si.type,
"icon": si.icon,
"child": si.child,
"collapsible": si.collapsible,
"indent": si.indent,
"keep_closed": si.keep_closed,
"display_depends_on": si.display_depends_on,
"url": si.url,
"show_arrow": si.show_arrow,
"filters": si.filters,
"route_options": si.route_options,
"tab": si.navigate_to_tab,
"label": _(item.label),
"link_to": item.link_to,
"link_type": item.link_type,
"type": item.type,
"icon": item.icon,
"child": item.child,
"collapsible": item.collapsible,
"indent": item.indent,
"keep_closed": item.keep_closed,
"display_depends_on": item.display_depends_on,
"url": item.url,
"show_arrow": item.show_arrow,
"filters": item.filters,
"route_options": item.route_options,
"tab": item.navigate_to_tab,
}
if si.link_type == "Report" and si.link_to and frappe.db.exists("Report", si.link_to):
if item.link_type == "Report" and item.link_to and frappe.db.exists("Report", item.link_to):
report_type, ref_doctype = frappe.db.get_value(
"Report", si.link_to, ["report_type", "ref_doctype"]
"Report", item.link_to, ["report_type", "ref_doctype"]
)
workspace_sidebar["report"] = {
"report_type": report_type,
@ -589,8 +592,8 @@ def get_sidebar_items(allowed_workspaces):
}
if (
"My Workspaces" in sidebar_title
or si.type == "Section Break"
or w.is_item_allowed(si.link_to, si.link_type, allowed_workspaces)
or item.type == "Section Break"
or sidebar_doc.is_item_allowed(item.link_to, item.link_type, allowed_workspaces)
):
sidebar_items[sidebar_title.lower()]["items"].append(workspace_sidebar)
add_user_specific_sidebar(sidebar_items)

View file

@ -2,7 +2,7 @@
# License: MIT. See LICENSE
import json
import os
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
import frappe
import frappe.model
@ -25,18 +25,18 @@ Requests via FrappeClient are also handled here.
@frappe.whitelist()
def get_list(
doctype,
fields=None,
filters=None,
group_by=None,
order_by=None,
limit_start=None,
limit_page_length=20,
parent=None,
debug: bool = False,
as_dict: bool = True,
or_filters=None,
expand=None,
doctype: str,
fields: str | list[str | dict[str, Any]] | None = None,
filters: str | list | dict[str, Any] | None = None,
group_by: str | list[str] | None = None,
order_by: str | list[str] | None = None,
limit_start: int | str | None = None,
limit_page_length: int | str = 20,
parent: str | None = None,
debug: bool | int = False,
as_dict: bool | int = True,
or_filters: str | list[list] | dict[str, Any] | None = None,
expand: str | list[str] | None = None,
):
"""Return a list of records by filters, fields, ordering and limit.
@ -76,7 +76,12 @@ def get_list(
@frappe.whitelist()
def get_count(doctype, filters=None, debug=False, cache=False):
def get_count(
doctype: str,
filters: str | list | dict[str, Any] | None = None,
debug: int | bool = False,
cache: int | bool = False,
):
from frappe.desk.reportview import get_count
frappe.form_dict.doctype = doctype
@ -87,7 +92,12 @@ def get_count(doctype, filters=None, debug=False, cache=False):
@frappe.whitelist()
def get(doctype, name=None, filters=None, parent=None):
def get(
doctype: str,
name: str | int | None = None,
filters: str | list | dict[str, Any] | None = None,
parent: str | None = None,
):
"""Return a document by name or filters.
:param doctype: DocType of the document to be returned
@ -108,7 +118,14 @@ def get(doctype, name=None, filters=None, parent=None):
@frappe.whitelist()
def get_value(doctype, fieldname, filters=None, as_dict=True, debug=False, parent=None):
def get_value(
doctype: str,
fieldname: str | list[str] | dict[str, Any],
filters: str | list | dict[str, Any] | None = None,
as_dict: int | bool = True,
debug: int | bool = False,
parent: str | None = None,
):
"""Return a value from a document.
:param doctype: DocType to be queried
@ -156,7 +173,7 @@ def get_value(doctype, fieldname, filters=None, as_dict=True, debug=False, paren
@frappe.whitelist()
def get_single_value(doctype, field):
def get_single_value(doctype: str, field: str):
if not frappe.has_permission(doctype):
frappe.throw(_("No permission for {0}").format(_(doctype)), frappe.PermissionError)
@ -164,7 +181,7 @@ def get_single_value(doctype, field):
@frappe.whitelist(methods=["POST", "PUT"])
def set_value(doctype, name, fieldname, value=None):
def set_value(doctype: str, name: str | int, fieldname: str | dict[str, Any], value: Any | None = None):
"""Set a value using get_doc, group of values
:param doctype: DocType of the document
@ -201,7 +218,7 @@ def set_value(doctype, name, fieldname, value=None):
@frappe.whitelist(methods=["POST", "PUT"])
def insert(doc=None):
def insert(doc: str | dict[str, Any] | None = None):
"""Insert a document
:param doc: JSON or dict object to be inserted"""
@ -212,7 +229,7 @@ def insert(doc=None):
@frappe.whitelist(methods=["POST", "PUT"])
def insert_many(docs=None):
def insert_many(docs: str | list[dict[str, Any]] | None = None):
"""Insert multiple documents
:param docs: JSON or list of dict objects to be inserted in one request"""
@ -226,7 +243,7 @@ def insert_many(docs=None):
@frappe.whitelist(methods=["POST", "PUT"])
def save(doc):
def save(doc: str | dict[str, Any]):
"""Update (save) an existing document
:param doc: JSON or dict object with the properties of the document to be updated"""
@ -240,7 +257,7 @@ def save(doc):
@frappe.whitelist(methods=["POST", "PUT"])
def rename_doc(doctype, old_name, new_name, merge=False):
def rename_doc(doctype: str, old_name: str | int, new_name: str | int, merge: bool = False):
"""Rename document
:param doctype: DocType of the document to be renamed
@ -251,7 +268,7 @@ def rename_doc(doctype, old_name, new_name, merge=False):
@frappe.whitelist(methods=["POST", "PUT"])
def submit(doc):
def submit(doc: str | dict[str, Any]):
"""Submit a document
:param doc: JSON or dict object to be submitted remotely"""
@ -265,7 +282,7 @@ def submit(doc):
@frappe.whitelist(methods=["POST", "PUT"])
def cancel(doctype, name):
def cancel(doctype: str, name: str | int):
"""Cancel a document
:param doctype: DocType of the document to be cancelled
@ -277,7 +294,7 @@ def cancel(doctype, name):
@frappe.whitelist(methods=["DELETE", "POST"])
def delete(doctype, name):
def delete(doctype: str, name: str | int):
"""Delete a remote document
:param doctype: DocType of the document to be deleted
@ -286,7 +303,7 @@ def delete(doctype, name):
@frappe.whitelist(methods=["POST", "PUT"])
def bulk_update(docs):
def bulk_update(docs: str):
"""Bulk update documents
:param docs: JSON list of documents to be updated remotely. Each document must have `docname` property"""
@ -305,7 +322,7 @@ def bulk_update(docs):
@frappe.whitelist()
def has_permission(doctype: str, docname: str, perm_type: str = "read"):
def has_permission(doctype: str, docname: str | int, perm_type: str = "read"):
"""Return a JSON with data whether the document has the requested permission.
:param doctype: DocType of the document to be checked
@ -316,7 +333,7 @@ def has_permission(doctype: str, docname: str, perm_type: str = "read"):
@frappe.whitelist()
def get_doc_permissions(doctype: str, docname: str):
def get_doc_permissions(doctype: str, docname: str | int):
"""Return an evaluated document permissions dict like `{"read":1, "write":1}`.
:param doctype: DocType of the document to be evaluated
@ -327,7 +344,7 @@ def get_doc_permissions(doctype: str, docname: str):
@frappe.whitelist()
def get_password(doctype: str, name: str, fieldname: str):
def get_password(doctype: str, name: str | int, fieldname: str):
"""Return a password type property. Only applicable for System Managers
:param doctype: DocType of the document that holds the password
@ -351,14 +368,14 @@ def get_time_zone():
@frappe.whitelist(methods=["POST", "PUT"])
def attach_file(
filename=None,
filedata=None,
doctype=None,
docname=None,
folder=None,
decode_base64=False,
is_private=None,
docfield=None,
filename: str | None = None,
filedata: str | None = None,
doctype: str | None = None,
docname: str | int | None = None,
folder: str | None = None,
decode_base64: int | bool = False,
is_private: int | bool | None = None,
docfield: str | None = None,
):
"""Attach a file to Document
@ -396,7 +413,7 @@ def attach_file(
@frappe.whitelist()
@http_cache(max_age=10 * 60)
def is_document_amended(doctype: str, docname: str):
def is_document_amended(doctype: str, docname: str | int):
if frappe.permissions.has_permission(doctype):
try:
return frappe.db.exists(doctype, {"amended_from": docname})
@ -409,7 +426,7 @@ def is_document_amended(doctype: str, docname: str):
@frappe.whitelist(methods=["GET", "POST"])
def validate_link_and_fetch(
doctype: str,
docname: str,
docname: str | int,
fields_to_fetch: list[str] | str | None = None,
# search_widget parameters
query: str | None = None,

103
frappe/commands/execute.py Normal file
View file

@ -0,0 +1,103 @@
import json
import frappe
from frappe.exceptions import SiteNotSpecifiedError
from frappe.utils.bench_helper import CliCtxObj
def _execute(context: CliCtxObj, method, args=None, kwargs=None, profile=False, extra_args=None):
for site in context.sites:
ret = ""
try:
frappe.init(site)
frappe.connect()
if args:
try:
fn_args = eval(args)
except NameError:
fn_args = [args]
else:
fn_args = ()
if kwargs:
fn_kwargs = eval(kwargs)
else:
fn_kwargs = {}
if extra_args:
# parse extra_args
# if it starts with --, it is a kwarg
# otherwise it is an arg
# if it is a kwarg, the next argument is the value
# if the next argument starts with --, the value is True
# if there is no next argument, the value is True
# examples:
# bench execute method arg1 arg2 -> args=[arg1, arg2]
# bench execute method --a 1 --b 2 -> kwargs={a: 1, b: 2}
# bench execute method arg1 --a 1 -> args=[arg1], kwargs={a: 1}
# we need to convert values to python objects if possible
def parse_value(value):
try:
return json.loads(value)
except Exception:
return value
extra_args = list(extra_args)
while extra_args:
arg = extra_args.pop(0)
if arg.startswith("--"):
key = arg[2:]
if extra_args and not extra_args[0].startswith("--"):
value = parse_value(extra_args.pop(0))
else:
value = True
fn_kwargs[key] = value
else:
fn_args += (parse_value(arg),)
pr = None
if profile:
import cProfile
pr = cProfile.Profile()
pr.enable()
try:
fn = frappe.get_attr(method)
except Exception:
fn = None
if fn:
ret = fn(*fn_args, **fn_kwargs)
else:
# eval is safe here because input is from console
code = compile(method, "<bench execute>", "eval")
ret = eval(code, globals(), locals()) # nosemgrep
if callable(ret):
suffix = "(*fn_args, **fn_kwargs)"
code = compile(method + suffix, "<bench execute>", "eval")
ret = eval(code, globals(), locals()) # nosemgrep
if profile and pr:
import pstats
from io import StringIO
pr.disable()
s = StringIO()
pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats(0.5)
print(s.getvalue())
if frappe.db:
frappe.db.commit()
finally:
frappe.destroy()
if ret:
from frappe.utils.response import json_handler
print(json.dumps(ret, default=json_handler).strip('"'))
if not context.sites:
raise SiteNotSpecifiedError

View file

@ -244,6 +244,26 @@ class TestCommands(BaseTestCommands):
self.assertEqual(self.returncode, 0)
self.assertEqual(self.stdout, frappe.bold(text="DocType"))
# test 5: execute a command with extra args
self.execute("bench --site {site} execute frappe.bold DocType")
self.assertEqual(self.returncode, 0)
self.assertEqual(self.stdout, frappe.bold(text="DocType"))
# test 6: execute a command with extra kwargs
self.execute("bench --site {site} execute frappe.bold --text DocType")
self.assertEqual(self.returncode, 0)
self.assertEqual(self.stdout, frappe.bold(text="DocType"))
# test 7: execute a command with extra args and kwargs
self.execute("bench --site {site} execute frappe.utils.add_to_date '2024-01-01' --days 1")
self.assertEqual(self.returncode, 0)
self.assertEqual(self.stdout, "2024-01-02")
# test 8: execute a command with extra args and kwargs with types
self.execute("bench --site {site} execute frappe.utils.add_to_date --date '2024-01-01' --days 1")
self.assertEqual(self.returncode, 0)
self.assertEqual(self.stdout, "2024-01-02")
@skipIf(
frappe.conf.db_type == "sqlite",
"Not for SQLite for now",

View file

@ -247,70 +247,18 @@ def reset_perms(context: CliCtxObj):
raise SiteNotSpecifiedError
@click.command("execute")
@click.command("execute", context_settings=EXTRA_ARGS_CTX)
@click.argument("method")
@click.option("--args")
@click.option("--kwargs")
@click.option("--profile", is_flag=True, default=False)
@click.argument("extra_args", nargs=-1)
@pass_context
def execute(context: CliCtxObj, method, args=None, kwargs=None, profile=False):
def execute(context: CliCtxObj, method, args=None, kwargs=None, profile=False, extra_args=None):
"Execute a function"
for site in context.sites:
ret = ""
try:
frappe.init(site)
frappe.connect()
from frappe.commands.execute import _execute
if args:
try:
fn_args = eval(args)
except NameError:
fn_args = [args]
else:
fn_args = ()
if kwargs:
fn_kwargs = eval(kwargs)
else:
fn_kwargs = {}
if profile:
import cProfile
pr = cProfile.Profile()
pr.enable()
try:
ret = frappe.get_attr(method)(*fn_args, **fn_kwargs)
except Exception:
# eval is safe here because input is from console
code = compile(method, "<bench execute>", "eval")
ret = eval(code, globals(), locals()) # nosemgrep
if callable(ret):
suffix = "(*fn_args, **fn_kwargs)"
code = compile(method + suffix, "<bench execute>", "eval")
ret = eval(code, globals(), locals()) # nosemgrep
if profile:
import pstats
from io import StringIO
pr.disable()
s = StringIO()
pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats(0.5)
print(s.getvalue())
if frappe.db:
frappe.db.commit()
finally:
frappe.destroy()
if ret:
from frappe.utils.response import json_handler
print(json.dumps(ret, default=json_handler).strip('"'))
if not context.sites:
raise SiteNotSpecifiedError
_execute(context, method, args, kwargs, profile, extra_args)
@click.command("add-to-email-queue")

View file

@ -1,6 +1,8 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe import _
@ -110,7 +112,7 @@ def delete_contact_and_address(doctype: str, docname: str) -> None:
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def filter_dynamic_link_doctypes(
doctype, txt: str, searchfield, start, page_len, filters: dict
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]
) -> list[list[str]]:
from frappe.permissions import get_doctypes_with_read

View file

@ -1,6 +1,8 @@
# Copyright (c) 2015, Frappe Technologies and contributors
# License: MIT. See LICENSE
from typing import Any
from jinja2 import TemplateSyntaxError
import frappe
@ -262,7 +264,9 @@ def get_company_address(company):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def address_query(doctype, txt, searchfield, start, page_len, filters):
def address_query(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]
):
from frappe.desk.search import search_widget
_filters = []

View file

@ -1,5 +1,7 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe import _
from frappe.contacts.address_and_contact import set_link_title
@ -312,7 +314,7 @@ def invite_user(contact: str):
@frappe.whitelist()
def get_contact_details(contact):
def get_contact_details(contact: str):
contact = frappe.get_doc("Contact", contact)
contact.check_permission()
@ -341,7 +343,9 @@ def update_contact(doc, method):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def contact_query(doctype, txt, searchfield, start, page_len, filters):
def contact_query(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]
):
from frappe.desk.reportview import get_match_cond
doctype = "Contact"
@ -379,7 +383,7 @@ def contact_query(doctype, txt, searchfield, start, page_len, filters):
@frappe.whitelist()
def address_query(links):
def address_query(links: str):
import json
links = [

View file

@ -1,5 +1,7 @@
# Copyright (c) 2021, Frappe Technologies and contributors
# License: MIT. See LICENSE
from typing import Any
from tenacity import retry, retry_if_exception_type, stop_after_attempt
import frappe
@ -45,14 +47,14 @@ class AccessLog(Document):
reraise=True,
)
def make_access_log(
doctype=None,
document=None,
method=None,
file_type=None,
report_name=None,
filters=None,
page=None,
columns=None,
doctype: str | None = None,
document: str | int | None = None,
method: str | None = None,
file_type: str | None = None,
report_name: str | None = None,
filters: str | list | dict[str, Any] | None = None,
page: str | None = None,
columns: str | None = None,
):
access_log = frappe.get_doc(
{

View file

@ -5,6 +5,7 @@ import time
import frappe
from frappe.auth import CookieManager, LoginManager
from frappe.tests import IntegrationTestCase
from frappe.utils import set_request
class TestActivityLog(IntegrationTestCase):
@ -15,12 +16,12 @@ class TestActivityLog(IntegrationTestCase):
# test user login log
frappe.local.form_dict = frappe._dict(
{
"cmd": "login",
"sid": "Guest",
"pwd": self.ADMIN_PASSWORD or "admin",
"usr": "Administrator",
}
)
set_request(method="POST", path="/api/method/login")
frappe.local.request_ip = "127.0.0.1"
frappe.local.cookie_manager = CookieManager()
@ -60,8 +61,9 @@ class TestActivityLog(IntegrationTestCase):
update_system_settings({"allow_consecutive_login_attempts": 3, "allow_login_after_fail": 5})
frappe.local.form_dict = frappe._dict(
{"cmd": "login", "sid": "Guest", "pwd": self.ADMIN_PASSWORD, "usr": "Administrator"}
{"sid": "Guest", "pwd": self.ADMIN_PASSWORD, "usr": "Administrator"}
)
set_request(method="POST", path="/api/method/login")
frappe.local.request_ip = "127.0.0.1"
frappe.local.cookie_manager = CookieManager()

View file

@ -3,7 +3,8 @@
import json
from collections.abc import Iterable
from typing import TYPE_CHECKING
from datetime import datetime
from typing import TYPE_CHECKING, Any
import frappe
import frappe.email.smtp
@ -27,31 +28,32 @@ if TYPE_CHECKING:
@frappe.whitelist()
def make(
doctype=None,
name=None,
content=None,
subject=None,
sent_or_received="Sent",
sender=None,
sender_full_name=None,
recipients=None,
communication_medium="Email",
send_email=False,
print_html=None,
print_format=None,
attachments=None,
send_me_a_copy=False,
cc=None,
bcc=None,
read_receipt=None,
print_letterhead=True,
email_template=None,
communication_type=None,
send_after=None,
print_language=None,
now=False,
raw_html=False,
add_css=True,
doctype: str | None = None,
name: str | int | None = None,
content: str | None = None,
subject: str | None = None,
sent_or_received: str = "Sent",
sender: str | None = None,
sender_full_name: str | None = None,
recipients: str | list[str] | None = None,
communication_medium: str = "Email",
send_email: str | bool | int = False,
print_html: str | None = None,
print_format: str | None = None,
attachments: str | list[str | dict[str, Any]] | None = None,
send_me_a_copy: str | int | bool = False,
cc: str | list[str] | None = None,
bcc: str | list[str] | None = None,
read_receipt: str | int | bool | None = None,
print_letterhead: int | bool = True,
email_template: str | None = None,
communication_type: str | None = None,
send_after: str | datetime | None = None,
print_language: str | None = None,
now: int | bool = False,
raw_html: int | bool = False,
add_css: int | bool = True,
in_reply_to: str | None = None,
**kwargs,
) -> dict[str, str]:
"""Make a new communication. Checks for email permissions for specified Document.
@ -73,6 +75,7 @@ def make(
:param send_after: Send after the given datetime.
:param raw_html: Whether to use html version of email template
:param add_css: Add default CSS from hooks/email_css to the email template (default **True**)
:param in_reply_to: Name of the Communication document to which this communication is a reply.
"""
from frappe.utils.commands import warn
@ -86,10 +89,8 @@ def make(
if doctype and name:
frappe.has_permission(doctype, doc=name, ptype="email", throw=True)
if (
raw_html
and email_template
and not frappe.get_cached_value("Email Template", email_template, "use_html")
if raw_html and not (
email_template and frappe.get_cached_value("Email Template", email_template, "use_html")
):
warn(
_(
@ -127,6 +128,7 @@ def make(
now=now,
raw_html=raw_html,
add_css=add_css,
in_reply_to=in_reply_to,
)
@ -157,6 +159,7 @@ def _make(
now=False,
raw_html=False,
add_css=True,
in_reply_to=None,
) -> dict[str, str]:
"""Internal method to make a new communication that ignores Permission checks."""
@ -185,10 +188,11 @@ def _make(
"has_attachment": 1 if attachments else 0,
"communication_type": communication_type,
"send_after": send_after,
"in_reply_to": in_reply_to,
}
)
comm.flags.skip_add_signature = not add_signature or (
raw_html and frappe.get_cached_value("Email Template", email_template, "use_html")
raw_html and email_template and frappe.get_cached_value("Email Template", email_template, "use_html")
)
comm.insert(ignore_permissions=True)

View file

@ -311,6 +311,7 @@ class CommunicationEmailMixin:
"send_after": self.send_after,
"raw_html": raw_html,
"add_css": add_css,
"in_reply_to": self.in_reply_to,
}
def send_email(

View file

@ -4,6 +4,7 @@
import csv
import os
import re
from typing import Any
import frappe
import frappe.permissions
@ -30,15 +31,15 @@ def get_data_keys():
@frappe.whitelist()
def export_data(
doctype=None,
parent_doctype=None,
all_doctypes=True,
with_data=False,
select_columns=None,
file_type="CSV",
template=False,
filters=None,
export_without_column_meta=False,
doctype: str | list[str | dict[str, Any]] | None = None,
parent_doctype: str | None = None,
all_doctypes: bool | int | str = True,
with_data: bool | int | str = False,
select_columns: str | dict[str, list[str]] | None = None,
file_type: str = "CSV",
template: bool | str = False,
filters: str | dict[str, Any] | list | None = None,
export_without_column_meta: bool | str = False,
):
_doctype = doctype
if isinstance(_doctype, list):

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import os
from typing import Any
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation
@ -102,7 +103,7 @@ class DataImport(Document):
self.payload_count = len(payloads)
@frappe.whitelist()
def get_preview_from_template(self, import_file=None, google_sheets_url=None):
def get_preview_from_template(self, import_file: str | None = None, google_sheets_url: str | None = None):
if import_file:
self.import_file = import_file
self.set_delimiters_flag()
@ -203,7 +204,13 @@ def start_import(data_import):
@frappe.whitelist()
def download_template(doctype, export_fields=None, export_records=None, export_filters=None, file_type="CSV"):
def download_template(
doctype: str,
export_fields: str | dict[str, list[str]] | None = None,
export_records: str | None = None,
export_filters: str | dict[str, Any] | list[list[Any]] | None = None,
file_type: str = "CSV",
):
"""
Download template from Exporter
:param doctype: Document Type

View file

@ -93,15 +93,19 @@ class Importer:
return
# setup import log
import_log = (
frappe.get_all(
"Data Import Log",
fields=["row_indexes", "success", "log_index"],
filters={"data_import": self.data_import.name},
order_by="log_index",
# Only use import log for retry/resume when Data Import is persisted in DB.
# For bench data-import (CLI), the doc is never inserted, so we must not reuse logs
import_log = []
if self.data_import.name and frappe.db.exists("Data Import", self.data_import.name):
import_log = (
frappe.get_all(
"Data Import Log",
fields=["row_indexes", "success", "log_index"],
filters={"data_import": self.data_import.name},
order_by="log_index",
)
or []
)
or []
)
log_index = 0

View file

@ -38,7 +38,7 @@ class DeletedDocument(Document):
@frappe.whitelist()
def restore(name, alert=True):
def restore(name: str | int, alert: bool = True):
deleted = frappe.get_doc("Deleted Document", name)
if deleted.restored:
@ -69,7 +69,7 @@ def restore(name, alert=True):
@frappe.whitelist()
def bulk_restore(docnames):
def bulk_restore(docnames: str | list[str]):
docnames = frappe.parse_json(docnames)
message = _("Restoring Deleted Document")
restored, invalid, failed = [], [], []

View file

@ -2,4 +2,9 @@
// For license information, please see license.txt
frappe.views.calendar["{doctype}"] = {{
// field_map: {{
// start: "start_date",
// end: "end_date",
// }},
// gantt: true
}};

View file

@ -680,7 +680,7 @@ class DocType(Document):
where doctype=%s and field='name' and value = %s""",
(new, new, old),
)
else:
elif not self.is_virtual:
frappe.db.rename_table(old, new)
frappe.db.commit()

View file

@ -174,7 +174,7 @@ class DocumentNamingSettings(Document):
NamingSeries(series).validate()
@frappe.whitelist()
def get_options(self, doctype=None):
def get_options(self, doctype: str | None = None):
doctype = doctype or self.transaction_type
if not doctype:
return

View file

@ -3,7 +3,9 @@ frappe.ui.form.on("File", {
if (frm.doc.file_url) {
frm.add_custom_button(__("View File"), () => {
if (!frappe.utils.is_url(frm.doc.file_url)) {
window.open(window.location.origin + frm.doc.file_url);
window.open(
window.location.origin + frm.doc.file_url + "?fid=" + frm.doc.name
);
} else {
window.open(frm.doc.file_url);
}
@ -90,7 +92,7 @@ frappe.ui.form.on("File", {
},
download: function (frm) {
let file_url = frm.doc.file_url;
let file_url = frm.doc.file_url + "?fid=" + frm.doc.name;
if (frm.doc.file_name) {
file_url = file_url.replace(/#/g, "%23");
}

View file

@ -767,7 +767,7 @@ class File(Document):
max_file_size = get_max_file_size()
file_size = len(self._content or b"")
if file_size > max_file_size:
if not self.flags.skip_file_size_check and file_size > max_file_size:
msg = _("File size exceeded the maximum allowed size of {0} MB").format(max_file_size / 1048576)
if frappe.has_permission("System Settings", "write"):
msg += ".<br>" + _("You can increase the limit from System Settings.")

View file

@ -130,7 +130,7 @@ def has_unseen_error_log():
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_log_doctypes(doctype, txt, searchfield, start, page_len, filters):
def get_log_doctypes(doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: list):
filters = filters or []
filters.extend(

View file

@ -6,6 +6,7 @@ import os
from pathlib import Path
import frappe
from frappe import _
from frappe.model.document import Document
from frappe.modules.export_file import delete_folder
@ -89,6 +90,10 @@ class ModuleDef(Document):
frappe.clear_cache()
frappe.setup_module_map()
def before_rename(self, old, new, merge=False):
if not self.custom:
frappe.throw(_("Only Custom Modules can be renamed."))
@frappe.whitelist()
def get_installed_apps():

View file

@ -165,7 +165,7 @@ def update_job_id(prepared_report):
@frappe.whitelist()
def make_prepared_report(report_name, filters=None):
def make_prepared_report(report_name: str, filters: dict[str, Any] | str | list | None = None):
"""run reports in background"""
prepared_report = frappe.get_doc(
{
@ -212,7 +212,7 @@ def process_filters_for_prepared_report(filters: dict[str, Any] | str) -> str:
@frappe.whitelist()
def get_reports_in_queued_state(report_name, filters):
def get_reports_in_queued_state(report_name: str, filters: dict[str, Any] | str | list):
return frappe.get_all(
"Prepared Report",
filters={
@ -252,7 +252,7 @@ def expire_stalled_report():
@frappe.whitelist()
def delete_prepared_reports(reports):
def delete_prepared_reports(reports: str | list[dict[str, Any]]):
reports = frappe.parse_json(reports)
for report in reports:
prepared_report = frappe.get_doc("Prepared Report", report["name"])
@ -284,7 +284,7 @@ def create_json_gz_file(data, dt, dn, report_name):
@frappe.whitelist()
def download_attachment(dn):
def download_attachment(dn: str):
pr = frappe.get_doc("Prepared Report", dn)
if not pr.has_permission("read"):
frappe.throw(frappe._("Cannot Download Report due to insufficient permissions"))
@ -330,7 +330,7 @@ def has_permission(doc, user):
@frappe.whitelist()
def enqueue_json_to_csv_conversion(prepared_report_name):
def enqueue_json_to_csv_conversion(prepared_report_name: str):
"""Call this to enqueue the conversion in background."""
enqueue(method=convert_json_to_csv, queue="long", prepared_report_name=prepared_report_name)

View file

@ -116,7 +116,7 @@ def serialize_request(request):
@frappe.whitelist()
def add_indexes(indexes):
def add_indexes(indexes: str):
frappe.only_for("Administrator")
indexes = json.loads(indexes)

View file

@ -120,7 +120,9 @@ def get_users(role):
# searches for active employees
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def role_query(doctype, txt, searchfield, start, page_len, filters):
def role_query(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: list | dict | str
):
return frappe.get_all(
"Role",
limit_start=start,

View file

@ -241,7 +241,7 @@ def get_all_queued_jobs():
@frappe.whitelist()
def stop_job(job_id):
def stop_job(job_id: str):
frappe.get_doc("RQ Job", job_id).stop_job()

View file

@ -211,16 +211,21 @@ class ServerScript(Document):
safe_exec(self.script, script_filename=self.name)
def get_permission_query_conditions(self, user: str) -> list[str]:
def get_permission_query_conditions(self, user: str, active_child_tables=None) -> list[str]:
"""Specific to Permission Query Server Scripts.
Args:
user (str): Take user email to execute script and return list of conditions.
active_child_tables (list, optional): A list of child table names involved in the current SQL query.
Return:
list: Return list of conditions defined by rules in self.script.
"""
locals = {"user": user, "conditions": ""}
locals = {
"user": user,
"conditions": "",
"active_child_tables": active_child_tables or [],
}
safe_exec(self.script, None, locals, script_filename=self.name)
if locals["conditions"]:
return locals["conditions"]

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import json
from typing import Any
import frappe
from frappe import _
@ -43,7 +44,7 @@ def get_session_default_values():
@frappe.whitelist()
def set_session_default_values(default_values):
def set_session_default_values(default_values: str | dict[str, Any]):
default_values = frappe.parse_json(default_values)
for entry in default_values:
try:

View file

@ -46,7 +46,7 @@ def validate_receiver_nos(receiver_list):
@frappe.whitelist()
def get_contact_number(contact_name, ref_doctype, ref_name):
def get_contact_number(contact_name: str, ref_doctype: str, ref_name: str):
"Return mobile number of the given contact."
number = frappe.db.sql(
"""select mobile_no, phone from tabContact
@ -62,7 +62,7 @@ def get_contact_number(contact_name, ref_doctype, ref_name):
@frappe.whitelist()
def send_sms(receiver_list, msg, sender_name="", success_msg=True):
def send_sms(receiver_list: str | list[str], msg: str, sender_name: str = "", success_msg: bool = True):
send_sms_hook_methods = frappe.get_hooks("send_sms")
if send_sms_hook_methods:
return frappe.get_attr(send_sms_hook_methods[-1])(receiver_list, msg, sender_name, success_msg)

View file

@ -192,7 +192,7 @@ def queue_submission(doc: Document, action: str, alert: bool = True):
@frappe.whitelist()
def get_latest_submissions(doctype, docname):
def get_latest_submissions(doctype: str, docname: str | int):
# NOTE: not used creation as orderby intentianlly as we have used update_modified=False everywhere
# hence assuming modified will be equal to creation for submission queue documents

View file

@ -440,9 +440,6 @@ frappe.ui.form.on("User Role Profile", {
frm.roles_editor.show();
}
});
if (frm.roles_editor) {
$(".deselect-all, .select-all").prop("disabled", true);
}
}
},
role_profiles_remove: function (frm) {
@ -450,7 +447,6 @@ frappe.ui.form.on("User Role Profile", {
if (frm.roles_editor) {
frm.roles_editor.disable = 0;
frm.roles_editor.show();
$(".deselect-all, .select-all").prop("disabled", false);
}
}
},

View file

@ -49,7 +49,6 @@
"mute_sounds",
"desk_theme",
"code_editor_type",
"banner_image",
"navigation_settings_section",
"search_bar",
"notifications",
@ -298,11 +297,6 @@
"label": "Location",
"no_copy": 1
},
{
"fieldname": "banner_image",
"fieldtype": "Attach Image",
"label": "Banner Image"
},
{
"fieldname": "column_break_22",
"fieldtype": "Column Break"

View file

@ -4,6 +4,7 @@
from collections.abc import Iterable
from datetime import timedelta
from functools import cached_property
from typing import Any
import frappe
import frappe.defaults
@ -74,7 +75,6 @@ class User(Document):
allowed_in_mentions: DF.Check
api_key: DF.Data | None
api_secret: DF.Password | None
banner_image: DF.AttachImage | None
bio: DF.SmallText | None
birth_date: DF.Date | None
block_modules: DF.Table[BlockModule]
@ -899,13 +899,7 @@ def get_all_roles():
@frappe.whitelist()
def get_roles(arg=None):
"""get roles for a user"""
return frappe.get_roles(frappe.form_dict.get("uid", frappe.session.user))
@frappe.whitelist()
def get_perm_info(role):
def get_perm_info(role: str):
"""get permission info"""
from frappe.permissions import get_all_perms
@ -966,7 +960,9 @@ def update_password(
@frappe.whitelist(allow_guest=True)
def test_password_strength(new_password: str, key=None, old_password=None, user_data: tuple | None = None):
def test_password_strength(
new_password: str, key: str | None = None, old_password: str | None = None, user_data: tuple | None = None
):
from frappe.utils.password_strength import test_password_strength as _test_password_strength
if key is not None or old_password is not None:
@ -1008,7 +1004,7 @@ def has_email_account(email: str):
@frappe.whitelist(allow_guest=False)
def get_email_awaiting(user):
def get_email_awaiting(user: str):
return frappe.get_all(
"User Email",
fields=["email_account", "email_id"],
@ -1069,7 +1065,7 @@ def reset_user_data(user):
@frappe.whitelist(methods=["POST"])
def verify_password(password):
def verify_password(password: str):
frappe.local.login_manager.check_password(frappe.session.user, password)
@ -1151,7 +1147,7 @@ def reset_password(user: str) -> str:
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def user_query(doctype, txt, searchfield, start, page_len, filters):
def user_query(doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]):
doctype = "User"
list_filters = {
@ -1426,7 +1422,7 @@ def generate_keys(user: str):
@frappe.whitelist()
def switch_theme(theme):
def switch_theme(theme: str):
if theme in ["Dark", "Light", "Automatic"]:
frappe.db.set_value("User", frappe.session.user, "desk_theme", theme)
@ -1441,7 +1437,7 @@ def get_enabled_users():
@frappe.whitelist(methods=["POST"])
def impersonate(user: str, reason: str):
frappe.has_permission("User", "impersonate")
frappe.has_permission("User", "impersonate", throw=True)
impersonator = frappe.session.user
frappe.get_doc(
@ -1463,16 +1459,18 @@ def impersonate(user: str, reason: str):
notification.set("type", "Alert")
notification.insert(ignore_permissions=True)
# notify user via email too
user_email = frappe.db.get_value("User", user, "email")
email_message = _(
"User {0} has started an impersonation session as you. <br><br><b>Reason provided:</b> {1}"
).format(escape_html(impersonator), escape_html(reason))
outgoing_email_exists = frappe.db.exists("Email Account", {"default_outgoing": 1, "awaiting_password": 0})
if outgoing_email_exists:
user_email = frappe.db.get_value("User", user, "email")
email_message = _(
"User {0} has started an impersonation session as you. <br><br><b>Reason provided:</b> {1}"
).format(escape_html(impersonator), escape_html(reason))
frappe.sendmail(
recipients=[user_email],
subject=_("Security Alert: Your account is being impersonated"),
content=email_message,
)
frappe.sendmail(
recipients=[user_email],
subject=_("Security Alert: Your account is being impersonated"),
content=email_message,
)
frappe.local.login_manager.impersonate(user)

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import json
from typing import Any
import frappe
from frappe import _
@ -85,7 +86,7 @@ def send_user_permissions(bootinfo):
@frappe.whitelist()
def get_user_permissions(user=None):
def get_user_permissions(user: str | None = None):
"""Get all users permissions for the user as a dict of doctype"""
# if this is called from client-side,
# user can access only his/her user permissions
@ -160,7 +161,9 @@ def user_permission_exists(user, allow, for_value, applicable_for=None):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_applicable_for_doctype_list(doctype, txt, searchfield, start, page_len, filters):
def get_applicable_for_doctype_list(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]
):
actual_doctype = filters.get("doctype")
linked_doctypes_map = get_linked_doctypes(actual_doctype, True)
@ -192,7 +195,7 @@ def get_permitted_documents(doctype):
@frappe.whitelist()
def check_applicable_doc_perm(user, doctype, docname):
def check_applicable_doc_perm(user: str, doctype: str, docname: str | int):
frappe.only_for("System Manager")
applicable = []
doc_exists = frappe.get_all(
@ -224,7 +227,7 @@ def check_applicable_doc_perm(user, doctype, docname):
@frappe.whitelist()
def clear_user_permissions(user, for_doctype):
def clear_user_permissions(user: str, for_doctype: str):
frappe.only_for("System Manager")
total = frappe.db.count("User Permission", {"user": user, "allow": for_doctype})
@ -242,7 +245,7 @@ def clear_user_permissions(user, for_doctype):
@frappe.whitelist()
def add_user_permissions(data):
def add_user_permissions(data: str | dict[str, Any]):
"""Add and update the user permissions"""
frappe.only_for("System Manager")
if isinstance(data, str):

View file

@ -84,13 +84,14 @@ class UserType(Document):
title=_("Permission Error"),
)
if not limit:
frappe.throw(
if limit is None:
frappe.msgprint(
_("The limit has not set for the user type {0} in the site config file.").format(
frappe.bold(self.name)
),
title=_("Set Limit"),
)
return
if self.user_doctypes and len(self.user_doctypes) > limit:
frappe.throw(
@ -218,7 +219,9 @@ def get_non_standard_user_types():
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_user_linked_doctypes(doctype, txt, searchfield, start, page_len, filters):
def get_user_linked_doctypes(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict | list | str
):
modules = [d.get("module_name") for d in get_modules_from_app("frappe")]
filters = [
@ -254,7 +257,7 @@ def get_user_linked_doctypes(doctype, txt, searchfield, start, page_len, filters
@frappe.whitelist()
def get_user_id(parent):
def get_user_id(parent: str):
data = (
frappe.get_all(
"DocField",

View file

@ -113,13 +113,20 @@ def get_permissions(doctype: str | None = None, role: str | None = None):
@frappe.whitelist()
def add(parent, role, permlevel):
def add(parent: str, role: str, permlevel: int):
frappe.only_for("System Manager")
add_permission(parent, role, permlevel)
@frappe.whitelist()
def update(doctype: str, role: str, permlevel: int, ptype: str, value=None, if_owner=0) -> str | None:
def update(
doctype: str,
role: str,
permlevel: int,
ptype: str,
value: str | int | None = None,
if_owner: str | int = 0,
) -> str | None:
"""Update role permission params.
Args:
@ -152,7 +159,7 @@ def update(doctype: str, role: str, permlevel: int, ptype: str, value=None, if_o
@frappe.whitelist()
def remove(doctype, role, permlevel, if_owner=0):
def remove(doctype: str, role: str, permlevel: int, if_owner: str | int = 0):
frappe.only_for("System Manager")
setup_custom_perms(doctype)
@ -169,20 +176,20 @@ def remove(doctype, role, permlevel, if_owner=0):
@frappe.whitelist()
def reset(doctype):
def reset(doctype: str):
frappe.only_for("System Manager")
reset_perms(doctype)
clear_permissions_cache(doctype)
@frappe.whitelist()
def get_users_with_role(role):
def get_users_with_role(role: str):
frappe.only_for("System Manager")
return _get_user_with_role(role)
@frappe.whitelist()
def get_standard_permissions(doctype):
def get_standard_permissions(doctype: str):
frappe.only_for("System Manager")
meta = frappe.get_meta(doctype)
if meta.custom:

View file

@ -1,6 +1,8 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
import frappe.utils.user
from frappe.model import data_fieldtypes
@ -44,7 +46,9 @@ def get_columns_and_fields(doctype):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def query_doctypes(doctype, txt, searchfield, start, page_len, filters):
def query_doctypes(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict[str, Any]
):
user = filters.get("user")
user_perms = frappe.utils.user.UserPermissions(user)
user_perms.build_permissions()

View file

@ -269,7 +269,7 @@ class CustomField(Document):
@frappe.whitelist()
def get_fields_label(doctype=None):
def get_fields_label(doctype: str | None = None):
meta = frappe.get_meta(doctype)
if doctype in core_doctypes_list:

View file

@ -710,7 +710,7 @@ def is_standard_or_system_generated_field(df):
@frappe.whitelist()
def get_link_filters_from_doc_without_customisations(doctype, fieldname):
def get_link_filters_from_doc_without_customisations(doctype: str, fieldname: str):
"""Get the filters of a link field from a doc without customisations
In backend the customisations are not applied.
Customisations are applied in the client side.

View file

@ -275,9 +275,11 @@ class Database:
frappe.log(f"Syntax error in query:\n{query} {values or ''}")
elif self.is_deadlocked(e):
self.db_type == "mariadb" and frappe.log_error("Query deadlocked", defer_insert=True)
raise frappe.QueryDeadlockError(e) from e
elif self.is_timedout(e):
self.db_type == "mariadb" and frappe.log_error("Query timed out", defer_insert=True)
raise frappe.QueryTimeoutError(e) from e
elif self.is_read_only_mode_error(e):
@ -632,6 +634,9 @@ class Database:
from frappe.model.utils import is_single_doctype
out = None
if isinstance(fieldname, list):
fieldname = tuple(fieldname)
if cache and isinstance(filters, str) and fieldname in self.value_cache[doctype][filters]:
return self.value_cache[doctype][filters][fieldname]

View file

@ -2,7 +2,7 @@ from pymysql.constants.ER import DUP_ENTRY
import frappe
from frappe import _
from frappe.database.schema import DBTable
from frappe.database.schema import DbColumn, DBTable
from frappe.utils.defaults import get_not_null_defaults
@ -96,6 +96,37 @@ class MariaDBTable(DBTable):
):
add_index_query.append("ADD INDEX `modified`(`modified`)")
# logic to drop unique constraint for fields deleted from a doctype
meta_columns = set(self.columns.keys())
db_columns = set(self.current_columns.keys())
for col in db_columns:
if (
col not in meta_columns
and col not in frappe.db.DEFAULT_COLUMNS
and col not in frappe.db.OPTIONAL_COLUMNS
):
has_unique = frappe.db.get_column_index(self.table_name, col, unique=True)
if not has_unique:
continue
current_col = self.current_columns.get(col)
deleted_col = DbColumn(
table=self,
fieldname=current_col.name,
fieldtype=current_col.type,
length=None,
default=None,
set_index=current_col.index,
options=None,
unique=False,
precision=None,
not_nullable=current_col.not_nullable,
)
self.drop_unique.append(deleted_col)
drop_index_query = []
for col in {*self.drop_index, *self.drop_unique}:

View file

@ -1,6 +1,6 @@
import frappe
from frappe import _
from frappe.database.schema import DBTable, get_definition
from frappe.database.schema import DbColumn, DBTable, get_definition
from frappe.utils import cint, flt
from frappe.utils.defaults import get_not_null_defaults
@ -131,6 +131,50 @@ class PostgresTable(DBTable):
index_name=col.fieldname, table_name=self.table_name, field=col.fieldname
)
# logic to drop unique constraint for fields deleted from a doctype
meta_columns = set(self.columns.keys())
db_columns = set(self.current_columns.keys())
for col in db_columns:
if (
col not in meta_columns
and col not in frappe.db.DEFAULT_COLUMNS
and col not in frappe.db.OPTIONAL_COLUMNS
):
has_unique_index = frappe.db.sql(
"""
SELECT 1
FROM pg_indexes
WHERE tablename = %s
AND indexname IN (%s, %s)
LIMIT 1
""",
(
self.table_name,
f"{self.table_name}_{col}_key",
f"unique_{col}",
),
)
if not has_unique_index:
continue
current_col = self.current_columns.get(col)
deleted_col = DbColumn(
table=self,
fieldname=current_col.name,
fieldtype=current_col.type,
length=None,
default=None,
set_index=current_col.index,
options=None,
unique=False,
precision=None,
not_nullable=current_col.not_nullable,
)
self.drop_unique.append(deleted_col)
drop_contraint_query = ""
for col in self.drop_index:
# primary key
@ -141,8 +185,35 @@ class PostgresTable(DBTable):
for col in self.drop_unique:
# primary key
if col.fieldname != "name":
# if index key exists
drop_contraint_query += f'DROP INDEX IF EXISTS "unique_{col.fieldname}" ;'
# drop unique constraint first if exists which automatically drops the underlying index also
unique_constraint_exists = frappe.db.sql(
"""
SELECT 1
FROM pg_constraint
WHERE conname = %s
""",
(f"{self.table_name}_{col.fieldname}_key",),
)
if unique_constraint_exists:
drop_contraint_query += f'ALTER TABLE "{self.table_name}" DROP CONSTRAINT IF EXISTS "{self.table_name}_{col.fieldname}_key" ;'
# drop the unique index backed by no constraint directly
unique_index_exists = frappe.db.sql(
"""
SELECT 1
FROM pg_indexes
WHERE tablename = %s
AND indexname = %s
""",
(
self.table_name,
f"unique_{col.fieldname}",
),
)
if unique_index_exists:
drop_contraint_query += f'DROP INDEX IF EXISTS "unique_{col.fieldname}" ;'
change_nullability = []
for col in self.change_nullability:

View file

@ -134,10 +134,6 @@ TAB_PATTERN = re.compile("^tab")
WORDS_PATTERN = re.compile(r"\w+")
COMMA_PATTERN = re.compile(r",\s*(?![^()]*\))")
# less restrictive version of frappe.core.doctype.doctype.doctype.START_WITH_LETTERS_PATTERN
# to allow table names like __Auth
TABLE_NAME_PATTERN = re.compile(r"^[\w -]*$", flags=re.ASCII)
# Pattern for validating simple field names (alphanumeric + underscore)
SIMPLE_FIELD_PATTERN = re.compile(r"^\w+$", flags=re.ASCII)
@ -266,13 +262,14 @@ class Engine:
self.field_aliases = set()
self.db_query_compat = db_query_compat
self.permitted_fields_cache = {} # Cache for get_permitted_fields results
self.is_aggregate_query = False
self._grouped_queries = set()
if isinstance(table, Table):
self.table = table
self.doctype = get_doctype_name(table.get_sql())
else:
self.doctype = table
self.validate_doctype()
self.table = qb.DocType(table)
if self.apply_permissions:
@ -313,19 +310,24 @@ class Engine:
if for_update:
self.query = self.query.for_update(skip_locked=skip_locked, nowait=not wait)
if any(isinstance(f, functions.AggregateFunction) for f in getattr(self, "fields", [])):
# check if any field in select is aggregated (done to prevent breaking queries in postgres due to order by rule)
self.is_aggregate_query = True
if group_by:
self.is_aggregate_query = True # for postgres (group by used with order by)
self.apply_group_by(group_by)
if order_by:
if not (
self.is_postgres and is_select and (distinct or group_by)
self.is_postgres and is_select and distinct
): # ignore in Postgres since order by fields need to appear in select distinct
self.apply_order_by(order_by)
else:
warnings.warn(
(
"ORDER BY fields have been ignored because PostgreSQL requires them to "
"appear in the SELECT list when using DISTINCT or GROUP BY."
"appear in the SELECT list when using with DISTINCT"
),
UserWarning,
stacklevel=2,
@ -340,16 +342,12 @@ class Engine:
self.query.immutable = True
return self.query
def validate_doctype(self):
if not TABLE_NAME_PATTERN.match(self.doctype):
frappe.throw(_("Invalid DocType: {0}").format(self.doctype))
def apply_fields(self, fields):
self.fields = self.parse_fields(fields)
# Track field aliases for use in group_by/order_by
for field in self.fields:
if isinstance(field, Field | DynamicTableField) and field.alias:
if isinstance(field, Field | DynamicTableField | AggregateFunction) and field.alias:
self.field_aliases.add(field.alias)
if self.apply_permissions:
@ -597,6 +595,20 @@ class Engine:
v.strip().strip("'") for v in get_between_date_filter(_value, df).split(" AND ")
)
# Handle empty lists for IN/NOT IN operators before conversion
# IN with empty list should return 0 results (always False)
# NOT IN with empty list should return all results (always True)
if _operator.lower() in ("in", "not in"):
if isinstance(_value, (list, tuple, set)) and len(_value) == 0:
if _operator.lower() == "in":
# Return a criterion that always evaluates to False (1=0)
# This ensures IN with empty list returns 0 results
return RawCriterion("1=0")
else: # not in
# Return a criterion that always evaluates to True (1=1)
# NOT IN with empty set matches all rows since nothing is excluded
return RawCriterion("1=1")
if not _value and isinstance(_value, list | tuple | set):
_value = ("",)
@ -807,7 +819,7 @@ class Engine:
if parsed := self._parse_backtick_field_notation(field):
table_name, field_name = parsed
self._check_field_permission(table_name, field_name)
self.check_filter_field_permission(table_name, field_name)
# Return query builder field reference
return frappe.qb.DocType(table_name)[field_name]
@ -830,7 +842,7 @@ class Engine:
parent_doctype_for_perm = (
dynamic_field.parent_doctype if isinstance(dynamic_field, ChildTableField) else None
)
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
self.query = dynamic_field.apply_join(self.query, engine=self)
# Return the pypika Field object associated with the dynamic field
@ -874,7 +886,9 @@ class Engine:
# If it's not a child table, check permissions
if not parent_fieldname:
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
self.check_filter_field_permission(
target_doctype, target_fieldname, parent_doctype_for_perm
)
return frappe.qb.DocType(target_doctype)[target_fieldname]
# Create a ChildTableField instance to handle join and field access
@ -888,7 +902,7 @@ class Engine:
# For permission check, the parent is the main doctype
parent_doctype_for_perm = self.doctype
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
# Delegate join logic
self.query = child_field_handler.apply_join(self.query, engine=self)
@ -928,18 +942,32 @@ class Engine:
parent_fieldname=df.fieldname,
)
parent_doctype_for_perm = self.doctype
self._check_field_permission(
self.check_filter_field_permission(
df.options, target_fieldname, parent_doctype_for_perm
)
self.query = child_field_handler.apply_join(self.query, engine=self)
return child_field_handler.field
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
# Convert string field name to pypika Field object for the specified/current doctype
return frappe.qb.DocType(target_doctype)[target_fieldname]
def _check_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
"""Check if the user has permission to access the given field"""
def check_select_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
"""Check if the user has permission to select the given field."""
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=False)
def check_filter_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
"""Check if the user has permission to filter/order/group by the given field.
It allows all permlevel 0 fields for users with select permission,
and all permitted fields for users with read permission.
"""
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=True)
def _check_field_permission(
self, doctype: str, fieldname: str, parent_doctype: str | None = None, for_filtering: bool = False
):
"""Check if the user has permission to access the given field."""
if not self.apply_permissions:
return
@ -961,7 +989,10 @@ class Engine:
frappe.PermissionError,
)
permitted_fields = self._get_cached_permitted_fields(doctype, parent_doctype, permission_type)
permission_source = (
self._get_filterable_fields if for_filtering else self._get_cached_permitted_fields
)
permitted_fields = permission_source(doctype, parent_doctype, permission_type)
if fieldname not in permitted_fields:
frappe.throw(
@ -987,6 +1018,42 @@ class Engine:
)
return self.permitted_fields_cache[cache_key]
def _get_filterable_fields(
self, doctype: str, parenttype: str | None = None, permission_type: str | None = None
) -> set:
"""Get fields that can be used in filters/order by/group by.
For users with only select permission on parent doctypes, this returns
all permlevel 0 fields (not just search fields which are used for selected fields).
For users with read permission, returns standard permitted fields.
"""
if permission_type is None:
permission_type = self.get_permission_type(doctype, parenttype)
if permission_type == "select":
meta = frappe.get_meta(doctype)
# Only allow filtering by all permlevel 0 fields for parent doctypes.
if meta.istable:
return set()
# for select permission on parent doctype, allow all permlevel 0 fields in filters
cache_key = (doctype, None, "_filterable_select")
if cache_key not in self.permitted_fields_cache:
if doctype in CORE_DOCTYPES:
# core doctypes have no restrictions - return all valid columns
self.permitted_fields_cache[cache_key] = set(meta.get_valid_columns())
else:
permlevel_0_fields = set(meta.default_fields) | OPTIONAL_FIELDS
for df in meta.get_fieldnames_with_value(with_field_meta=True, with_virtual_fields=False):
if df.permlevel == 0:
permlevel_0_fields.add(df.fieldname)
self.permitted_fields_cache[cache_key] = permlevel_0_fields
return self.permitted_fields_cache[cache_key]
else:
# for read permission, use standard permitted fields
return self._get_cached_permitted_fields(doctype, parenttype, permission_type)
def parse_string_field(self, field: str):
"""
Parses a field string into a pypika Field object.
@ -1023,11 +1090,6 @@ class Engine:
field_name = groups[3] # This will be the field name (e.g., 'field')
if table_name:
# Table name specified (e.g., `tabX`.`y` or tabX.y or `tabX Y`.`y`)
# Ensure the extracted table name is valid before creating DocType object
if not TABLE_NAME_PATTERN.match(table_name.lstrip("tab")):
frappe.throw(_("Invalid characters in table name: {0}").format(table_name))
doctype_name = table_name[3:] if table_name.startswith("tab") else table_name
table_obj = frappe.qb.DocType(doctype_name)
pypika_field = table_obj[field_name]
@ -1135,8 +1197,24 @@ class Engine:
# Note: Comma handling is done in parse_fields before this method is called
return self.parse_string_field(field)
def _normalize_postgres_order_field(self, field):
"""In PostgreSQL order_by fields need to either be in group_by or be aggregated
when used with select and group_by"""
current_sql = field.get_sql() if hasattr(field, "get_sql") else str(field)
if current_sql in self._grouped_queries:
return field
clean_name = current_sql.strip('"')
if clean_name in self.field_aliases:
return field
if not isinstance(field, functions.AggregateFunction):
return functions.Max(field)
return field
def apply_group_by(self, group_by: str | None = None):
parsed_group_by_fields = self._validate_group_by(group_by)
self._grouped_queries = {
f.get_sql() if hasattr(f, "get_sql") else str(f) for f in parsed_group_by_fields
}
self.query = self.query.groupby(*parsed_group_by_fields)
def apply_order_by(self, order_by: str | None):
@ -1146,7 +1224,12 @@ class Engine:
parsed_order_fields = self._validate_order_by(order_by)
for order_field, order_direction in parsed_order_fields:
self.query = self.query.orderby(order_field, order=order_direction)
if self.is_postgres and self.is_aggregate_query:
self.query = self.query.orderby(
self._normalize_postgres_order_field(order_field), order=order_direction
)
else:
self.query = self.query.orderby(order_field, order=order_direction)
def _apply_default_order_by(self):
"""Apply default ordering based on configured DocType metadata"""
@ -1165,14 +1248,24 @@ class Engine:
order_direction = Order.desc if spec_order == "desc" else Order.asc
else:
order_direction = Order.asc if spec_order == "asc" else Order.desc
self.query = self.query.orderby(field, order=order_direction)
if self.is_postgres and self.is_aggregate_query:
self.query = self.query.orderby(
self._normalize_postgres_order_field(field), order=order_direction
)
else:
self.query = self.query.orderby(field, order=order_direction)
else:
field = self.table[sort_field]
if self.db_query_compat:
order_direction = Order.desc if sort_order.lower() == "desc" else Order.asc
else:
order_direction = Order.asc if sort_order.lower() == "asc" else Order.desc
self.query = self.query.orderby(field, order=order_direction)
if self.is_postgres and self.is_aggregate_query:
self.query = self.query.orderby(
self._normalize_postgres_order_field(field), order=order_direction
)
else:
self.query = self.query.orderby(field, order=order_direction)
def _parse_backtick_field_notation(self, field_name: str) -> tuple[str, str] | None:
"""
@ -1209,7 +1302,7 @@ class Engine:
if "`" in field_name:
if parsed := self._parse_backtick_field_notation(field_name):
table_name, field_name = parsed
self._check_field_permission(table_name, field_name)
self.check_filter_field_permission(table_name, field_name)
return frappe.qb.DocType(table_name)[field_name]
# If parsing failed, fall through to error handling below
@ -1223,14 +1316,14 @@ class Engine:
if dynamic_field:
# Check permissions for dynamic field
if isinstance(dynamic_field, ChildTableField):
self._check_field_permission(
self.check_filter_field_permission(
dynamic_field.doctype, dynamic_field.fieldname, dynamic_field.parent_doctype
)
elif isinstance(dynamic_field, LinkTableField):
# Check permission for the link field in parent doctype
self._check_field_permission(self.doctype, dynamic_field.link_fieldname)
self.check_filter_field_permission(self.doctype, dynamic_field.link_fieldname)
# Check permission for the target field in linked doctype
self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
self.check_filter_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
# Apply join for the dynamic field
self.query = dynamic_field.apply_join(self.query, engine=self)
@ -1246,7 +1339,7 @@ class Engine:
)
# Check permissions for simple field
self._check_field_permission(self.doctype, field_name)
self.check_filter_field_permission(self.doctype, field_name)
# Create Field object for simple field
return self.table[field_name]
@ -1533,6 +1626,16 @@ class Engine:
return where_condition
def get_queried_tables(self) -> list[str]:
"""Extract all table names involved in the current query."""
tables = []
for table in self.query._from:
tables.append(table.get_sql())
for join in self.query._joins:
tables.append(join.item.get_sql())
return list(set(tables))
def get_permission_query_conditions(self, doctype: str | None = None) -> list["RawCriterion"]:
"""Add permission query conditions from hooks and server scripts"""
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
@ -1546,10 +1649,20 @@ class Engine:
if c := frappe.call(frappe.get_attr(method), self.user, doctype=doctype):
conditions.append(RawCriterion(f"({c})"))
active_child_tables = []
current_tables = self.get_queried_tables()
if len(current_tables) > 1:
main_table_name = f"tab{self.doctype}"
for table_name in current_tables:
if table_name != main_table_name:
active_child_tables.append(table_name)
# Get conditions from server scripts
if permission_script_name := get_server_script_map().get("permission_query", {}).get(doctype):
script = frappe.get_doc("Server Script", permission_script_name)
if condition := script.get_permission_query_conditions(self.user):
if condition := script.get_permission_query_conditions(
self.user, active_child_tables=active_child_tables
):
conditions.append(RawCriterion(f"({condition})"))
return conditions
@ -2287,7 +2400,7 @@ class SQLFunctionParser:
elif "`" in arg:
if parsed := self.engine._parse_backtick_field_notation(arg):
table_name, field_name = parsed
self.engine._check_field_permission(table_name, field_name)
self.engine.check_select_field_permission(table_name, field_name)
return Table(f"tab{table_name}")[field_name]
else:
frappe.throw(
@ -2336,4 +2449,4 @@ class SQLFunctionParser:
def _check_function_field_permission(self, field_name: str):
if self.engine.apply_permissions and self.engine.doctype:
self.engine._check_field_permission(self.engine.doctype, field_name)
self.engine.check_select_field_permission(self.engine.doctype, field_name)

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import json
from datetime import date
import frappe
from frappe import _
@ -10,7 +11,7 @@ from frappe.query_builder.terms import ValueWrapper
@frappe.whitelist()
def update_event(args, field_map):
def update_event(args: str, field_map: str):
"""Updates Event (called via calendar) based on passed `field_map`"""
args = frappe._dict(json.loads(args))
field_map = frappe._dict(json.loads(field_map))
@ -31,7 +32,14 @@ def get_event_conditions(doctype, filters=None):
@frappe.whitelist()
def get_events(doctype, start, end, field_map, filters=None, fields=None):
def get_events(
doctype: str,
start: date,
end: date,
field_map: str,
filters: str | None = None,
fields: str | list[str] | None = None,
):
field_map = frappe._dict(json.loads(field_map))
fields = frappe.parse_json(fields)

View file

@ -384,7 +384,7 @@ class Workspace:
@frappe.whitelist()
@frappe.read_only()
def get_desktop_page(page):
def get_desktop_page(page: str):
"""Apply permissions, customizations and return the configuration for a page on desk.
Args:
@ -681,7 +681,7 @@ def prepare_widget(config, doctype, parentfield):
@frappe.whitelist()
def update_onboarding_step(name, field, value):
def update_onboarding_step(name: str | int, field: str, value: int | str):
"""Update status of onboaridng step
Args:

View file

@ -1,6 +1,8 @@
# Copyright (c) 2015, Frappe Technologies and contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe import _
from frappe.core.doctype.submission_queue.submission_queue import queue_submission
@ -46,7 +48,13 @@ class BulkUpdate(Document):
@frappe.whitelist()
def submit_cancel_or_update_docs(doctype, docnames, action="submit", data=None, task_id=None):
def submit_cancel_or_update_docs(
doctype: str,
docnames: str | list[str],
action: str = "submit",
data: str | dict[str, Any] | None = None,
task_id: str | None = None,
):
if isinstance(docnames, str):
docnames = frappe.parse_json(docnames)

View file

@ -27,7 +27,9 @@ class CustomHTMLBlock(Document):
@frappe.whitelist()
def get_custom_blocks_for_user(doctype, txt, searchfield, start, page_len, filters):
def get_custom_blocks_for_user(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: dict | str | list
):
# return logged in users private blocks and all public blocks
customHTMLBlock = DocType("Custom HTML Block")

View file

@ -85,7 +85,7 @@ def get_permission_query_conditions(user):
@frappe.whitelist()
def get_permitted_charts(dashboard_name):
def get_permitted_charts(dashboard_name: str):
permitted_charts = []
dashboard = frappe.get_doc("Dashboard", dashboard_name)
for chart in dashboard.charts:
@ -101,7 +101,7 @@ def get_permitted_charts(dashboard_name):
@frappe.whitelist()
def get_permitted_cards(dashboard_name):
def get_permitted_cards(dashboard_name: str):
dashboard = frappe.get_doc("Dashboard", dashboard_name)
return [card for card in dashboard.cards if frappe.has_permission("Number Card", doc=card.card)]

View file

@ -1,8 +1,9 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import datetime
import json
from datetime import datetime
from typing import Any
import frappe
from frappe import _
@ -89,16 +90,16 @@ def has_permission(doc, ptype, user):
@frappe.whitelist()
@cache_source
def get(
chart_name=None,
chart=None,
no_cache=None,
filters=None,
from_date=None,
to_date=None,
timespan=None,
time_interval=None,
heatmap_year=None,
refresh=None,
chart_name: str | None = None,
chart: str | dict[str, Any] | None = None,
no_cache: bool | int | None = None,
filters: str | list | dict[str, Any] | None = None,
from_date: str | datetime | None = None,
to_date: str | datetime | None = None,
timespan: str | None = None,
time_interval: str | None = None,
heatmap_year: str | int | None = None,
refresh: bool | int | None = None,
):
if chart_name:
chart: DashboardChart = frappe.get_doc("Dashboard Chart", chart_name)
@ -139,7 +140,7 @@ def get(
@frappe.whitelist()
def create_dashboard_chart(args):
def create_dashboard_chart(args: str | dict[str, Any]):
args = frappe.parse_json(args)
doc = frappe.new_doc("Dashboard Chart")
@ -156,7 +157,7 @@ def create_dashboard_chart(args):
@frappe.whitelist()
def create_report_chart(args):
def create_report_chart(args: str | dict[str, Any]):
doc = create_dashboard_chart(args)
args = frappe.parse_json(args)
args.chart_name = doc.chart_name
@ -165,7 +166,7 @@ def create_report_chart(args):
@frappe.whitelist()
def add_chart_to_dashboard(args):
def add_chart_to_dashboard(args: str | dict[str, Any]):
args = frappe.parse_json(args)
dashboard = frappe.get_doc("Dashboard", args.dashboard)
@ -326,7 +327,9 @@ def get_result(data, timegrain, from_date, to_date, chart_type):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_charts_for_user(doctype, txt, searchfield, start, page_len, filters):
def get_charts_for_user(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: str | list | dict[str, Any]
):
or_filters = {"owner": frappe.session.user, "is_public": 1}
return frappe.db.get_list(
"Dashboard Chart", fields=["name"], filters=filters, or_filters=or_filters, as_list=1

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import json
from typing import Any
import frappe
@ -26,7 +27,7 @@ class DashboardSettings(Document):
@frappe.whitelist()
def create_dashboard_settings(user):
def create_dashboard_settings(user: str):
if not frappe.db.exists("Dashboard Settings", user):
doc = frappe.new_doc("Dashboard Settings")
doc.name = user
@ -43,7 +44,7 @@ def get_permission_query_conditions(user):
@frappe.whitelist()
def save_chart_config(reset, config, chart_name):
def save_chart_config(reset: int | str | bool, config: str | dict[str, Any], chart_name: str):
reset = frappe.parse_json(reset)
doc = frappe.get_doc("Dashboard Settings", frappe.session.user)
chart_config = frappe.parse_json(doc.chart_config) or {}

View file

@ -147,11 +147,11 @@
"fieldname": "bg_color",
"fieldtype": "Select",
"label": "Background Color",
"options": "blue\ngray"
"options": "gray\nblue"
}
],
"links": [],
"modified": "2026-01-27 18:17:48.667070",
"modified": "2026-02-04 13:59:30.578370",
"modified_by": "Administrator",
"module": "Desk",
"name": "Desktop Icon",

View file

@ -24,7 +24,7 @@ class DesktopIcon(Document):
from frappe.types import DF
app: DF.Autocomplete | None
bg_color: DF.Literal["blue", "gray"]
bg_color: DF.Literal["gray", "blue"]
hidden: DF.Check
icon_image: DF.Attach | None
icon_type: DF.Literal["Link", "Folder", "App"]
@ -56,6 +56,11 @@ class DesktopIcon(Document):
def on_update(self):
self.export_desktop_icon()
if self.standard:
frappe.cache.delete_key("desktop_icons")
frappe.cache.delete_key("bootinfo")
else:
clear_desktop_icons_cache(user=self.owner)
def after_rename(self, old, new, merge):
delete_desktop_icon_file(self.app, old)
@ -111,12 +116,15 @@ class DesktopIcon(Document):
def check_app_permission(self):
for a in frappe.get_installed_apps():
if frappe.get_hooks(app_name=a)["app_title"][0] == self.label or self.app == a:
permission_method = frappe.get_hooks(app_name=a)["add_to_apps_screen"][0].get(
"has_permission", None
)
if permission_method:
return frappe.call(permission_method)
app_detail = frappe.get_hooks("add_to_apps_screen", app_name=a)
if len(app_detail) != 0:
permission_method = app_detail[0].get("has_permission", None)
if permission_method:
return frappe.call(permission_method)
else:
return True
else:
# App hooks.py doesn't have add_to_apps_screen
return True
# def is_permitted(self):
@ -312,3 +320,24 @@ def create_user_icons(user, data):
frappe.cache.hset("_user_settings", f"{'Desktop Icon'}::{user}", json.dumps(user_settings))
return json.dumps(user_settings)
return data
@frappe.whitelist()
def add_workspace_to_desktop(workspace: str):
sidebar = frappe.new_doc("Workspace Sidebar")
sidebar_item = frappe.new_doc("Workspace Sidebar Item")
sidebar_item.label = workspace
sidebar_item.type = "Link"
sidebar_item.link_to = workspace
sidebar_item.link_type = "Workspace"
sidebar.title = workspace
sidebar.append("items", sidebar_item)
sidebar.save()
new_icon = frappe.new_doc("Desktop Icon")
new_icon.label = workspace
new_icon.icon_type = "Link"
new_icon.link_to = workspace
new_icon.link_type = "Workspace Sidebar"
new_icon.insert()
return {"icon": new_icon.as_dict()}

View file

@ -4,6 +4,7 @@
import json
import frappe
from frappe.desk.doctype.desktop_icon.desktop_icon import add_workspace_to_desktop
from frappe.model.document import Document
@ -24,11 +25,10 @@ class DesktopLayout(Document):
@frappe.whitelist()
def save_layout(user, layout, new_icons):
def save_layout(user: str, layout: str, new_icons: str | None = None):
if not user:
user = frappe.session.user
layout = json.loads(layout)
new_icons = json.loads(new_icons)
desktop_layout = None
try:
desktop_layout = frappe.get_doc("Desktop Layout", frappe.session.user)
@ -40,16 +40,36 @@ def save_layout(user, layout, new_icons):
if layout:
desktop_layout.layout = json.dumps(layout)
desktop_layout.save()
for icon in new_icons:
desktop_icon = frappe.new_doc("Desktop Icon")
desktop_icon.update(icon)
desktop_icon.owner = frappe.session.user
desktop_icon.save()
if new_icons:
new_icons = json.loads(new_icons)
for icon in new_icons:
workspace = icon.get("workspace")
if workspace:
new_workspace = frappe.new_doc("Workspace")
new_workspace.update(workspace)
new_workspace.title = new_workspace.label
new_workspace.save()
return add_workspace_to_desktop(new_workspace.name)
desktop_icon = frappe.new_doc("Desktop Icon")
desktop_icon.update(icon)
desktop_icon.owner = frappe.session.user
desktop_icon.save()
return {"layout": layout}
@frappe.whitelist()
def get_layout():
"""Return the current user's saved desktop layout. Used on desk load to avoid stale cached HTML."""
try:
doc = frappe.get_doc("Desktop Layout", frappe.session.user)
if doc.layout:
return json.loads(doc.layout)
except frappe.DoesNotExistError:
frappe.clear_last_message()
return None
@frappe.whitelist()
def delete_layout():
return frappe.delete_doc_if_exists("Desktop Layout", frappe.session.user)

View file

@ -4,6 +4,7 @@
import json
from datetime import date, datetime
from typing import Any
import frappe
import frappe.share
@ -235,7 +236,7 @@ class Event(Document):
@frappe.whitelist()
def update_attending_status(event_name, attendee, status):
def update_attending_status(event_name: str, attendee: str, status: str):
event_doc = frappe.get_doc("Event", event_name)
if event_doc.owner == attendee == frappe.session.user:
@ -252,7 +253,7 @@ def update_attending_status(event_name, attendee, status):
@frappe.whitelist()
def delete_communication(event, reference_doctype, reference_docname):
def delete_communication(event: str | dict[str, Any], reference_doctype: str, reference_docname: str | int):
if isinstance(event, str):
event = json.loads(event)
@ -332,7 +333,11 @@ def send_event_digest():
@frappe.whitelist()
@http_cache(max_age=5 * 60, stale_while_revalidate=60 * 60)
def get_events(
start: date, end: date, user: str | None = None, for_reminder: bool = False, filters=None
start: date,
end: date,
user: str | None = None,
for_reminder: bool = False,
filters: str | list | dict[str, Any] | None = None,
) -> list[frappe._dict]:
user = user or frappe.session.user
type EventLikeDict = Event | frappe._dict

View file

@ -75,7 +75,7 @@ class FormTour(Document):
@frappe.whitelist()
def reset_tour(tour_name):
def reset_tour(tour_name: str):
for user in frappe.get_all("User", pluck="name"):
onboarding_status = frappe.parse_json(frappe.db.get_value("User", user, "onboarding_status"))
onboarding_status.pop(tour_name, None)
@ -88,7 +88,7 @@ def reset_tour(tour_name):
@frappe.whitelist()
def update_user_status(value, step):
def update_user_status(value: str, step: str):
from frappe.utils.telemetry import capture
step = frappe.parse_json(step)

View file

@ -66,7 +66,7 @@ def has_permission(doc, ptype, user):
@frappe.whitelist()
def get_kanban_boards(doctype):
def get_kanban_boards(doctype: str):
"""Get Kanban Boards for doctype to show in List View"""
return frappe.get_list(
"Kanban Board",
@ -76,7 +76,7 @@ def get_kanban_boards(doctype):
@frappe.whitelist()
def add_column(board_name, column_title):
def add_column(board_name: str, column_title: str):
"""Adds new column to Kanban Board"""
doc = frappe.get_doc("Kanban Board", board_name)
for col in doc.columns:
@ -89,7 +89,7 @@ def add_column(board_name, column_title):
@frappe.whitelist()
def archive_restore_column(board_name, column_title, status):
def archive_restore_column(board_name: str, column_title: str, status: str):
"""Set column's status to status"""
doc = frappe.get_doc("Kanban Board", board_name)
for col in doc.columns:
@ -101,7 +101,7 @@ def archive_restore_column(board_name, column_title, status):
@frappe.whitelist()
def update_order(board_name, order):
def update_order(board_name: str, order: str):
"""Save the order of cards in columns"""
board = frappe.get_doc("Kanban Board", board_name)
doctype = board.reference_doctype
@ -129,7 +129,14 @@ def update_order(board_name, order):
@frappe.whitelist()
def update_order_for_single_card(board_name, docname, from_colname, to_colname, old_index, new_index):
def update_order_for_single_card(
board_name: str,
docname: str,
from_colname: str,
to_colname: str,
old_index: str | int,
new_index: str | int,
):
"""Save the order of cards in columns"""
board = frappe.get_doc("Kanban Board", board_name)
doctype = board.reference_doctype
@ -171,7 +178,7 @@ def get_kanban_column_order_and_index(board, colname):
@frappe.whitelist()
def add_card(board_name, docname, colname):
def add_card(board_name: str, docname: str, colname: str):
board = frappe.get_doc("Kanban Board", board_name)
frappe.has_permission(board.reference_doctype, "write", throw=True)
@ -185,7 +192,7 @@ def add_card(board_name, docname, colname):
@frappe.whitelist()
def quick_kanban_board(doctype, board_name, field_name, project=None):
def quick_kanban_board(doctype: str, board_name: str, field_name: str, project: str | None = None):
"""Create new KanbanBoard quickly with default options"""
doc = frappe.new_doc("Kanban Board")
@ -228,7 +235,7 @@ def get_order_for_column(board, colname):
@frappe.whitelist()
def update_column_order(board_name, order):
def update_column_order(board_name: str, order: str):
"""Set the order of columns in Kanban Board"""
board = frappe.get_doc("Kanban Board", board_name)
order = json.loads(order)
@ -260,7 +267,7 @@ def update_column_order(board_name, order):
@frappe.whitelist()
def set_indicator(board_name, column_name, indicator):
def set_indicator(board_name: str, column_name: str, indicator: str):
"""Set the indicator color of column"""
board = frappe.get_doc("Kanban Board", board_name)

View file

@ -1,6 +1,8 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe.model.document import Document
@ -29,7 +31,9 @@ class ListViewSettings(Document):
@frappe.whitelist()
def save_listview_settings(doctype, listview_settings, removed_listview_fields):
def save_listview_settings(
doctype: str, listview_settings: str | dict[str, Any], removed_listview_fields: str | list[str]
):
listview_settings = frappe.parse_json(listview_settings)
removed_listview_fields = frappe.parse_json(removed_listview_fields)
@ -87,7 +91,7 @@ def set_in_list_view_property(doctype, field, value):
@frappe.whitelist()
def get_default_listview_fields(doctype):
def get_default_listview_fields(doctype: str):
meta = frappe.get_meta(doctype)
path = frappe.get_module_path(
frappe.scrub(meta.module), "doctype", frappe.scrub(meta.name), frappe.scrub(meta.name) + ".json"

View file

@ -23,7 +23,7 @@
"default": "1",
"fieldname": "enabled",
"fieldtype": "Check",
"label": "Enabled"
"label": "Enabled System Notification"
},
{
"fieldname": "subscribed_documents",
@ -98,7 +98,7 @@
"in_create": 1,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2025-04-16 17:15:25.641232",
"modified": "2026-02-17 13:39:35.159083",
"modified_by": "Administrator",
"module": "Desk",
"name": "Notification Settings",

View file

@ -130,8 +130,8 @@ def has_permission(doc, ptype="read", user=None):
@frappe.whitelist()
def set_seen_value(value, user):
def set_seen_value(value: int, user: str):
if frappe.flags.read_only:
return
frappe.db.set_value("Notification Settings", user, "seen", value, update_modified=False)
frappe.db.set_value("Notification Settings", frappe.session.user, "seen", value, update_modified=False)

View file

@ -1,6 +1,9 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
from datetime import date, datetime
from typing import Any
import frappe
from frappe import _
from frappe.boot import get_allowed_report_names
@ -121,7 +124,11 @@ def has_permission(doc, ptype, user):
@frappe.whitelist()
def get_result(doc, filters, to_date=None):
def get_result(
doc: str | dict[str, Any] | Document,
filters: str | list | dict[str, Any],
to_date: str | datetime | date | None = None,
):
doc = frappe.parse_json(doc)
fields = []
sql_function_map = {
@ -158,7 +165,9 @@ def get_result(doc, filters, to_date=None):
@frappe.whitelist()
def get_percentage_difference(doc, filters, result):
def get_percentage_difference(
doc: str | dict[str, Any], filters: str | list | dict[str, Any], result: float | int | str
):
doc = frappe.parse_json(doc)
result = frappe.parse_json(result)
@ -194,7 +203,7 @@ def calculate_previous_result(doc, filters):
@frappe.whitelist()
def create_number_card(args):
def create_number_card(args: str | dict[str, Any]):
args = frappe.parse_json(args)
doc = frappe.new_doc("Number Card")
@ -205,7 +214,9 @@ def create_number_card(args):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_cards_for_user(doctype, txt, searchfield, start, page_len, filters):
def get_cards_for_user(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: str | list | dict[str, Any]
):
doctype = "Number Card"
meta = frappe.get_meta(doctype)
searchfields = meta.get_search_fields()
@ -229,7 +240,7 @@ def get_cards_for_user(doctype, txt, searchfield, start, page_len, filters):
@frappe.whitelist()
def create_report_number_card(args):
def create_report_number_card(args: str | dict[str, Any]):
card = create_number_card(args)
args = frappe.parse_json(args)
args.name = card.name
@ -238,7 +249,7 @@ def create_report_number_card(args):
@frappe.whitelist()
def add_card_to_dashboard(args):
def add_card_to_dashboard(args: str | dict[str, Any]):
args = frappe.parse_json(args)
dashboard = frappe.get_doc("Dashboard", args.dashboard)

View file

@ -50,7 +50,7 @@ class OnboardingStep(Document):
@frappe.whitelist()
def get_onboarding_steps(ob_steps):
def get_onboarding_steps(ob_steps: str):
steps = []
for s in json.loads(ob_steps):
doc = frappe.get_doc("Onboarding Step", s.get("step"))

View file

@ -1,6 +1,8 @@
# Copyright (c) 2022, Frappe Technologies and contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe.deferred_insert import deferred_insert as _deferred_insert
from frappe.model.document import Document
@ -29,7 +31,7 @@ class RouteHistory(Document):
@frappe.whitelist()
def deferred_insert(routes):
def deferred_insert(routes: str | list[dict[str, Any]]):
routes = [
{
"user": frappe.session.user,

View file

@ -48,7 +48,7 @@ class SidebarItemGroup(Document):
@frappe.whitelist()
def get_reports(module_name=None):
def get_reports(module_name: str | None = None):
reports_info = []
if module_name:
sidebar_group = frappe.get_doc("Sidebar Item Group", module_name)

View file

@ -51,7 +51,7 @@ class SystemConsole(Document):
@frappe.whitelist(methods=["POST"])
def execute_code(doc):
def execute_code(doc: str):
console = frappe.get_doc(json.loads(doc))
console.run()
return console.as_dict()

View file

@ -33,7 +33,7 @@ def check_user_tags(dt):
@frappe.whitelist()
def add_tag(tag, dt, dn, color=None):
def add_tag(tag: str, dt: str, dn: str, color: str | None = None):
"adds a new tag to a record, and creates the Tag master"
DocTags(dt).add(dn, tag)
@ -41,7 +41,7 @@ def add_tag(tag, dt, dn, color=None):
@frappe.whitelist()
def add_tags(tags, dt, docs, color=None):
def add_tags(tags: str | list[str], dt: str, docs: str | list[str], color: str | None = None):
"adds a new tag to a record, and creates the Tag master"
tags = frappe.parse_json(tags)
docs = frappe.parse_json(docs)
@ -51,20 +51,20 @@ def add_tags(tags, dt, docs, color=None):
@frappe.whitelist()
def remove_tag(tag, dt, dn):
def remove_tag(tag: str, dt: str, dn: str):
"removes tag from the record"
DocTags(dt).remove(dn, tag)
@frappe.whitelist()
def get_tagged_docs(doctype, tag):
def get_tagged_docs(doctype: str, tag: str):
frappe.has_permission(doctype, throw=True)
doctype = DocType(doctype)
return (frappe.qb.from_(doctype).where(doctype._user_tags.like(tag)).select(doctype.name)).run()
@frappe.whitelist()
def get_tags(doctype, txt):
def get_tags(doctype: str, txt: str):
tag = frappe.get_list("Tag", filters=[["name", "like", f"%{txt}%"]])
tags = [t.name for t in tag]
@ -176,7 +176,7 @@ def update_tags(doc, tags):
@frappe.whitelist()
def get_documents_for_tag(tag):
def get_documents_for_tag(tag: str):
"""Search for given text in Tag Link.
:param tag: tag to be searched

View file

@ -173,5 +173,5 @@ def has_permission(doc, ptype="read", user=None):
@frappe.whitelist()
def new_todo(description):
def new_todo(description: str):
frappe.get_doc({"doctype": "ToDo", "description": description}).insert()

View file

@ -8,7 +8,6 @@ frappe.views.calendar["ToDo"] = {
id: "name",
title: "description",
allDay: "allDay",
progress: "progress",
},
gantt: true,
filters: [

View file

@ -8,7 +8,7 @@ frappe.ui.form.on("Workspace", {
refresh: function (frm) {
frm.enable_save();
frm.trigger("add_to_desktop");
let url = `/desk/${
frm.doc.public
? frappe.router.slug(frm.doc.title)
@ -44,6 +44,26 @@ frappe.ui.form.on("Workspace", {
frm.layout.show_message(message);
},
add_to_desktop: function (frm) {
if (frappe.app.sidebar.get_workspace_sidebars(frm.doc.title).length === 0) {
frm.add_custom_button(__("Add to Desktop"), function () {
frappe.call({
method: "frappe.desk.doctype.desktop_icon.desktop_icon.add_workspace_to_desktop",
args: {
workspace: frm.doc.name,
},
callback: function (r) {
if (r.message.status) {
frappe.toast({
message: __("Workspace added to desktop"),
indicator: "green",
});
}
},
});
});
}
},
disable_form: function (frm) {
frm.fields
.filter((field) => field.has_input)

View file

@ -125,16 +125,32 @@ class Workspace(Document):
self.name = doc.name = doc.label = doc.title
def on_trash(self):
if not self.module:
self.delete_sidebar()
self.delete_desktop_icon()
if self.public and not is_workspace_manager():
frappe.throw(_("You need to be Workspace Manager to delete a public workspace."))
self.delete_from_my_workspaces()
def delete_desktop_icon(self):
frappe.delete_doc_if_exists("Desktop Icon", self.title)
def delete_sidebar(self):
frappe.delete_doc_if_exists("Workspace Sidebar", self.title)
def delete_from_my_workspaces(self):
if not self.public:
if self.public:
return
try:
my_workspaces = frappe.get_doc("Workspace Sidebar", f"My Workspaces-{frappe.session.user}")
for w in my_workspaces.items:
if self.name == w.link_to:
frappe.delete_doc("Workspace Sidebar Item", w.name)
except frappe.DoesNotExistError:
frappe.clear_messages()
return
for w in my_workspaces.items:
if self.name == w.link_to:
frappe.delete_doc("Workspace Sidebar Item", w.name)
def after_delete(self):
if disable_saving_as_public():
@ -268,7 +284,7 @@ def get_report_type(report):
@frappe.whitelist()
def new_page(new_page):
def new_page(new_page: str):
if not loads(new_page):
return
@ -312,7 +328,7 @@ def new_page(new_page):
@frappe.whitelist()
def save_page(name, public, new_widgets, blocks):
def save_page(name: str, public: str | int, new_widgets: str, blocks: str):
public = frappe.parse_json(public)
doc = frappe.get_doc("Workspace", name)
@ -327,7 +343,7 @@ def save_page(name, public, new_widgets, blocks):
@frappe.whitelist()
def update_page(name, title, icon, indicator_color, parent, public):
def update_page(name: str, title: str, icon: str, indicator_color: str, parent: str, public: str | int):
public = frappe.parse_json(public)
doc = frappe.get_doc("Workspace", name)

View file

@ -195,7 +195,7 @@ def create_workspace_sidebar_for_workspaces():
@frappe.whitelist()
def add_sidebar_items(sidebar_title, sidebar_items):
def add_sidebar_items(sidebar_title: str, sidebar_items: str):
sidebar_items = loads(sidebar_items)
title = f"{sidebar_title}-{frappe.session.user}"
w = frappe.get_doc("Workspace Sidebar", sidebar_title)

View file

@ -4,6 +4,7 @@
"""assign/unassign to ToDo"""
import json
from typing import Any
import frappe
import frappe.share
@ -40,7 +41,7 @@ def get(args=None):
@frappe.whitelist()
def add(args=None, *, ignore_permissions=False):
def add(args: dict[str, Any] | None = None, *, ignore_permissions: bool | int = False):
"""add in someone's to do list
args = {
"assign_to": [],
@ -140,7 +141,7 @@ def add(args=None, *, ignore_permissions=False):
@frappe.whitelist()
def add_multiple(args=None):
def add_multiple(args: dict[str, Any] | None = None):
if not args:
args = frappe.local.form_dict
@ -174,12 +175,12 @@ def close_all_assignments(doctype, name, ignore_permissions=False):
@frappe.whitelist()
def remove(doctype, name, assign_to, ignore_permissions=False):
def remove(doctype: str, name: str | int, assign_to: str, ignore_permissions: bool | int = False):
return set_status(doctype, name, "", assign_to, status="Cancelled", ignore_permissions=ignore_permissions)
@frappe.whitelist()
def remove_multiple(doctype, names, ignore_permissions=False):
def remove_multiple(doctype: str, names: str, ignore_permissions: bool | int = False):
docname_list = json.loads(names)
for name in docname_list:
@ -193,7 +194,7 @@ def remove_multiple(doctype, names, ignore_permissions=False):
@frappe.whitelist()
def close(doctype: str, name: str, assign_to: str, ignore_permissions=False):
def close(doctype: str, name: str, assign_to: str, ignore_permissions: bool | int = False):
if assign_to != frappe.session.user:
frappe.throw(_("Only the assignee can complete this to-do."))

View file

@ -246,7 +246,7 @@ def is_document_followed(doctype, doc_name, user):
@frappe.whitelist()
def get_follow_users(doctype, doc_name):
def get_follow_users(doctype: str, doc_name: str):
return frappe.get_all(
"Document Follow", filters={"ref_doctype": doctype, "ref_docname": doc_name}, fields=["user"]
)

View file

@ -14,7 +14,9 @@ from frappe.modules import load_doctype_module
@frappe.whitelist()
def get_submitted_linked_docs(doctype: str, name: str, ignore_doctypes_on_cancel_all=None) -> list[tuple]:
def get_submitted_linked_docs(
doctype: str, name: str, ignore_doctypes_on_cancel_all: str | list[str] | None = None
) -> list[tuple]:
"""Get all the nested submitted documents those are present in referencing tables (dependent tables).
:param doctype: Document type
@ -365,7 +367,7 @@ def get_referencing_documents(
@frappe.whitelist()
def cancel_all_linked_docs(docs, ignore_doctypes_on_cancel_all=None):
def cancel_all_linked_docs(docs: str, ignore_doctypes_on_cancel_all: str | list[str] | None = None):
"""
Cancel all linked doctype, optionally ignore doctypes specified in a list.
@ -527,14 +529,14 @@ def get_linked_docs(doctype: str, name: str, linkinfo: dict | None = None) -> di
@frappe.whitelist()
def get(doctype, docname):
def get(doctype: str, docname: str):
frappe.has_permission(doctype, doc=docname, throw=True)
linked_doctypes = get_linked_doctypes(doctype=doctype)
return get_linked_docs(doctype=doctype, name=docname, linkinfo=linked_doctypes)
@frappe.whitelist()
def get_linked_doctypes(doctype, without_ignore_user_permissions_enabled=False):
def get_linked_doctypes(doctype: str, without_ignore_user_permissions_enabled: int | bool = False):
"""add list of doctypes this doctype is 'linked' with.
Example, for Customer:

View file

@ -3,6 +3,7 @@
import json
import typing
from typing import Any
from urllib.parse import quote_plus
import frappe
@ -12,16 +13,14 @@ import frappe.utils
from frappe import _, _dict
from frappe.core.doctype.permission_type.permission_type import get_doctype_ptype_map
from frappe.desk.form.document_follow import is_document_followed
from frappe.model.document import Document
from frappe.model.utils.user_settings import get_user_settings
from frappe.permissions import check_doctype_permission, get_doc_permissions, has_permission
from frappe.utils.data import cstr
if typing.TYPE_CHECKING:
from frappe.model.document import Document
@frappe.whitelist()
def getdoc(doctype, name):
def getdoc(doctype: str, name: str | int):
"""
Loads a doclist for a given document. This method is called directly from the client.
Requires "doctype", "name" as form variables.
@ -60,7 +59,7 @@ def getdoc(doctype, name):
@frappe.whitelist()
def getdoctype(doctype, with_parent=False):
def getdoctype(doctype: str, with_parent: int | bool = False):
"""load doctype"""
docs = []
@ -90,7 +89,11 @@ def get_meta_bundle(doctype):
@frappe.whitelist()
def get_docinfo(doc=None, doctype=None, name=None):
def get_docinfo(
doc: Document | dict | str | None = None,
doctype: str | None = None,
name: str | int | None = None,
):
from frappe.share import _get_users as get_docshares
if not doc:
@ -200,7 +203,7 @@ def get_versions(doc: "Document") -> list[dict]:
@frappe.whitelist()
def get_communications(doctype, name, start=0, limit=20):
def get_communications(doctype: str, name: str | int, start: str | int = 0, limit: str | int = 20):
from frappe.utils import cint
frappe.get_lazy_doc(doctype, name).check_permission()
@ -500,7 +503,7 @@ def update_user_info(docinfo, doc=None):
@frappe.whitelist()
def get_user_info_for_viewers(users):
def get_user_info_for_viewers(users: str):
user_info = {}
for user in json.loads(users):
frappe.utils.add_user_info(user, user_info)

View file

@ -13,7 +13,7 @@ from frappe.utils.telemetry import capture_doc
@frappe.whitelist(methods=["POST", "PUT"])
def savedocs(doc, action):
def savedocs(doc: str, action: str):
"""save / submit / update doclist"""
doc = frappe.get_doc(json.loads(doc))
capture_doc(doc, action)
@ -52,7 +52,12 @@ def savedocs(doc, action):
@frappe.whitelist(methods=["POST", "PUT"])
def cancel(doctype=None, name=None, workflow_state_fieldname=None, workflow_state=None):
def cancel(
doctype: str | None = None,
name: str | int | None = None,
workflow_state_fieldname: str | None = None,
workflow_state: str | None = None,
):
"""cancel a doclist"""
doc = frappe.get_doc(doctype, name)
capture_doc(doc, "Cancel")

View file

@ -49,7 +49,7 @@ def add_comment(
@frappe.whitelist()
def update_comment(name, content):
def update_comment(name: str | int, content: str):
"""allow only owner to update comment"""
doc = frappe.get_doc("Comment", name)
@ -77,40 +77,51 @@ def update_comment_publicity(name: str, publish: bool):
@frappe.whitelist()
def get_next(doctype, value, prev, filters=None, sort_order="desc", sort_field="creation"):
def get_next(
doctype: str,
value: str,
prev: str | int,
filters: dict | str | None = None,
sort_order: str = "desc",
sort_field: str = "creation",
):
prev = int(prev)
if not filters:
filters = []
if isinstance(filters, str):
filters = json.loads(filters)
# # condition based on sort order
condition = ">" if sort_order.lower() == "asc" else "<"
table = frappe.qb.DocType(doctype)
sort_column = table[sort_field]
name_column = table.name
current_sort_value = frappe.db.get_value(doctype, value, sort_field)
# switch the condition
if prev:
sort_order = "asc" if sort_order.lower() == "desc" else "desc"
condition = "<" if condition == ">" else ">"
is_ascending = sort_order.lower() == "asc"
if prev == is_ascending:
composite_condition = (sort_column < current_sort_value) | (
(sort_column == current_sort_value) & (name_column < value)
)
order = frappe.qb.desc
else:
composite_condition = (sort_column > current_sort_value) | (
(sort_column == current_sort_value) & (name_column > value)
)
order = frappe.qb.asc
# # add condition for next or prev item
filters.append([doctype, sort_field, condition, frappe.get_value(doctype, value, sort_field)])
res = frappe.get_list(
doctype,
fields=["name"],
filters=filters,
order_by=f"{sort_field} {sort_order}",
limit_start=0,
limit_page_length=1,
as_list=True,
query = (
frappe.qb.get_query(doctype, filters=filters, fields=["name"], ignore_permissions=False)
.orderby(sort_column, order=order)
.orderby(name_column, order=order)
.where(composite_condition)
.limit(1)
)
if not res:
frappe.msgprint(_("No further records"))
return None
else:
if res := query.run(as_list=True):
return res[0][0]
frappe.msgprint(_("No further records"))
return None
def get_pdf_link(doctype, docname, print_format="Standard", no_letterhead=0):
return f"/api/method/frappe.utils.print_format.download_pdf?doctype={doctype}&name={docname}&format={print_format}&no_letterhead={no_letterhead}"

View file

@ -7,7 +7,7 @@ import frappe
@frappe.whitelist()
def update_task(args, field_map):
def update_task(args: str, field_map: str):
"""Updates Doc (called via gantt) based on passed `field_map`"""
args = frappe._dict(json.loads(args))
field_map = frappe._dict(json.loads(field_map))

View file

@ -13,7 +13,7 @@ from frappe.utils import get_link_to_form
@frappe.whitelist()
def toggle_like(doctype, name, add=False):
def toggle_like(doctype: str, name: str, add: str | bool = False):
"""Adds / removes the current user in the `__liked_by` property of the given document.
If column does not exist, will add it in the database.

View file

@ -6,7 +6,7 @@ from frappe.www.printview import set_title_values_for_link_and_dynamic_link_fiel
@frappe.whitelist()
@http_cache(max_age=60 * 10)
def get_preview_data(doctype, docname):
def get_preview_data(doctype: str, docname: str | int):
preview_fields = []
meta = frappe.get_meta(doctype)
if not meta.show_preview_popup:

View file

@ -1,6 +1,8 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
from typing import Any
import frappe
from frappe.model import is_default_field
from frappe.query_builder import Order
@ -10,7 +12,7 @@ from frappe.query_builder.utils import DocType
@frappe.whitelist()
def get_list_settings(doctype):
def get_list_settings(doctype: str):
try:
return frappe.get_cached_doc("List View Settings", doctype)
except frappe.DoesNotExistError:
@ -18,7 +20,7 @@ def get_list_settings(doctype):
@frappe.whitelist()
def set_list_settings(doctype, values):
def set_list_settings(doctype: str, values: str | dict[str, Any]):
try:
doc = frappe.get_doc("List View Settings", doctype)
except frappe.DoesNotExistError:

View file

@ -242,7 +242,7 @@ def get_filters_for(doctype):
@frappe.whitelist()
@frappe.read_only()
def get_open_count(doctype: str, name: str, items=None):
def get_open_count(doctype: str, name: str | int, items: str | list[str] | None = None):
"""Get count for internal and external links for given transactions
:param doctype: Reference DocType

View file

@ -80,6 +80,12 @@
margin-top: 60px;
padding: 20px;
}
.icons-container:has(.sidebar-card){
margin-top: 20px;
.sidebar-card{
gap: 6px;
}
}
.modal
.modal-body .icons-container,.folder-icon .icons-container {
padding:0px;
@ -267,7 +273,7 @@
height: var(--folder-thumbnail-icon-height);
width: var(--folder-thumbnail-icon-height);
padding: 0px;
border-radius: 2px;
border-radius: 4px;
& .icon{
width: 5px;
height: 5px;
@ -500,31 +506,72 @@
justify-content: center;
}
.title-widget{
display: inline-block;
.title-widget {
display: inline-flex;
align-items: center;
justify-content: center;
min-height: 1.5rem;
cursor: text;
position: relative;
}
.title-input-label{
position: absolute;
top: 0px;
color: var(--neutral-white);
line-height: 22px;
z-index: 1;
pointers-events: none;
width: 100%;
text-align: center;
}
.title-input-wrapper{
position: relative;
display: inline-block;
.title-widget--read-only {
cursor: default;
}
.title-input-wrapper input{
border: 1px solid transparent;
width: 100%;
height: 100%;
background: none;
.title-widget--editable:hover .title-input-label {
opacity: 0.9;
}
.desktop-modal-heading .title-widget--read-only .title-input-label:hover {
background-color: transparent;
}
.desktop-modal-heading .title-widget .title-input-label {
color: var(--neutral-white);
}
font-size: var(--text-2xl);
line-height: 1.3;
padding: 2px 4px;
border-radius: 4px;
transition: background-color 0.15s ease;
}
.desktop-modal-heading .title-widget--editable:hover .title-input-label {
background-color: rgba(255, 255, 255, 0.08);
}
.title-input-wrapper {
display: inline-block;
min-width: 80px;
}
.desktop-modal-heading .title-input-wrapper .title-input {
color: var(--neutral-white);
font-size: var(--text-2xl);
line-height: 1.3;
background: rgba(255, 255, 255, 0.1);
border: 1px solid rgba(255, 255, 255, 0.25);
border-radius: 4px;
padding: 2px 8px;
outline: none;
min-width: 80px;
box-sizing: border-box;
}
.desktop-modal-heading .title-input-wrapper .title-input::placeholder {
color: rgba(255, 255, 255, 0.5);
}
.desktop-modal-heading .title-input-wrapper .title-input:focus {
border-color: rgba(255, 255, 255, 0.5);
background-color: rgba(255, 255, 255, 0.12);
}
.title-input-mirror {
position: absolute;
visibility: hidden;
white-space: pre;
font-size: var(--text-2xl);
font-family: inherit;
padding: 0 4px;
}

View file

@ -1,6 +1,6 @@
<!-- jinja -->
<div class="desktop-wrapper">
<header class="navbar navbar-expand navbar-container" role="navigation">
<header class="desktop-navbar navbar navbar-expand navbar-container" role="navigation">
<div class="navbar-home">
<img
id="brand-logo"
@ -8,20 +8,21 @@
alt="{{ _("App Logo") |e }}"
>
</div>
<div class="desktop-search-wrapper input-group search-bar">
<button
id="desktop-navbar-modal-search"
class="btn-reset flex justify-between desktop-navbar-modal-search"
title="Search"
>
<span class="desktop-search-icon">
<svg class="icon icon-sm"><use href="#icon-search"></use></svg>
{{ _("Search") }}
</span>
<span class="desktop-keyboard-shortcut">
</span>
</button>
</div>
{% if (show_search_bar) %}
<div class="desktop-search-wrapper input-group search-bar">
<button id="desktop-navbar-modal-search" class="btn-reset flex justify-between desktop-navbar-modal-search"
title="Search">
<span class="desktop-search-icon">
<svg class="icon icon-sm">
<use href="#icon-search"></use>
</svg>
{{ _("Search") }}
</span>
<span class="desktop-keyboard-shortcut">
</span>
</button>
</div>
{% endif %}
<div class="flex" style="gap:16px; align-items: center;">
<div class="desktop-notifications">
<div class="dropdown dropdown-notifications">
@ -64,5 +65,5 @@
{{ _("Save") }}
</button>
</div>
<div class="hidden" id="desktop-layout">{{ desktop_layout }}</pre>
<div class="hidden" id="desktop-layout">{{ desktop_layout | safe }}</div>
</div>

View file

@ -30,6 +30,9 @@ frappe.pages["desktop"].on_page_load = function (wrapper) {
// setup();
};
frappe.pages["desktop"].on_page_show = function (wrapper) {
frappe.pages["desktop"].desktop_page.update();
};
function get_workspaces_from_app_name(app_name) {
const app = frappe.boot.app_data.filter((a) => {
return a.app_title === app_name;
@ -68,15 +71,15 @@ function get_route(desktop_icon) {
} else if (first_link.link_type == "Workspace") {
let workspaces = frappe.workspaces[frappe.router.slug(first_link.link_to)];
if (workspaces) {
if (workspaces.public) {
route = "/desk/" + frappe.router.slug(first_link.link_to);
} else {
route = "/desk/private/" + frappe.router.slug(workspaces.title);
}
}
if (first_link.route) {
route = first_link.route;
let args = {
type: "workspace",
name: workspaces.title,
public: workspaces.public ? 1 : 0,
route_options: {
sidebar: desktop_icon.label,
},
};
route = frappe.utils.generate_route(args);
}
} else if (first_link.link_type === "URL") {
route = first_link.url;
@ -171,12 +174,9 @@ class DesktopPage {
this.page = page;
this.edit_mode = false;
this.desktop_menu_items = [];
this.make(this.page);
this.setup();
}
update() {
this.make(this.page);
this.setup();
this.make();
}
prepare() {
this.apps_icons = [];
@ -219,7 +219,7 @@ class DesktopPage {
let saved_layout = JSON.parse(localStorage.getItem(`${frappe.session.user}:desktop`));
if (!this.data && saved_layout) {
this.save_layout(saved_layout);
} else if (Object.keys(this.data).length != 0) {
} else if (this.data && Array.isArray(this.data) && this.data.length > 0) {
frappe.desktop_icons = this.data;
} else {
frappe.desktop_icons = frappe.boot.desktop_icons;
@ -245,10 +245,23 @@ class DesktopPage {
make() {
this.page.page_head.hide();
$(this.page.body).empty();
this.awesomebar_setup = false;
$(frappe.render_template("desktop")).appendTo(this.page.body);
if (!this.data) {
this.data = JSON.parse($("#desktop-layout").text());
if (this.data !== undefined) {
this.render();
} else {
const me = this;
frappe.call({
method: "frappe.desk.doctype.desktop_layout.desktop_layout.get_layout",
callback: function (r) {
me.data = r.message;
me.render();
},
});
}
}
render() {
this.sync_layout();
this.prepare();
this.wrapper = this.page.body.find(".desktop-container");
@ -264,8 +277,8 @@ class DesktopPage {
if (this.edit_mode) {
this.start_editing_layout();
}
this.setup();
}
setup() {
$(document).trigger("desktop_screen", { desktop: this });
this.setup_avatar();
@ -296,6 +309,9 @@ class DesktopPage {
{
label: "Edit Layout",
icon: "edit",
condition: function () {
return !me.edit_mode;
},
onClick: function () {
me.$desktop_edit_button.hide();
frappe.new_desktop_icons = JSON.parse(JSON.stringify(frappe.desktop_icons));
@ -358,23 +374,53 @@ class DesktopPage {
let grid = $($(".desktop-container .icons").get(0));
this.add_new_icon = `<div class="desktop-icon desktop-edit-mode add-new-icon" title="Add New Icon">
${frappe.utils.icon("plus", "lg")}
New Icon
<div>Workspace</div>
</div>`;
grid.append(this.add_new_icon);
$(".add-new-icon").on("click", function () {
frappe.ui.form.make_quick_entry(
"Desktop Icon",
function (icon) {
let d = new frappe.ui.Dialog({
title: "New Workspace",
fields: [
{
label: "Label",
fieldname: "label",
fieldtype: "Data",
},
{
label: "Public",
fieldname: "public",
fieldtype: "Check",
},
],
primary_action_label: "Create",
primary_action: function (values) {
let icon = frappe.model.get_new_doc("Desktop Icon");
icon.workspace = {
label: values.label,
public: values.public,
};
icon.link_type = "Workspace Sidebar";
icon.label = values.label;
frappe.new_desktop_icons.push(icon);
frappe.new_icons.push(icon);
frappe.pages["desktop"].desktop_page.update();
d.hide();
},
"",
"",
null,
true,
true
);
});
d.show();
// frappe.ui.form.make_quick_entry(
// "Desktop Icon",
// function (icon) {
// frappe.new_desktop_icons.push(icon);
// frappe.new_icons.push(icon);
// frappe.pages["desktop"].desktop_page.update();
// },
// "",
// "",
// null,
// true,
// true
// );
});
}
setup_edit_buttons() {
@ -432,7 +478,7 @@ class DesktopPage {
},
{
icon: "rotate-ccw",
label: "Reset to Default",
label: "Reset Desktop Layout",
onClick: function () {
reset_to_default();
window.location.reload();
@ -992,7 +1038,8 @@ class DesktopIcon {
let modal = frappe.desktop_utils.create_desktop_modal(me);
modal.setup(me.icon_title, me.child_icons, 4);
let $title = modal.modal.find(".modal-title");
let title = new InlineEditor($title, this.icon_data.label, function (
const edit_mode = frappe.pages["desktop"].desktop_page.edit_mode;
let title = new InlineEditor($title, this.icon_data.label, edit_mode, function (
old_value,
new_value
) {
@ -1004,6 +1051,10 @@ class DesktopIcon {
add_icons_to_folder(new_value, folder_icons);
frappe.pages["desktop"].desktop_page.update();
if (!frappe.pages["desktop"].desktop_page.edit_mode) {
frappe.pages["desktop"].desktop_page.save_layout(frappe.desktop_icons, []);
}
});
modal.show();
});
@ -1192,52 +1243,106 @@ class IconsPane {
}
class InlineEditor {
constructor(container, initialValue = "", onRename = () => {}) {
constructor(container, initialValue = "", editMode = false, onRename = () => {}) {
this.container = container;
this.initialValue = initialValue;
this.onRename = onRename;
this.currentValue = initialValue;
this.editMode = editMode;
this.onRename = typeof editMode === "function" ? editMode : onRename;
if (typeof editMode === "function") {
this.editMode = false;
}
this.isEditing = false;
this.render();
this.bindEvents();
}
render() {
const tooltip = this.editMode ? __("Click to edit") : "";
const editableClass = this.editMode ? "title-widget--editable" : "title-widget--read-only";
this.container.html(`
<div class="title-widget">
<div class="title-input-label">
<span>${__(this.initialValue)}</span>
</div>
<div class="title-input-wrapper">
<input class="title-input">
<div class="title-widget ${editableClass}" title="${tooltip}">
<span class="title-input-label">${__(this.currentValue)}</span>
<div class="title-input-wrapper" style="display: none;">
<input type="text" class="title-input" />
</div>
</div>
`);
this.$widget = this.container.find(".title-widget");
this.input = this.container.find(".title-input");
this.label = this.container.find(".title-input-label");
this.wrapper = this.container.find(".title-input-wrapper");
}
bindEvents() {
this.container.on("click", () => {
if (frappe.pages["desktop"].desktop_page.edit_mode) {
this.label.css("visibility", "hidden");
this.input.focus().select();
}
this.label.on("click", (e) => {
e.stopPropagation();
if (!frappe.pages["desktop"].desktop_page.edit_mode) return;
this.startEditing();
});
this.input.on("keydown", (event) => {
if (event.key === "Enter") {
const newValue = this.input.val().trim();
this.input.css("display", "none");
this.label.css("visibility", "visible");
this.label.find("span").text(newValue);
this.onRename(this.initialValue, newValue, this);
this.input.on("keydown", (e) => {
if (e.key === "Enter") {
e.preventDefault();
this.commit();
} else if (e.key === "Escape") {
e.preventDefault();
this.cancel();
}
});
this.input.on("blur", () => {
this.label.css("visibility", "visible");
this.commit();
});
this.input.on("input", () => {
this.resizeInput();
});
}
startEditing() {
if (this.isEditing) return;
this.isEditing = true;
this.initialValue = this.currentValue;
this.label.hide();
this.wrapper.show();
this.input.val(this.currentValue);
this.input.css("width", "4px");
this.resizeInput();
this.input.focus().select();
}
commit() {
if (!this.isEditing) return;
this.isEditing = false;
const newValue = this.input.val().trim();
const effective = newValue || this.initialValue;
this.label.text(effective).show();
this.wrapper.hide();
this.input.val(effective);
this.currentValue = effective;
if (effective !== this.initialValue) {
this.onRename(this.initialValue, effective, this);
}
}
cancel() {
if (!this.isEditing) return;
this.isEditing = false;
this.label.text(this.initialValue).show();
this.wrapper.hide();
this.input.val(this.currentValue);
this.input.blur();
}
resizeInput() {
const mirror = $("<span>")
.addClass("title-input-mirror")
.text(this.input.val() || "");
this.$widget.append(mirror);
const textWidth = mirror.get(0).offsetWidth;
mirror.remove();
this.input.css("width", Math.max(80, textWidth + 20) + "px");
}
}

View file

@ -14,8 +14,11 @@ def get_context(context):
brand_logo = frappe.get_hooks("app_logo_url", app_name="frappe")[0]
context.brand_logo = brand_logo
try:
context.desktop_layout = frappe.get_doc("Desktop Layout", frappe.session.user).layout or {}
layout = frappe.get_doc("Desktop Layout", frappe.session.user).layout
context.desktop_layout = layout if layout else "[]"
except frappe.DoesNotExistError:
frappe.clear_last_message()
context.desktop_layout = {}
context.show_search_bar = frappe.get_cached_value("User", frappe.session.user, "search_bar")
return context

View file

@ -2,6 +2,7 @@
# License: MIT. See LICENSE
import json
from typing import Any
import frappe
from frappe import _
@ -48,7 +49,7 @@ def get_setup_stages(args): # nosemgrep
@frappe.whitelist()
def setup_complete(args):
def setup_complete(args: str | dict[str, Any]):
"""Calls hooks for `setup_wizard_complete`, sets home page as `desktop`
and clears cache. If wizard breaks, calls `setup_wizard_exception` hook"""
@ -68,7 +69,9 @@ def setup_complete(args):
@frappe.whitelist()
def initialize_system_settings_and_user(system_settings_data, user_data):
def initialize_system_settings_and_user(
system_settings_data: str | dict[str, Any], user_data: str | dict[str, Any]
):
system_settings = frappe.get_single("System Settings")
if cint(system_settings.setup_complete):
@ -377,7 +380,7 @@ def disable_future_access():
@frappe.whitelist()
def load_messages(language):
def load_messages(language: str):
"""Load translation messages for given language from all `setup_wizard_requires`
javascript files"""
from frappe.translate import get_messages_for_boot
@ -391,16 +394,34 @@ def load_messages(language):
@frappe.whitelist()
def load_languages():
language_codes = frappe.db.sql(
"select language_code, language_name from tabLanguage order by name", as_dict=True
Language = frappe.qb.DocType("Language")
language_codes = (
frappe.qb.from_(Language)
.select(Language.language_code, Language.language_name)
.where(Language.enabled == 1)
.orderby(Language.language_code)
.run(as_dict=1)
)
language_opts = (
frappe.qb.from_(Language)
.select(
Language.language_name.as_("value"),
Language.language_name.as_("label"),
Language.language_code.as_("description"),
)
.where(Language.enabled == 1)
.orderby(Language.language_code)
.run(as_dict=1)
)
codes_to_names = {}
for d in language_codes:
codes_to_names[d.language_code] = d.language_name
return {
"default_language": frappe.db.get_value("Language", frappe.local.lang, "language_name")
or frappe.local.lang,
"languages": sorted(frappe.db.sql_list("select language_name from tabLanguage order by name")),
"languages": language_opts,
"codes_to_names": codes_to_names,
}

View file

@ -75,7 +75,13 @@ def get_report_result(report, filters):
@frappe.read_only()
def generate_report_result(
report, filters=None, user=None, custom_columns=None, is_tree=False, parent_field=None
report,
filters=None,
user=None,
custom_columns=None,
is_tree=False,
parent_field=None,
skip_total_calculation=False,
):
user = user or frappe.session.user
filters = filters or []
@ -110,12 +116,13 @@ def generate_report_result(
if result:
result = get_filtered_data(report.ref_doctype, columns, result, user)
if cint(report.add_total_row) and result and not skip_total_row:
has_total_row = cint(report.add_total_row) and result and not skip_total_row
if has_total_row and not skip_total_calculation:
result = add_total_row(result, columns, is_tree=is_tree, parent_field=parent_field)
if isinstance(filters, dict) and filters.get("translate_data"):
total_row = cint(report.add_total_row) and result and not skip_total_row
result = translate_report_data(result, total_row)
result = translate_report_data(result, has_total_row)
return {
"result": result,
@ -146,7 +153,7 @@ def normalize_result(result, columns):
@frappe.whitelist()
def get_script(report_name):
def get_script(report_name: str):
report = get_report_doc(report_name)
module = report.module or frappe.db.get_value("DocType", report.ref_doctype, "module")
@ -192,16 +199,17 @@ def get_reference_report(report):
@frappe.whitelist()
@frappe.read_only()
def run(
report_name,
filters=None,
user=None,
ignore_prepared_report=False,
custom_columns=None,
is_tree=False,
parent_field=None,
are_default_filters=True,
js_filters=None,
):
report_name: str,
filters: str | dict | None = None,
user: str | None = None,
ignore_prepared_report: bool = False,
custom_columns: str | list | None = None,
is_tree: bool = False,
parent_field: str | None = None,
are_default_filters: bool = True,
js_filters: str | list | None = None,
skip_total_calculation: bool = False,
) -> dict:
if not user:
user = frappe.session.user
validate_filters_permissions(report_name, filters, user, js_filters)
@ -217,8 +225,11 @@ def run(
if sbool(are_default_filters) and report.get("custom_filters"):
filters = report.custom_filters
is_prepared_report = report.prepared_report and not sbool(ignore_prepared_report) and not custom_columns
skip_total_calculation = sbool(skip_total_calculation)
try:
if report.prepared_report and not sbool(ignore_prepared_report) and not custom_columns:
if is_prepared_report:
if filters:
if isinstance(filters, str):
filters = json.loads(filters)
@ -228,14 +239,20 @@ def run(
dn = ""
result = get_prepared_report_result(report, filters, dn, user)
else:
result = generate_report_result(report, filters, user, custom_columns, is_tree, parent_field)
result = generate_report_result(
report, filters, user, custom_columns, is_tree, parent_field, skip_total_calculation
)
add_data_to_monitor(report=report.reference_report or report.name)
except Exception:
frappe.log_error("Report Error")
frappe.log_error("Report execution failed for: {}".format(report_name))
raise
result["add_total_row"] = report.add_total_row and not result.get("skip_total_row", False)
if skip_total_calculation and is_prepared_report:
# remove total row from result
result["result"] = result["result"][:-1]
if sbool(are_default_filters) and report.get("custom_filters"):
result["custom_filters"] = report.custom_filters
@ -246,7 +263,7 @@ def add_custom_column_data(custom_columns, result):
doctype_names_from_custom_field = []
for column in custom_columns:
if len(column["fieldname"].split("-")) > 1:
# length greater than 1, means that the column is a custom field with confilicting fieldname
# length greater than 1, means that the column is a custom field with conflicting fieldname
doctype_name = frappe.unscrub(column["fieldname"].split("-")[1])
doctype_names_from_custom_field.append(doctype_name)
column["fieldname"] = column["fieldname"].split("-")[0]
@ -259,7 +276,7 @@ def add_custom_column_data(custom_columns, result):
for row in result:
link_field = column.get("link_field")
# backwards compatibile `link_field`
# backwards compatible `link_field`
# old custom reports which use `str` should not break.
if isinstance(link_field, str):
link_field = frappe._dict({"fieldname": link_field, "names": []})
@ -367,13 +384,22 @@ def _export_query(form_params, csv_params, populate_response=True):
custom_columns = frappe.parse_json(form_params.custom_columns or "[]")
include_indentation = form_params.include_indentation
include_filters = form_params.include_filters
visible_idx = form_params.visible_idx
visible_idx = form_params.visible_idx or []
ignore_visible_idx = sbool(form_params.get("ignore_visible_idx"))
skip_all_rows_total = not ignore_visible_idx
include_hidden_columns = form_params.include_hidden_columns
if isinstance(visible_idx, str):
visible_idx = json.loads(visible_idx)
data = run(report_name, form_params.filters, custom_columns=custom_columns, are_default_filters=False)
data = run(
report_name,
form_params.filters,
custom_columns=custom_columns,
are_default_filters=False,
skip_total_calculation=skip_all_rows_total,
)
data = frappe._dict(data)
data.filters = form_params.applied_filters
@ -384,13 +410,19 @@ def _export_query(form_params, csv_params, populate_response=True):
)
return
# calculate total row only for visible rows
if skip_all_rows_total and cint(data.get("add_total_row")):
data["result"] = add_total_row(data.result, data.columns, visible_idx=visible_idx)
format_fields(data)
xlsx_data, column_widths, header_index = build_xlsx_data(
data,
visible_idx,
include_indentation,
include_filters=include_filters,
include_hidden_columns=include_hidden_columns,
ignore_visible_idx=ignore_visible_idx,
)
if file_format_type == "CSV":
@ -525,10 +557,14 @@ def build_xlsx_data(
column_widths.append(column_width)
result.append(column_data)
last_row_index = len(data.result) - 1
# build table from result
for row_idx, row in enumerate(data.result):
# only pick up rows that are visible in the report
if not ignore_visible_idx and row_idx not in visible_idx:
# only pick up rows that are visible in the report + total row if added
if not (
ignore_visible_idx or row_idx in visible_idx or (data.add_total_row and row_idx == last_row_index)
):
continue
row_data = []
@ -555,11 +591,31 @@ def build_xlsx_data(
return result, column_widths, header_index
def add_total_row(result, columns, meta=None, is_tree=False, parent_field=None):
def add_total_row(
result,
columns,
meta=None,
is_tree=False,
parent_field=None,
visible_idx: list[int] | None = None,
ignore_visible_idx: bool = False,
) -> list[dict | list[Any]]:
total_row = [""] * len(columns)
has_percent = []
for i, col in enumerate(columns):
if not visible_idx or len(visible_idx) == len(result):
# It's not possible to have same length and different content.
ignore_visible_idx = True
visible_idx_set = set()
else:
# Note: converted for faster lookups
ignore_visible_idx = False
visible_idx_set = set(visible_idx)
# all rows are dict or list/tuple, we can check the first row to decide the type
is_row_dict = isinstance(result[0], dict) if result else False
for col_idx, col in enumerate(columns):
fieldtype, options, fieldname = None, None, None
if isinstance(col, str):
if meta:
@ -582,10 +638,16 @@ def add_total_row(result, columns, meta=None, is_tree=False, parent_field=None):
fieldname = col.get("fieldname")
options = col.get("options")
for row in result:
if i >= len(row):
for row_idx, row in enumerate(result):
# Skip rows not in visible_idx when filtering is enabled
if not ignore_visible_idx and row_idx not in visible_idx_set:
continue
cell = row.get(fieldname) if isinstance(row, dict) else row[i]
# Skip if column index is out of bounds for list/tuple rows
if not is_row_dict and col_idx >= len(row):
continue
cell = row.get(fieldname) if is_row_dict else row[col_idx]
if fieldtype is None:
if isinstance(cell, int):
fieldtype = "Int"
@ -593,21 +655,23 @@ def add_total_row(result, columns, meta=None, is_tree=False, parent_field=None):
fieldtype = "Float"
if fieldtype in ["Currency", "Int", "Float", "Percent", "Duration"] and flt(cell):
if not (is_tree and row.get(parent_field)):
total_row[i] = flt(total_row[i]) + flt(cell)
total_row[col_idx] = flt(total_row[col_idx]) + flt(cell)
if fieldtype == "Percent" and i not in has_percent:
has_percent.append(i)
if fieldtype == "Percent" and col_idx not in has_percent:
has_percent.append(col_idx)
if fieldtype == "Time" and cell:
if not total_row[i]:
total_row[i] = timedelta(hours=0, minutes=0, seconds=0)
total_row[i] = total_row[i] + cell
if not total_row[col_idx]:
total_row[col_idx] = timedelta(hours=0, minutes=0, seconds=0)
total_row[col_idx] = total_row[col_idx] + cell
if fieldtype == "Link" and options == "Currency":
total_row[i] = result[0].get(fieldname) if isinstance(result[0], dict) else result[0][i]
total_row[col_idx] = result[0].get(fieldname) if is_row_dict else result[0][col_idx]
for i in has_percent:
total_row[i] = flt(total_row[i]) / len(result)
for col_idx in has_percent:
total_row[col_idx] = flt(total_row[col_idx]) / (
len(result) if ignore_visible_idx else len(visible_idx)
)
first_col_fieldtype = None
if isinstance(columns[0], str):
@ -625,7 +689,7 @@ def add_total_row(result, columns, meta=None, is_tree=False, parent_field=None):
@frappe.whitelist()
def get_data_for_custom_field(doctype, field, names=None):
def get_data_for_custom_field(doctype: str, field: str, names: str | list[str] | None = None):
if not frappe.has_permission(doctype, "read"):
frappe.throw(_("Not Permitted to read {0}").format(_(doctype)), frappe.PermissionError)
@ -643,7 +707,7 @@ def get_data_for_custom_report(columns, result):
for column in columns:
if link_field := column.get("link_field"):
# backwards compatibile `link_field`
# backwards compatible `link_field`
# old custom reports which use `str` should not break
if isinstance(link_field, str):
link_field = frappe._dict({"fieldname": link_field, "names": []})
@ -658,12 +722,15 @@ def get_data_for_custom_report(columns, result):
names.append(row.get(row_key))
names = list(set(names))
doc_field_value_map[(doctype, fieldname)] = get_data_for_custom_field(doctype, fieldname, names)
if names:
doc_field_value_map[(doctype, fieldname)] = get_data_for_custom_field(
doctype, fieldname, names
)
return doc_field_value_map
@frappe.whitelist()
def save_report(reference_report, report_name, columns, filters):
def save_report(reference_report: str, report_name: str, columns: str, filters: str):
report_doc = get_report_doc(reference_report)
docname = frappe.db.exists(
@ -971,7 +1038,7 @@ def validate_filters_permissions(report_name, filters=None, user=None, js_filter
)
def translate_report_data(data, total_row):
def translate_report_data(data, total_row: bool):
for d in data[:-1] if total_row else data:
for field, value in d.items():
if isinstance(value, str):

View file

@ -5,6 +5,7 @@
import json
from functools import lru_cache
from typing import Any
from sql_metadata import Parser
@ -319,7 +320,7 @@ def compress(data, args=None):
@frappe.whitelist(methods=["POST", "PUT"])
def save_report(name, doctype, report_settings):
def save_report(name: str | int, doctype: str, report_settings: str):
"""Save reports of type Report Builder from Report View"""
if frappe.db.exists("Report", name):
@ -349,7 +350,7 @@ def save_report(name, doctype, report_settings):
@frappe.whitelist(methods=["POST", "DELETE"])
def delete_report(name):
def delete_report(name: str | int):
"""Delete reports of type Report Builder from Report View"""
report = frappe.get_doc("Report", name)
@ -641,13 +642,17 @@ def delete_bulk(doctype, items):
)
else:
frappe.msgprint(
_("Deleted all documents successfully"), realtime=True, title=_("Bulk Operation Successful")
_(f"Deleted {len(items)} records from {doctype} doctype"),
realtime=True,
title=_("Bulk Operation Successful"),
)
@frappe.whitelist()
@frappe.read_only()
def get_sidebar_stats(stats, doctype, filters=None):
def get_sidebar_stats(
stats: str | list[str], doctype: str, filters: str | list | dict[str, Any] | None = None
):
if filters is None:
filters = []
@ -663,7 +668,7 @@ def get_sidebar_stats(stats, doctype, filters=None):
@frappe.whitelist()
@frappe.read_only()
def get_stats(stats, doctype, filters=None):
def get_stats(stats: str, doctype: str, filters: str | None = None):
"""get tag info"""
import json
@ -720,7 +725,7 @@ def get_stats(stats, doctype, filters=None):
@frappe.whitelist()
def get_filter_dashboard_data(stats, doctype, filters=None):
def get_filter_dashboard_data(stats: str, doctype: str, filters: str | None = None):
"""get tags info"""
import json

View file

@ -72,7 +72,7 @@ def search_widget(
start: int = 0,
page_length: int = 10,
filters: str | None | dict | list = None,
filter_fields=None,
filter_fields: str | None = None,
as_dict: bool = False,
reference_doctype: str | None = None,
ignore_user_permissions: bool = False,
@ -372,7 +372,7 @@ def relevance_sorter(key, query, as_dict):
@frappe.whitelist()
def get_names_for_mentions(search_term):
def get_names_for_mentions(search_term: str):
users_for_mentions = frappe.cache.get_value("users_for_mentions", get_users_for_mentions)
user_groups = frappe.cache.get_value("user_groups", get_user_groups)
@ -408,7 +408,7 @@ def get_user_groups():
@frappe.whitelist()
def get_link_title(doctype, docname):
def get_link_title(doctype: str, docname: str | int):
meta = frappe.get_meta(doctype)
if meta.show_title_field_in_link:

View file

@ -7,7 +7,7 @@ from frappe.query_builder import Field, functions
@frappe.whitelist()
def get_all_nodes(doctype, label, parent, tree_method, **filters):
def get_all_nodes(doctype: str, label: str, parent: str, tree_method: str | None, **filters):
"""Recursively gets all data from tree nodes"""
filters.pop("cmd", None)
@ -35,7 +35,7 @@ def get_all_nodes(doctype, label, parent, tree_method, **filters):
@frappe.whitelist()
def get_children(doctype, parent="", include_disabled=False, **filters):
def get_children(doctype: str, parent: str = "", include_disabled: str | int | bool = False, **filters):
if isinstance(include_disabled, str):
include_disabled = frappe.sbool(include_disabled)
return _get_children(doctype, parent, include_disabled=include_disabled)

View file

@ -16,7 +16,7 @@ def sendmail_to_system_managers(subject, content):
@frappe.whitelist()
def get_contact_list(txt, page_length=20, extra_filters: str | None = None) -> list[dict]:
def get_contact_list(txt: str, page_length: int = 20, extra_filters: str | None = None) -> list[dict]:
"""Return email ids for a multiselect field."""
if extra_filters:
extra_filters = frappe.parse_json(extra_filters)
@ -60,7 +60,7 @@ def get_system_managers():
@frappe.whitelist()
def relink(name, reference_doctype=None, reference_name=None):
def relink(name: str, reference_doctype: str | None = None, reference_name: str | None = None):
frappe.db.sql(
"""update
`tabCommunication`
@ -77,7 +77,9 @@ def relink(name, reference_doctype=None, reference_name=None):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_communication_doctype(doctype, txt, searchfield, start, page_len, filters):
def get_communication_doctype(
doctype: str, txt: str, searchfield: str, start: int, page_len: int, filters: str | list | dict
):
user_perms = frappe.utils.user.UserPermissions(frappe.session.user)
user_perms.build_permissions()
can_read = user_perms.can_read
@ -156,33 +158,35 @@ def sendmail(
"""Send email using user's default **Email Account** or global default **Email Account**.
:param recipients: List of recipients.
:param sender: Email sender. Default is current user or default outgoing account.
:param subject: Email Subject.
:param message: (or `content`) Email Content.
:param as_markdown: Convert content markdown to HTML.
:param delayed: Send via scheduled email sender **Email Queue**. Don't send immediately. Default is true
:param send_priority: Priority for Email Queue, default 1.
:param reference_doctype: (or `doctype`) Append as communication to this DocType.
:param reference_name: (or `name`) Append as communication to this document name.
:param unsubscribe_method: Unsubscribe url with options email, doctype, name. e.g. `/api/method/unsubscribe`
:param unsubscribe_params: Unsubscribe paramaters to be loaded on the unsubscribe_method [optional] (dict).
:param attachments: List of attachments.
:param reply_to: Reply-To Email Address.
:param message_id: Used for threading. If a reply is received to this email, Message-Id is sent back as In-Reply-To in received email.
:param in_reply_to: Used to send the Message-Id of a received email back as In-Reply-To.
:param send_after: Send after the given datetime.
:param expose_recipients: Display all recipients in the footer message - "This email was sent to"
:param communication: Communication link to be set in Email Queue record
:param inline_images: List of inline images as {"filename", "filecontent"}. All src properties will be replaced with random Content-Id
:param template: Name of html template from templates/emails folder
:param args: Arguments for rendering the template
:param header: Append header in email
:param with_container: Wraps email inside a styled container
:param x_priority: 1 = HIGHEST, 3 = NORMAL, 5 = LOWEST
:param email_headers: Additional headers to be added in the email, e.g. {"X-Custom-Header": "value"} or {"Custom-Header": "value"}. Automatically prepends "X-" to the header name if not present.
:param raw_html: Whether to treat email template as a complete HTML file
:param add_css: Whether to add CSS from hooks/email_css to the email template
:param recipients: List of recipients.
:param sender: Email sender. Default is current user or default outgoing account.
:param subject: Email Subject.
:param message: (or `content`) Email Content.
:param as_markdown: Convert content markdown to HTML.
:param delayed: Send via scheduled email sender **Email Queue**. Don't send immediately. Default is true
:param send_priority: Priority for Email Queue, default 1.
:param reference_doctype: (or `doctype`) Append as communication to this DocType.
:param reference_name: (or `name`) Append as communication to this document name.
:param unsubscribe_method: Unsubscribe url with options email, doctype, name. e.g. `/api/method/unsubscribe`
:param unsubscribe_params: Unsubscribe paramaters to be loaded on the unsubscribe_method [optional] (dict).
:param attachments: List of attachments.
:param reply_to: Reply-To Email Address.
:param message_id: Used for threading. If a reply is received to this email, Message-Id is sent back as In-Reply-To in received email.
:param in_reply_to: Used to send the Message-Id of a received email back as In-Reply-To.
:param send_after: Send after the given datetime.
:param expose_recipients: Controls recipient visibility. "header" shows all TO recipients in the To header.
"footer" adds "This email was sent to..." text in footer. None (default) hides TO recipients from each other.
Note: CC header is always visible regardless of this setting (as per email semantics).
:param communication: Communication link to be set in Email Queue record
:param inline_images: List of inline images as {"filename", "filecontent"}. All src properties will be replaced with random Content-Id
:param template: Name of html template from templates/emails folder
:param args: Arguments for rendering the template
:param header: Append header in email
:param with_container: Wraps email inside a styled container
:param x_priority: 1 = HIGHEST, 3 = NORMAL, 5 = LOWEST
:param email_headers: Additional headers to be added in the email, e.g. {"X-Custom-Header": "value"} or {"Custom-Header": "value"}. Automatically prepends "X-" to the header name if not present.
:param raw_html: Whether to treat email template as a complete HTML file
:param add_css: Whether to add CSS from hooks/email_css to the email template
"""
from frappe.utils.jinja import get_email_from_template

View file

@ -299,7 +299,7 @@ class AutoEmailReport(Document):
@frappe.whitelist()
def download(name):
def download(name: str):
"""Download report locally"""
auto_email_report = frappe.get_doc("Auto Email Report", name)
auto_email_report.check_permission()
@ -315,7 +315,7 @@ def download(name):
@frappe.whitelist()
def send_now(name):
def send_now(name: str):
"""Send Auto Email report now"""
auto_email_report = frappe.get_doc("Auto Email Report", name)
auto_email_report.check_permission()

Some files were not shown because too many files have changed in this diff Show more