fix: ruff fixes

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang 2024-02-05 20:05:19 +05:30
parent fbb0e44791
commit 26ae0f3460
No known key found for this signature in database
GPG key ID: 9DCC61E211BF645F
128 changed files with 696 additions and 557 deletions

View file

@ -1,8 +1,8 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See LICENSE
import json
import os
from pathlib import Path
import json
STANDARD_INCLUSIONS = ["*.py"]

View file

@ -1,7 +1,7 @@
import sys
import requests
from urllib.parse import urlparse
import requests
WEBSITE_REPOS = [
"erpnext_com",
@ -36,11 +36,7 @@ def is_documentation_link(word: str) -> bool:
def contains_documentation_link(body: str) -> bool:
return any(
is_documentation_link(word)
for line in body.splitlines()
for word in line.split()
)
return any(is_documentation_link(word) for line in body.splitlines() for word in line.split())
def check_pull_request(number: str) -> "tuple[int, str]":
@ -53,12 +49,7 @@ def check_pull_request(number: str) -> "tuple[int, str]":
head_sha = (payload.get("head") or {}).get("sha")
body = (payload.get("body") or "").lower()
if (
not title.startswith("feat")
or not head_sha
or "no-docs" in body
or "backport" in body
):
if not title.startswith("feat") or not head_sha or "no-docs" in body or "backport" in body:
return 0, "Skipping documentation checks... 🏃"
if contains_documentation_link(body):

View file

@ -6,11 +6,11 @@ import subprocess
import sys
import time
import urllib.request
from functools import lru_cache
from functools import cache
from urllib.error import HTTPError
@lru_cache(maxsize=None)
@cache
def fetch_pr_data(pr_number, repo, endpoint=""):
api_url = f"https://api.github.com/repos/{repo}/pulls/{pr_number}"
@ -83,9 +83,7 @@ def is_ci(file):
def is_frontend_code(file):
return file.lower().endswith(
(".css", ".scss", ".less", ".sass", ".styl", ".js", ".ts", ".vue", ".html")
)
return file.lower().endswith((".css", ".scss", ".less", ".sass", ".styl", ".js", ".ts", ".vue", ".html"))
def is_docs(file):

View file

@ -2,7 +2,9 @@ import re
import sys
errors_encounter = 0
pattern = re.compile(r"_\(([\"']{,3})(?P<message>((?!\1).)*)\1(\s*,\s*context\s*=\s*([\"'])(?P<py_context>((?!\5).)*)\5)*(\s*,(\s*?.*?\n*?)*(,\s*([\"'])(?P<js_context>((?!\11).)*)\11)*)*\)")
pattern = re.compile(
r"_\(([\"']{,3})(?P<message>((?!\1).)*)\1(\s*,\s*context\s*=\s*([\"'])(?P<py_context>((?!\5).)*)\5)*(\s*,(\s*?.*?\n*?)*(,\s*([\"'])(?P<js_context>((?!\11).)*)\11)*)*\)"
)
words_pattern = re.compile(r"_{1,2}\([\"'`]{1,3}.*?[a-zA-Z]")
start_pattern = re.compile(r"_{1,2}\([f\"'`]{1,3}")
f_string_pattern = re.compile(r"_\(f[\"']")
@ -10,44 +12,50 @@ starts_with_f_pattern = re.compile(r"_\(f")
# skip first argument
files = sys.argv[1:]
files_to_scan = [_file for _file in files if _file.endswith(('.py', '.js'))]
files_to_scan = [_file for _file in files if _file.endswith((".py", ".js"))]
for _file in files_to_scan:
with open(_file, 'r') as f:
print(f'Checking: {_file}')
with open(_file) as f:
print(f"Checking: {_file}")
file_lines = f.readlines()
for line_number, line in enumerate(file_lines, 1):
if 'frappe-lint: disable-translate' in line:
if "frappe-lint: disable-translate" in line:
continue
if start_matches := start_pattern.search(line):
if starts_with_f := starts_with_f_pattern.search(line):
if has_f_string := f_string_pattern.search(line):
errors_encounter += 1
print(f'\nF-strings are not supported for translations at line number {line_number}\n{line.strip()[:100]}')
print(
f"\nF-strings are not supported for translations at line number {line_number}\n{line.strip()[:100]}"
)
continue
match = pattern.search(line)
error_found = False
if not match and line.endswith((',\n', '[\n')):
if not match and line.endswith((",\n", "[\n")):
# concat remaining text to validate multiline pattern
line = "".join(file_lines[line_number - 1:])
line = line[start_matches.start() + 1:]
line = "".join(file_lines[line_number - 1 :])
line = line[start_matches.start() + 1 :]
match = pattern.match(line)
if not match:
error_found = True
print(f'\nTranslation syntax error at line number {line_number}\n{line.strip()[:100]}')
print(f"\nTranslation syntax error at line number {line_number}\n{line.strip()[:100]}")
if not error_found and not words_pattern.search(line):
error_found = True
print(f'\nTranslation is useless because it has no words at line number {line_number}\n{line.strip()[:100]}')
print(
f"\nTranslation is useless because it has no words at line number {line_number}\n{line.strip()[:100]}"
)
if error_found:
errors_encounter += 1
if errors_encounter > 0:
print('\nVisit "https://frappeframework.com/docs/user/en/translations" to learn about valid translation strings.')
print(
'\nVisit "https://frappeframework.com/docs/user/en/translations" to learn about valid translation strings.'
)
sys.exit(1)
else:
print('\nGood To Go!')
print("\nGood To Go!")

View file

@ -152,12 +152,12 @@ class _LazyTranslate:
return self.value
def __add__(self, other):
if isinstance(other, (str, _LazyTranslate)):
if isinstance(other, str | _LazyTranslate):
return self.value + str(other)
raise NotImplementedError
def __radd__(self, other):
if isinstance(other, (str, _LazyTranslate)):
if isinstance(other, str | _LazyTranslate):
return str(other) + self.value
return NotImplementedError
@ -1300,7 +1300,7 @@ def get_cached_value(doctype: str, name: str, fieldname: str = "name", as_dict:
values = [doc.get(f) for f in fieldname]
if as_dict:
return _dict(zip(fieldname, values))
return _dict(zip(fieldname, values, strict=False))
return values
@ -1644,7 +1644,7 @@ def _load_app_hooks(app_name: str | None = None):
raise
def _is_valid_hook(obj):
return not isinstance(obj, (types.ModuleType, types.FunctionType, type))
return not isinstance(obj, types.ModuleType | types.FunctionType | type)
for key, value in inspect.getmembers(app_hooks, predicate=_is_valid_hook):
if not key.startswith("_"):
@ -1904,7 +1904,7 @@ def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
if not ignore_no_copy:
remove_no_copy_fields(newdoc)
for i, d in enumerate(newdoc.get_all_children()):
for d in newdoc.get_all_children():
d.set("__islocal", 1)
for fieldname in fields_to_clear:
@ -2493,7 +2493,7 @@ def mock(type, size=1, locale="en"):
if type not in dir(fake):
raise ValueError("Not a valid mock type.")
else:
for i in range(size):
for _ in range(size):
data = getattr(fake, type)()
results.append(data)
@ -2507,7 +2507,7 @@ def validate_and_sanitize_search_inputs(fn):
def wrapper(*args, **kwargs):
from frappe.desk.search import sanitize_searchfield
kwargs.update(dict(zip(fn.__code__.co_varnames, args)))
kwargs.update(dict(zip(fn.__code__.co_varnames, args, strict=False)))
sanitize_searchfield(kwargs["searchfield"])
kwargs["start"] = cint(kwargs["start"])
kwargs["page_len"] = cint(kwargs["page_len"])
@ -2520,7 +2520,7 @@ def validate_and_sanitize_search_inputs(fn):
return wrapper
from frappe.utils.error import log_error # noqa: backward compatibility
from frappe.utils.error import log_error # noqa
if _tune_gc:
# generational GC gets triggered after certain allocs (g0) which is 700 by default.

View file

@ -376,7 +376,7 @@ class CookieManager:
}
def delete_cookie(self, to_delete):
if not isinstance(to_delete, (list, tuple)):
if not isinstance(to_delete, list | tuple):
to_delete = [to_delete]
self.to_delete.extend(to_delete)

View file

@ -104,7 +104,7 @@ class TestAutoAssign(FrappeTestCase):
frappe.db.delete("ToDo", {"name": d.name})
# add 5 more assignments
for i in range(5):
for _ in range(5):
_make_test_record(public=1)
# check if each user still has 10 assignments

View file

@ -177,9 +177,6 @@ def symlink(target, link_name, overwrite=False):
if not overwrite:
return os.symlink(target, link_name)
# os.replace() may fail if files are on different filesystems
link_dir = os.path.dirname(link_name)
# Create link to target with temporary filename
while True:
temp_link_name = f"tmp{frappe.generate_hash()}"

View file

@ -1036,7 +1036,7 @@ def _drop_site(
f"Error: The operation has stopped because backup of {site}'s database failed.",
f"Reason: {str(err)}\n",
"Fix the issue and try again.",
"Hint: Use 'bench drop-site {0} --force' to force the removal of {0}".format(site),
f"Hint: Use 'bench drop-site {site} --force' to force the removal of {site}",
]
click.echo("\n".join(messages))
sys.exit(1)

View file

@ -38,9 +38,7 @@ def new_language(context, lang_code, app):
frappe.connect()
frappe.translate.write_translations_file(app, lang_code)
print(
"File created at ./apps/{app}/{app}/translations/{lang_code}.csv".format(app=app, lang_code=lang_code)
)
print(f"File created at ./apps/{app}/{app}/translations/{lang_code}.csv")
print("You will need to add the language in frappe/geo/languages.json, if you haven't done it already.")

View file

@ -556,7 +556,7 @@ def jupyter(context):
os.mkdir(jupyter_notebooks_path)
bin_path = os.path.abspath("../env/bin")
print(
"""
f"""
Starting Jupyter notebook
Run the following in your first cell to connect notebook to frappe
```
@ -566,7 +566,7 @@ frappe.connect()
frappe.local.lang = frappe.db.get_default('lang')
frappe.db.connect()
```
""".format(site=site, sites_path=sites_path)
"""
)
os.execv(
f"{bin_path}/jupyter",

View file

@ -125,11 +125,10 @@ def get_preferred_address(doctype, name, preferred_key="is_primary_address"):
FROM
`tabAddress` addr, `tabDynamic Link` dl
WHERE
dl.parent = addr.name and dl.link_doctype = %s and
dl.link_name = %s and ifnull(addr.disabled, 0) = 0 and
%s = %s
"""
% ("%s", "%s", preferred_key, "%s"),
dl.parent = addr.name and dl.link_doctype = {} and
dl.link_name = {} and ifnull(addr.disabled, 0) = 0 and
{} = {}
""".format("%s", "%s", preferred_key, "%s"),
(doctype, name, 1),
as_dict=1,
)

View file

@ -256,7 +256,7 @@ def contact_query(doctype, txt, searchfield, start, page_len, filters):
link_name = filters.pop("link_name")
return frappe.db.sql(
"""select
f"""select
`tabContact`.name, `tabContact`.full_name, `tabContact`.company_name
from
`tabContact`, `tabDynamic Link`
@ -265,12 +265,12 @@ def contact_query(doctype, txt, searchfield, start, page_len, filters):
`tabDynamic Link`.parenttype = 'Contact' and
`tabDynamic Link`.link_doctype = %(link_doctype)s and
`tabDynamic Link`.link_name = %(link_name)s and
`tabContact`.`{key}` like %(txt)s
{mcond}
`tabContact`.`{searchfield}` like %(txt)s
{get_match_cond(doctype)}
order by
if(locate(%(_txt)s, `tabContact`.full_name), locate(%(_txt)s, `tabContact`.company_name), 99999),
`tabContact`.idx desc, `tabContact`.full_name
limit %(start)s, %(page_len)s """.format(mcond=get_match_cond(doctype), key=searchfield),
limit %(start)s, %(page_len)s """,
{
"txt": "%" + txt + "%",
"_txt": txt.replace("%", ""),

View file

@ -52,7 +52,6 @@ def get_columns(filters):
def get_data(filters):
data = []
reference_doctype = filters.get("reference_doctype")
reference_name = filters.get("reference_name")

View file

@ -53,7 +53,7 @@ class ActivityLog(Document):
def set_ip_address(self):
if self.operation in ("Login", "Logout"):
self.ip_address = getattr(frappe.local, "request_ip")
self.ip_address = frappe.local.request_ip
@staticmethod
def clear_old_logs(days=None):

View file

@ -271,7 +271,7 @@ def export_json(doctype, path, filters=None, or_filters=None, name=None, order_b
for key in del_keys:
if key in doc:
del doc[key]
for k, v in doc.items():
for v in doc.values():
if isinstance(v, list):
for child in v:
for key in del_keys + ("docstatus", "doctype", "modified", "name"):

View file

@ -1,7 +1,6 @@
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import io
import json
import os
import re
@ -525,7 +524,6 @@ class ImportFile:
# subsequent rows that have blank values in parent columns
# are considered as child rows
parent_column_indexes = self.header.get_column_indexes(self.doctype)
parent_row_values = first_row.get_values(parent_column_indexes)
data_without_first_row = data[1:]
for row in data_without_first_row:
@ -656,7 +654,7 @@ class Row:
for key in frappe.model.default_fields + frappe.model.child_table_fields + ("__islocal",):
doc.pop(key, None)
for col, value in zip(columns, values):
for col, value in zip(columns, values, strict=False):
df = col.df
if value in INVALID_VALUES:
value = None
@ -751,7 +749,7 @@ class Row:
def parse_value(self, value, col):
df = col.df
if isinstance(value, (datetime, date)) and df.fieldtype in ["Date", "Datetime"]:
if isinstance(value, datetime | date) and df.fieldtype in ["Date", "Datetime"]:
return value
value = cstr(value)
@ -774,7 +772,7 @@ class Row:
return value
def get_date(self, value, column):
if isinstance(value, (datetime, date)):
if isinstance(value, datetime | date):
return value
date_format = column.date_format
@ -938,7 +936,7 @@ class Column:
"""
def guess_date_format(d):
if isinstance(d, (datetime, date, time)):
if isinstance(d, datetime | date | time):
if self.df.fieldtype == "Date":
return "%Y-%m-%d"
if self.df.fieldtype == "Datetime":
@ -1137,7 +1135,6 @@ def build_fields_dict_for_column_matching(parent_doctype):
label = (df.label or "").strip()
translated_label = _(label)
parent = df.parent or parent_doctype
if parent_doctype == doctype:
# for parent doctypes keys will be

View file

@ -1363,11 +1363,9 @@ def validate_fields(meta):
if not d.get("__islocal") and frappe.db.has_column(d.parent, d.fieldname):
has_non_unique_values = frappe.db.sql(
"""select `{fieldname}`, count(*)
from `tab{doctype}` where ifnull(`{fieldname}`, '') != ''
group by `{fieldname}` having count(*) > 1 limit 1""".format(
doctype=d.parent, fieldname=d.fieldname
)
f"""select `{d.fieldname}`, count(*)
from `tab{d.parent}` where ifnull(`{d.fieldname}`, '') != ''
group by `{d.fieldname}` having count(*) > 1 limit 1"""
)
if has_non_unique_values and has_non_unique_values[0][0]:
@ -1541,7 +1539,7 @@ def validate_fields(meta):
field.options = "\n".join(options_list)
def scrub_fetch_from(field):
if hasattr(field, "fetch_from") and getattr(field, "fetch_from"):
if hasattr(field, "fetch_from") and field.fetch_from:
field.fetch_from = field.fetch_from.strip("\n").strip()
def validate_data_field_type(docfield):
@ -1826,7 +1824,7 @@ def make_module_and_roles(doc, perm_fieldname="permissions"):
r.desk_access = 1
r.flags.ignore_mandatory = r.flags.ignore_permissions = True
r.insert()
except frappe.DoesNotExistError as e:
except frappe.DoesNotExistError:
pass
except frappe.db.ProgrammingError as e:
if frappe.db.is_table_missing(e):

View file

@ -52,15 +52,21 @@ _THROW_EXC = """
frappe.exceptions.ValidationError: what
"""
TEST_EXCEPTIONS = {
"erpnext (app)": _RAW_EXC,
"erpnext (app)": _THROW_EXC,
}
TEST_EXCEPTIONS = (
(
"erpnext (app)",
_RAW_EXC,
),
(
"erpnext (app)",
_THROW_EXC,
),
)
class TestExceptionSourceGuessing(FrappeTestCase):
@patch.object(frappe, "get_installed_apps", return_value=["frappe", "erpnext", "3pa"])
def test_exc_source_guessing(self, _installed_apps):
for source, exc in TEST_EXCEPTIONS.items():
for source, exc in TEST_EXCEPTIONS:
result = guess_exception_source(exc)
self.assertEqual(result, source)

View file

@ -137,7 +137,7 @@ class File(Document):
if not self.attached_to_doctype:
return
if not self.attached_to_name or not isinstance(self.attached_to_name, (str, int)):
if not self.attached_to_name or not isinstance(self.attached_to_name, str | int):
frappe.throw(_("Attached To Name must be a string or an integer"), frappe.ValidationError)
if self.attached_to_field and SPECIAL_CHAR_PATTERN.search(self.attached_to_field):

View file

@ -88,14 +88,13 @@ class Page(Document):
if not os.path.exists(path + ".js"):
with open(path + ".js", "w") as f:
f.write(
"""frappe.pages['%s'].on_page_load = function(wrapper) {
var page = frappe.ui.make_app_page({
f"""frappe.pages['{self.name}'].on_page_load = function(wrapper) {{
var page = frappe.ui.make_app_page({{
parent: wrapper,
title: '%s',
title: '{self.title}',
single_column: true
});
}"""
% (self.name, self.title)
}});
}}"""
)
def as_dict(self, no_nulls=False):

View file

@ -301,7 +301,7 @@ class Report(Document):
if filters:
for key, value in filters.items():
condition, _value = "=", value
if isinstance(value, (list, tuple)):
if isinstance(value, list | tuple):
condition, _value = value
_filters.append([key, condition, _value])
@ -360,7 +360,7 @@ class Report(Document):
def build_data_dict(self, result, columns):
data = []
for row in result:
if isinstance(row, (list, tuple)):
if isinstance(row, list | tuple):
_row = frappe._dict()
for i, val in enumerate(row):
_row[columns[i].get("fieldname")] = val

View file

@ -131,7 +131,12 @@ class TestReport(FrappeTestCase):
self.assertListEqual(["email"], [column.get("fieldname") for column in columns])
admin_dict = frappe.core.utils.find(result, lambda d: d["name"] == "Administrator")
self.assertDictEqual(
{"name": "Administrator", "user_type": "System User", "email": "admin@example.com"}, admin_dict
{
"name": "Administrator",
"user_type": "System User",
"email": "admin@example.com",
},
admin_dict,
)
def test_report_with_custom_column(self):
@ -156,10 +161,18 @@ class TestReport(FrappeTestCase):
)
result = response.get("result")
columns = response.get("columns")
self.assertListEqual(["name", "email", "user_type"], [column.get("fieldname") for column in columns])
self.assertListEqual(
["name", "email", "user_type"],
[column.get("fieldname") for column in columns],
)
admin_dict = frappe.core.utils.find(result, lambda d: d["name"] == "Administrator")
self.assertDictEqual(
{"name": "Administrator", "user_type": "System User", "email": "admin@example.com"}, admin_dict
{
"name": "Administrator",
"user_type": "System User",
"email": "admin@example.com",
},
admin_dict,
)
def test_report_permissions(self):
@ -167,9 +180,7 @@ class TestReport(FrappeTestCase):
frappe.db.delete("Has Role", {"parent": frappe.session.user, "role": "Test Has Role"})
frappe.db.commit()
if not frappe.db.exists("Role", "Test Has Role"):
role = frappe.get_doc({"doctype": "Role", "role_name": "Test Has Role"}).insert(
ignore_permissions=True
)
frappe.get_doc({"doctype": "Role", "role_name": "Test Has Role"}).insert(ignore_permissions=True)
if not frappe.db.exists("Report", "Test Report"):
report = frappe.get_doc(
@ -253,18 +264,18 @@ class TestReport(FrappeTestCase):
report.report_script = """
totals = {}
for user in frappe.get_all('User', fields = ['name', 'user_type', 'creation']):
if not user.user_type in totals:
totals[user.user_type] = 0
totals[user.user_type] = totals[user.user_type] + 1
if not user.user_type in totals:
totals[user.user_type] = 0
totals[user.user_type] = totals[user.user_type] + 1
data = [
[
{'fieldname': 'type', 'label': 'Type'},
{'fieldname': 'value', 'label': 'Value'}
],
[
{"type":key, "value": value} for key, value in totals.items()
]
[
{'fieldname': 'type', 'label': 'Type'},
{'fieldname': 'value', 'label': 'Value'}
],
[
{"type":key, "value": value} for key, value in totals.items()
]
]
"""
report.save()
@ -299,13 +310,13 @@ data = [
report.report_script = """
totals = {}
for user in frappe.get_all('User', fields = ['name', 'user_type', 'creation']):
if not user.user_type in totals:
totals[user.user_type] = 0
totals[user.user_type] = totals[user.user_type] + 1
if not user.user_type in totals:
totals[user.user_type] = 0
totals[user.user_type] = totals[user.user_type] + 1
result = [
{"type":key, "value": value} for key, value in totals.items()
]
{"type":key, "value": value} for key, value in totals.items()
]
"""
report.save()
@ -344,15 +355,40 @@ result = [
report_settings = {"tree": True, "parent_field": "parent_value"}
columns = [
{"fieldname": "parent_column", "label": "Parent Column", "fieldtype": "Data", "width": 10},
{"fieldname": "column_1", "label": "Column 1", "fieldtype": "Float", "width": 10},
{"fieldname": "column_2", "label": "Column 2", "fieldtype": "Float", "width": 10},
{
"fieldname": "parent_column",
"label": "Parent Column",
"fieldtype": "Data",
"width": 10,
},
{
"fieldname": "column_1",
"label": "Column 1",
"fieldtype": "Float",
"width": 10,
},
{
"fieldname": "column_2",
"label": "Column 2",
"fieldtype": "Float",
"width": 10,
},
]
result = [
{"parent_column": "Parent 1", "column_1": 200, "column_2": 150.50},
{"parent_column": "Child 1", "column_1": 100, "column_2": 75.25, "parent_value": "Parent 1"},
{"parent_column": "Child 2", "column_1": 100, "column_2": 75.25, "parent_value": "Parent 1"},
{
"parent_column": "Child 1",
"column_1": 100,
"column_2": 75.25,
"parent_value": "Parent 1",
},
{
"parent_column": "Child 2",
"column_1": 100,
"column_2": 75.25,
"parent_value": "Parent 1",
},
]
result = add_total_row(
@ -369,13 +405,13 @@ result = [
def test_cte_in_query_report(self):
cte_query = textwrap.dedent(
"""
with enabled_users as (
select name
from `tabUser`
where enabled = 1
)
select * from enabled_users;
"""
with enabled_users as (
select name
from `tabUser`
where enabled = 1
)
select * from enabled_users;
"""
)
report = frappe.get_doc(

View file

@ -210,7 +210,7 @@ class ServerScript(Document):
if key.startswith("_"):
continue
value = obj[key]
if isinstance(value, (NamespaceDict, dict)) and value:
if isinstance(value, NamespaceDict | dict) and value:
if key == "form_dict":
out.append(["form_dict", 7])
continue
@ -222,7 +222,7 @@ class ServerScript(Document):
score = 0
elif isinstance(value, ModuleType):
score = 10
elif isinstance(value, (FunctionType, MethodType)):
elif isinstance(value, FunctionType | MethodType):
score = 9
elif isinstance(value, type):
score = 8

View file

@ -218,7 +218,7 @@ frappe.qb.from_(todo).select(todo.name).where(todo.name == "{todo.name}").run()
name="test_nested_scripts_1",
script_type="API",
api_method="test_nested_scripts_1",
script=f"""log("nothing")""",
script="""log("nothing")""",
)
script.insert()
script.execute_method()
@ -228,7 +228,7 @@ frappe.qb.from_(todo).select(todo.name).where(todo.name == "{todo.name}").run()
name="test_nested_scripts_2",
script_type="API",
api_method="test_nested_scripts_2",
script=f"""frappe.call("test_nested_scripts_1")""",
script="""frappe.call("test_nested_scripts_1")""",
)
script.insert()
script.execute_method()

View file

@ -574,10 +574,9 @@ class User(Document):
has_fields = [d.get("name") for d in desc if d.get("name") in ["owner", "modified_by"]]
for field in has_fields:
frappe.db.sql(
"""UPDATE `%s`
SET `%s` = %s
WHERE `%s` = %s"""
% (tab, field, "%s", field, "%s"),
"""UPDATE `{}`
SET `{}` = {}
WHERE `{}` = {}""".format(tab, field, "%s", field, "%s"),
(new_name, old_name),
)
@ -620,7 +619,7 @@ class User(Document):
def ensure_unique_roles(self):
exists = []
for i, d in enumerate(self.get("roles")):
for d in self.get("roles"):
if (not d.role) or (d.role in exists):
self.get("roles").remove(d)
else:

View file

@ -36,11 +36,7 @@ def get_columns_and_fields(doctype):
if df.in_list_view and df.fieldtype in data_fieldtypes:
fields.append(f"`{df.fieldname}`")
fieldtype = f"Link/{df.options}" if df.fieldtype == "Link" else df.fieldtype
columns.append(
"{label}:{fieldtype}:{width}".format(
label=df.label, fieldtype=fieldtype, width=df.width or 100
)
)
columns.append(f"{df.label}:{fieldtype}:{df.width or 100}")
return columns, fields

View file

@ -592,11 +592,11 @@ class CustomizeForm(Document):
max_length = cint(frappe.db.type_map.get(df.fieldtype)[1])
fieldname = df.fieldname
docs = frappe.db.sql(
"""
f"""
SELECT name, {fieldname}, LENGTH({fieldname}) AS len
FROM `tab{doctype}`
FROM `tab{self.doc_type}`
WHERE LENGTH({fieldname}) > {max_length}
""".format(fieldname=fieldname, doctype=self.doc_type, max_length=max_length),
""",
as_dict=True,
)
label = df.label

View file

@ -35,7 +35,7 @@ def get_data():
v = delist(v)
if isinstance(v, (dict, list)):
if isinstance(v, dict | list):
try:
return frappe.as_json(v)
except Exception:

View file

@ -103,8 +103,8 @@ class Database:
def connect(self):
"""Connects to a database as set in `site_config.json`."""
self._conn: Union["MariadbConnection", "PostgresConnection"] = self.get_connection()
self._cursor: Union["MariadbCursor", "PostgresCursor"] = self._conn.cursor()
self._conn: "MariadbConnection" | "PostgresConnection" = self.get_connection()
self._cursor: "MariadbCursor" | "PostgresCursor" = self._conn.cursor()
try:
if execution_timeout := get_query_execution_timeout():
@ -183,7 +183,7 @@ class Database:
{"name": "a%", "owner":"test@example.com"})
"""
if isinstance(query, (MySQLQueryBuilder, PostgreSQLQueryBuilder)):
if isinstance(query, MySQLQueryBuilder | PostgreSQLQueryBuilder):
frappe.errprint("Use run method to execute SQL queries generated by Query Engine")
debug = debug or getattr(self, "debug", False)
@ -212,7 +212,7 @@ class Database:
if values == EmptyQueryValues:
values = None
elif not isinstance(values, (tuple, dict, list)):
elif not isinstance(values, tuple | dict | list):
values = (values,)
query, values = self._transform_query(query, values)
@ -302,7 +302,7 @@ class Database:
elif as_dict:
keys = [column[0] for column in self._cursor.description]
for row in result:
row = frappe._dict(zip(keys, row))
row = frappe._dict(zip(keys, row, strict=False))
if update:
row.update(update)
yield row
@ -371,7 +371,7 @@ class Database:
return query % {
k: frappe.db.escape(v) if isinstance(v, str) else v for k, v in values.items()
}
elif isinstance(values, (list, tuple)):
elif isinstance(values, list | tuple):
return query % tuple(frappe.db.escape(v) if isinstance(v, str) else v for v in values)
return query, values
@ -438,7 +438,7 @@ class Database:
if result:
keys = [column[0] for column in self._cursor.description]
return [frappe._dict(zip(keys, row)) for row in result]
return [frappe._dict(zip(keys, row, strict=False)) for row in result]
@staticmethod
def clear_db_table_cache(query):

View file

@ -291,18 +291,18 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
def create_global_search_table(self):
if "__global_search" not in self.get_tables():
self.sql(
"""create table __global_search(
f"""create table __global_search(
doctype varchar(100),
name varchar({0}),
title varchar({0}),
name varchar({self.VARCHAR_LEN}),
title varchar({self.VARCHAR_LEN}),
content text,
fulltext(content),
route varchar({0}),
route varchar({self.VARCHAR_LEN}),
published int(1) not null default 0,
unique `doctype_name` (doctype, name))
COLLATE=utf8mb4_unicode_ci
ENGINE=MyISAM
CHARACTER SET=utf8mb4""".format(self.VARCHAR_LEN)
CHARACTER SET=utf8mb4"""
)
def create_user_settings_table(self):
@ -322,7 +322,7 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
def get_table_columns_description(self, table_name):
"""Return list of columns with descriptions."""
return self.sql(
"""select
f"""select
column_name as 'name',
column_type as 'type',
column_default as 'default',
@ -338,7 +338,7 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
column_key = 'UNI' as 'unique',
(is_nullable = 'NO') AS 'not_nullable'
from information_schema.columns as columns
where table_name = '{table_name}' """.format(table_name=table_name),
where table_name = '{table_name}' """,
as_dict=1,
)
@ -359,8 +359,8 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
def has_index(self, table_name, index_name):
return self.sql(
"""SHOW INDEX FROM `{table_name}`
WHERE Key_name='{index_name}'""".format(table_name=table_name, index_name=index_name)
f"""SHOW INDEX FROM `{table_name}`
WHERE Key_name='{index_name}'"""
)
def get_column_index(self, table_name: str, fieldname: str, unique: bool = False) -> frappe._dict | None:
@ -401,9 +401,8 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
if not self.has_index(table_name, index_name):
self.commit()
self.sql(
"""ALTER TABLE `%s`
ADD INDEX `%s`(%s)"""
% (table_name, index_name, ", ".join(fields))
"""ALTER TABLE `{}`
ADD INDEX `{}`({})""".format(table_name, index_name, ", ".join(fields))
)
def add_unique(self, doctype, fields, constraint_name=None):
@ -419,9 +418,8 @@ class MariaDBDatabase(MariaDBConnectionUtil, MariaDBExceptionUtil, Database):
):
self.commit()
self.sql(
"""alter table `tab%s`
add unique `%s`(%s)"""
% (doctype, constraint_name, ", ".join(fields))
"""alter table `tab{}`
add unique `{}`({})""".format(doctype, constraint_name, ", ".join(fields))
)
def updatedb(self, doctype, meta=None):

View file

@ -113,10 +113,7 @@ def check_database_settings():
result = True
for key, expected_value in REQUIRED_MARIADB_CONFIG.items():
if mariadb_variables.get(key) != expected_value:
print(
"For key %s. Expected value %s, found value %s"
% (key, expected_value, mariadb_variables.get(key))
)
print(f"For key {key}. Expected value {expected_value}, found value {mariadb_variables.get(key)}")
result = False
if not result:

View file

@ -288,14 +288,14 @@ class PostgresDatabase(PostgresExceptionUtil, Database):
def create_global_search_table(self):
if "__global_search" not in self.get_tables():
self.sql(
"""create table "__global_search"(
f"""create table "__global_search"(
doctype varchar(100),
name varchar({0}),
title varchar({0}),
name varchar({self.VARCHAR_LEN}),
title varchar({self.VARCHAR_LEN}),
content text,
route varchar({0}),
route varchar({self.VARCHAR_LEN}),
published int not null default 0,
unique (doctype, name))""".format(self.VARCHAR_LEN)
unique (doctype, name))"""
)
def create_user_settings_table(self):
@ -338,8 +338,8 @@ class PostgresDatabase(PostgresExceptionUtil, Database):
def has_index(self, table_name, index_name):
return self.sql(
"""SELECT 1 FROM pg_indexes WHERE tablename='{table_name}'
and indexname='{index_name}' limit 1""".format(table_name=table_name, index_name=index_name)
f"""SELECT 1 FROM pg_indexes WHERE tablename='{table_name}'
and indexname='{index_name}' limit 1"""
)
def add_index(self, doctype: str, fields: list, index_name: str = None):
@ -368,16 +368,15 @@ class PostgresDatabase(PostgresExceptionUtil, Database):
):
self.commit()
self.sql(
"""ALTER TABLE `tab%s`
ADD CONSTRAINT %s UNIQUE (%s)"""
% (doctype, constraint_name, ", ".join(fields))
"""ALTER TABLE `tab{}`
ADD CONSTRAINT {} UNIQUE ({})""".format(doctype, constraint_name, ", ".join(fields))
)
def get_table_columns_description(self, table_name):
"""Return list of columns with description."""
# pylint: disable=W1401
return self.sql(
"""
f"""
SELECT a.column_name AS name,
CASE LOWER(a.data_type)
WHEN 'character varying' THEN CONCAT('varchar(', a.character_maximum_length ,')')
@ -398,7 +397,7 @@ class PostgresDatabase(PostgresExceptionUtil, Database):
ON SUBSTRING(b.indexdef, '(.*)') LIKE CONCAT('%', a.column_name, '%')
WHERE a.table_name = '{table_name}'
GROUP BY a.column_name, a.data_type, a.column_default, a.character_maximum_length, a.is_nullable;
""".format(table_name=table_name),
""",
as_dict=1,
)
@ -441,7 +440,7 @@ def modify_query(query):
def modify_values(values):
def modify_value(value):
if isinstance(value, (list, tuple)):
if isinstance(value, list | tuple):
value = tuple(modify_values(value))
elif isinstance(value, int):
@ -455,7 +454,7 @@ def modify_values(values):
if isinstance(values, dict):
for k, v in values.items():
values[k] = modify_value(v)
elif isinstance(values, (tuple, list)):
elif isinstance(values, tuple | list):
new_values = []
for val in values:
new_values.append(modify_value(val))

View file

@ -54,16 +54,14 @@ class PostgresTable(DBTable):
def create_indexes(self):
create_index_query = ""
for key, col in self.columns.items():
for col in self.columns.values():
if (
col.set_index
and col.fieldtype in frappe.db.type_map
and frappe.db.type_map.get(col.fieldtype)[0] not in ("text", "longtext")
):
create_index_query += (
'CREATE INDEX IF NOT EXISTS "{index_name}" ON `{table_name}`(`{field}`);'.format(
index_name=col.fieldname, table_name=self.table_name, field=col.fieldname
)
f'CREATE INDEX IF NOT EXISTS "{col.fieldname}" ON `{self.table_name}`(`{col.fieldname}`);'
)
if create_index_query:
# nosemgrep
@ -115,9 +113,7 @@ class PostgresTable(DBTable):
for col in self.add_index:
# if index key not exists
create_contraint_query += (
'CREATE INDEX IF NOT EXISTS "{index_name}" ON `{table_name}`(`{field}`);'.format(
index_name=col.fieldname, table_name=self.table_name, field=col.fieldname
)
f'CREATE INDEX IF NOT EXISTS "{col.fieldname}" ON `{self.table_name}`(`{col.fieldname}`);'
)
for col in self.add_unique:

View file

@ -99,7 +99,7 @@ class Engine:
# add fields
self.fields = self.parse_fields(fields)
if not self.fields:
self.fields = [getattr(self.table, "name")]
self.fields = [self.table.name]
self.query._child_queries = []
for field in self.fields:
@ -117,7 +117,7 @@ class Engine:
if filters is None:
return
if isinstance(filters, (str, int)):
if isinstance(filters, str | int):
filters = {"name": str(filters)}
if isinstance(filters, Criterion):
@ -126,14 +126,14 @@ class Engine:
elif isinstance(filters, dict):
self.apply_dict_filters(filters)
elif isinstance(filters, (list, tuple)):
if all(isinstance(d, (str, int)) for d in filters) and len(filters) > 0:
elif isinstance(filters, list | tuple):
if all(isinstance(d, str | int) for d in filters) and len(filters) > 0:
self.apply_dict_filters({"name": ("in", filters)})
else:
for filter in filters:
if isinstance(filter, (str, int, Criterion, dict)):
if isinstance(filter, str | int | Criterion | dict):
self.apply_filters(filter)
elif isinstance(filter, (list, tuple)):
elif isinstance(filter, list | tuple):
self.apply_list_filters(filter)
def apply_list_filters(self, filter: list):
@ -150,7 +150,7 @@ class Engine:
def apply_dict_filters(self, filters: dict[str, str | int | list]):
for field, value in filters.items():
operator = "="
if isinstance(value, (list, tuple)):
if isinstance(value, list | tuple):
operator, value = value
self._apply_filter(field, value, operator)
@ -187,7 +187,7 @@ class Engine:
if isinstance(_value, bool):
_value = int(_value)
elif not _value and isinstance(_value, (list, tuple)):
elif not _value and isinstance(_value, list | tuple):
_value = ("",)
# Nested set
@ -279,7 +279,7 @@ class Engine:
return MARIADB_SPECIFIC_COMMENT.sub("", stripped_field)
return stripped_field
if isinstance(fields, (list, tuple)):
if isinstance(fields, list | tuple):
return [_sanitize_field(field) for field in fields]
elif isinstance(fields, str):
return _sanitize_field(fields)
@ -304,10 +304,10 @@ class Engine:
if not fields:
return []
fields = self.sanitize_fields(fields)
if isinstance(fields, (list, tuple, set)) and None in fields and Field not in fields:
if isinstance(fields, list | tuple | set) and None in fields and Field not in fields:
return []
if not isinstance(fields, (list, tuple)):
if not isinstance(fields, list | tuple):
fields = [fields]
def parse_field(field: str):

View file

@ -144,9 +144,7 @@ class DBTable:
try:
# check for truncation
max_length = frappe.db.sql(
"""SELECT MAX(CHAR_LENGTH(`{fieldname}`)) FROM `tab{doctype}`""".format(
fieldname=col.fieldname, doctype=self.doctype
)
f"""SELECT MAX(CHAR_LENGTH(`{col.fieldname}`)) FROM `tab{self.doctype}`"""
)
except frappe.db.InternalError as e:

View file

@ -21,7 +21,7 @@ def get_user_default(key, user=None):
d = user_defaults.get(key, None)
if is_a_user_permission_key(key):
if d and isinstance(d, (list, tuple)) and len(d) == 1:
if d and isinstance(d, list | tuple) and len(d) == 1:
# Use User Permission value when only when it has a single value
d = d[0]
else:
@ -31,7 +31,7 @@ def get_user_default(key, user=None):
# If no default value is found, use the User Permission value
d = user_permission_default
value = isinstance(d, (list, tuple)) and d[0] or d
value = isinstance(d, list | tuple) and d[0] or d
if not_in_user_permission(key, value, user):
return
@ -61,14 +61,14 @@ def get_user_default_as_list(key, user=None):
d = user_defaults.get(key, None)
if is_a_user_permission_key(key):
if d and isinstance(d, (list, tuple)) and len(d) == 1:
if d and isinstance(d, list | tuple) and len(d) == 1:
# Use User Permission value when only when it has a single value
d = [d[0]]
else:
d = user_defaults.get(frappe.scrub(key), None)
d = list(filter(None, (not isinstance(d, (list, tuple))) and [d] or d))
d = list(filter(None, (not isinstance(d, list | tuple)) and [d] or d))
# filter default values if not found in user permission
return [value for value in d if not not_in_user_permission(key, value)]
@ -135,7 +135,7 @@ def add_global_default(key, value):
def get_global_default(key):
d = get_defaults().get(key, None)
value = isinstance(d, (list, tuple)) and d[0] or d
value = isinstance(d, list | tuple) and d[0] or d
if not_in_user_permission(key, value):
return

View file

@ -13,7 +13,7 @@ queue_prefix = "insert_queue_for_"
def deferred_insert(doctype: str, records: list[Union[dict, "Document"]] | str):
if isinstance(records, (dict, list)):
if isinstance(records, dict | list):
_records = json.dumps(records)
else:
_records = records

View file

@ -559,11 +559,11 @@ def save_new_widget(doc, page, blocks, new_widgets):
json_config = widgets and dumps(widgets, sort_keys=True, indent=4)
# Error log body
log = """
page: {}
config: {}
exception: {}
""".format(page, json_config, e)
log = f"""
page: {page}
config: {json_config}
exception: {e}
"""
doc.log_error("Could not save customization", log)
raise

View file

@ -124,9 +124,7 @@ def get_non_standard_warning_message(non_standard_docs_map):
def get_html(docs, doctype):
html = f"<p>{frappe.bold(doctype)}</p>"
for doc in docs:
html += '<div><a href="/app/Form/{doctype}/{doc}">{doc}</a></div>'.format(
doctype=doctype, doc=doc
)
html += f'<div><a href="/app/Form/{doctype}/{doc}">{doc}</a></div>'
html += "<br>"
return html

View file

@ -55,7 +55,7 @@ def get_permission_query_conditions(user):
module_condition = """`tabDashboard Chart`.`module` in ({allowed_modules})
or `tabDashboard Chart`.`module` is NULL""".format(allowed_modules=",".join(allowed_modules))
return """
return f"""
((`tabDashboard Chart`.`chart_type` in ('Count', 'Sum', 'Average')
and {doctype_condition})
or
@ -63,11 +63,7 @@ def get_permission_query_conditions(user):
and {report_condition}))
and
({module_condition})
""".format(
doctype_condition=doctype_condition,
report_condition=report_condition,
module_condition=module_condition,
)
"""
def has_permission(doc, ptype, user):
@ -248,9 +244,7 @@ def get_heatmap_chart_config(chart, filters, heatmap_year):
doctype,
fields=[
timestamp_field,
"{aggregate_function}({value_field})".format(
aggregate_function=aggregate_function, value_field=value_field
),
f"{aggregate_function}({value_field})",
],
filters=filters,
group_by=f"date({datefield})",
@ -307,7 +301,7 @@ def get_result(data, timegrain, from_date, to_date, chart_type):
result = [[date, 0] for date in dates]
data_index = 0
if data:
for i, d in enumerate(result):
for d in result:
count = 0
while data_index < len(data) and getdate(data[data_index][0]) <= d[0]:
d[1] += data[data_index][1]

View file

@ -221,9 +221,7 @@ def delete_communication(event, reference_doctype, reference_docname):
def get_permission_query_conditions(user):
if not user:
user = frappe.session.user
return """(`tabEvent`.`event_type`='Public' or `tabEvent`.`owner`={user})""".format(
user=frappe.db.escape(user),
)
return f"""(`tabEvent`.`event_type`='Public' or `tabEvent`.`owner`={frappe.db.escape(user)})"""
def has_permission(doc, user):

View file

@ -52,9 +52,7 @@ def get_permission_query_conditions(user):
if user == "Administrator":
return ""
return """(`tabKanban Board`.private=0 or `tabKanban Board`.owner={user})""".format(
user=frappe.db.escape(user)
)
return f"""(`tabKanban Board`.private=0 or `tabKanban Board`.owner={frappe.db.escape(user)})"""
def has_permission(doc, ptype, user):

View file

@ -101,11 +101,11 @@ def get_permission_query_conditions(user=None):
module_condition = """`tabNumber Card`.`module` in ({allowed_modules})
or `tabNumber Card`.`module` is NULL""".format(allowed_modules=",".join(allowed_modules))
return """
return f"""
{doctype_condition}
and
{module_condition}
""".format(doctype_condition=doctype_condition, module_condition=module_condition)
"""
def has_permission(doc, ptype, user):
@ -141,11 +141,7 @@ def get_result(doc, filters, to_date=None):
if function == "count":
fields = [f"{function}(*) as result"]
else:
fields = [
"{function}({based_on}) as result".format(
function=function, based_on=doc.aggregate_function_based_on
)
]
fields = [f"{function}({doc.aggregate_function_based_on}) as result"]
if not filters:
filters = []

View file

@ -214,7 +214,7 @@ def get_references_across_doctypes(
for k, v in references_by_dlink_fields.items():
references.setdefault(k, []).extend(v)
for doctype, links in references.items():
for links in references.values():
for link in links:
link["is_child"] = link["doctype"] in all_child_tables
return references

View file

@ -286,9 +286,9 @@ def get_communication_data(
conditions = ""
if after:
# find after a particular date
conditions += """
AND C.creation > {}
""".format(after)
conditions += f"""
AND C.creation > {after}
"""
if doctype == "User":
conditions += """
@ -296,23 +296,23 @@ def get_communication_data(
"""
# communications linked to reference_doctype
part1 = """
part1 = f"""
SELECT {fields}
FROM `tabCommunication` as C
WHERE C.communication_type IN ('Communication', 'Feedback', 'Automated Message')
AND (C.reference_doctype = %(doctype)s AND C.reference_name = %(name)s)
{conditions}
""".format(fields=fields, conditions=conditions)
"""
# communications linked in Timeline Links
part2 = """
part2 = f"""
SELECT {fields}
FROM `tabCommunication` as C
INNER JOIN `tabCommunication Link` ON C.name=`tabCommunication Link`.parent
WHERE C.communication_type IN ('Communication', 'Feedback', 'Automated Message')
AND `tabCommunication Link`.link_doctype = %(doctype)s AND `tabCommunication Link`.link_name = %(name)s
{conditions}
""".format(fields=fields, conditions=conditions)
"""
return frappe.db.sql(
"""

View file

@ -105,6 +105,4 @@ def get_next(doctype, value, prev, filters=None, sort_order="desc", sort_field="
def get_pdf_link(doctype, docname, print_format="Standard", no_letterhead=0):
return "/api/method/frappe.utils.print_format.download_pdf?doctype={doctype}&name={docname}&format={print_format}&no_letterhead={no_letterhead}".format(
doctype=doctype, docname=docname, print_format=print_format, no_letterhead=no_letterhead
)
return f"/api/method/frappe.utils.print_format.download_pdf?doctype={doctype}&name={docname}&format={print_format}&no_letterhead={no_letterhead}"

View file

@ -45,8 +45,6 @@ def get_energy_point_leaderboard(date_range, company=None, field=None, limit=Non
for user in energy_point_users:
user_id = user["name"]
user["name"] = get_fullname(user["name"])
user["formatted_name"] = '<a href="/app/user-profile/{}">{}</a>'.format(
user_id, get_fullname(user_id)
)
user["formatted_name"] = f'<a href="/app/user-profile/{user_id}">{get_fullname(user_id)}</a>'
return energy_point_users

View file

@ -283,7 +283,7 @@ def get_open_count(doctype, name, items=None):
try:
external_links_data_for_d = get_external_links(d, name, links)
out["external_links_found"].append(external_links_data_for_d)
except Exception as e:
except Exception:
out["external_links_found"].append({"doctype": d, "open_count": 0, "count": 0})
else:
external_links_data_for_d = get_external_links(d, name, links)

View file

@ -124,7 +124,7 @@ def normalize_result(result, columns):
# Convert to list of dicts from list of lists/tuples
data = []
column_names = [column["fieldname"] for column in columns]
if result and isinstance(result[0], (list, tuple)):
if result and isinstance(result[0], list | tuple):
for row in result:
row_obj = {}
for idx, column_name in enumerate(column_names):
@ -507,7 +507,7 @@ def get_data_for_custom_field(doctype, field, names=None):
filters = {}
if names:
if isinstance(names, (str, bytearray)):
if isinstance(names, str | bytearray):
names = frappe.json.loads(names)
filters.update({"name": ["in", names]})
@ -659,7 +659,7 @@ def has_match(
cell_value = None
if isinstance(row, dict):
cell_value = row.get(idx)
elif isinstance(row, (list, tuple)):
elif isinstance(row, list | tuple):
cell_value = row[idx]
if (
@ -691,10 +691,10 @@ def get_linked_doctypes(columns, data):
columns_dict = get_columns_dict(columns)
for idx, col in enumerate(columns):
for idx in range(len(columns)):
df = columns_dict[idx]
if df.get("fieldtype") == "Link":
if data and isinstance(data[0], (list, tuple)):
if data and isinstance(data[0], list | tuple):
linked_doctypes[df["options"]] = idx
else:
# dict
@ -705,7 +705,7 @@ def get_linked_doctypes(columns, data):
for row in data:
if row:
if len(row) != len(columns_with_value):
if isinstance(row, (list, tuple)):
if isinstance(row, list | tuple):
row = enumerate(row)
elif isinstance(row, dict):
row = row.items()

View file

@ -199,7 +199,7 @@ def get_meta_and_docfield(fieldname, data):
def update_wildcard_field_param(data):
if (isinstance(data.fields, str) and data.fields == "*") or (
isinstance(data.fields, (list, tuple)) and len(data.fields) == 1 and data.fields[0] == "*"
isinstance(data.fields, list | tuple) and len(data.fields) == 1 and data.fields[0] == "*"
):
data.fields = get_permitted_fields(data.doctype, parenttype=data.parenttype)
return True
@ -390,10 +390,10 @@ def append_totals_row(data):
for row in data:
for i in range(len(row)):
if isinstance(row[i], (float, int)):
if isinstance(row[i], float | int):
totals[i] = (totals[i] or 0) + row[i]
if not isinstance(totals[0], (int, float)):
if not isinstance(totals[0], int | float):
totals[0] = "Total"
data.append(totals)
@ -568,7 +568,7 @@ def get_stats(stats, doctype, filters=None):
except frappe.db.SQLError:
pass
except frappe.db.InternalError as e:
except frappe.db.InternalError:
# raised when _user_tags column is added on the fly
pass
@ -673,7 +673,7 @@ def get_filters_cond(doctype, filters, conditions, ignore_permissions=None, with
for f in filters:
if isinstance(f[1], str) and f[1][0] == "!":
flt.append([doctype, f[0], "!=", f[1][1:]])
elif isinstance(f[1], (list, tuple)) and f[1][0].lower() in (
elif isinstance(f[1], list | tuple) and f[1][0].lower() in (
"=",
">",
"<",

View file

@ -321,7 +321,7 @@ def send_daily():
continue
try:
auto_email_report.send()
except Exception as e:
except Exception:
auto_email_report.log_error(f"Failed to send {auto_email_report.name} Auto Email Report")

View file

@ -4,7 +4,6 @@
import email.utils
import functools
import imaplib
import socket
import time
from datetime import datetime, timedelta
from poplib import error_proto
@ -96,7 +95,15 @@ class EmailAccount(Document):
password: DF.Password | None
send_notification_to: DF.SmallText | None
send_unsubscribe_message: DF.Check
service: DF.Literal["", "GMail", "Sendgrid", "SparkPost", "Yahoo Mail", "Outlook.com", "Yandex.Mail"]
service: DF.Literal[
"",
"GMail",
"Sendgrid",
"SparkPost",
"Yahoo Mail",
"Outlook.com",
"Yandex.Mail",
]
signature: DF.TextEditor | None
smtp_port: DF.Data | None
smtp_server: DF.Data | None
@ -191,20 +198,27 @@ class EmailAccount(Document):
self.default_incoming = False
messages.append(
_("{} has been disabled. It can only be enabled if {} is checked.").format(
frappe.bold(_("Default Incoming")), frappe.bold(_("Enable Incoming"))
frappe.bold(_("Default Incoming")),
frappe.bold(_("Enable Incoming")),
)
)
if not self.enable_outgoing and self.default_outgoing:
self.default_outgoing = False
messages.append(
_("{} has been disabled. It can only be enabled if {} is checked.").format(
frappe.bold(_("Default Outgoing")), frappe.bold(_("Enable Outgoing"))
frappe.bold(_("Default Outgoing")),
frappe.bold(_("Enable Outgoing")),
)
)
if messages:
if len(messages) == 1:
(as_list, messages) = (0, messages[0])
frappe.msgprint(messages, as_list=as_list, indicator="orange", title=_("Defaults Updated"))
frappe.msgprint(
messages,
as_list=as_list,
indicator="orange",
title=_("Defaults Updated"),
)
def on_update(self):
"""Check there is only one default of each type."""
@ -284,7 +298,11 @@ class EmailAccount(Document):
"loginfailed",
]
other_error_codes = ["err[auth]", "errtemporaryerror", "loginviayourwebbrowser"]
other_error_codes = [
"err[auth]",
"errtemporaryerror",
"loginviayourwebbrowser",
]
all_error_codes = auth_error_codes + other_error_codes
@ -563,7 +581,15 @@ class EmailAccount(Document):
seen_status = messages.get("seen_status", {}).get(uid)
if self.email_sync_option != "UNSEEN" or seen_status != "SEEN":
# only append the emails with status != 'SEEN' if sync option is set to 'UNSEEN'
mails.append(InboundMail(message, self, frappe.safe_decode(uid), seen_status, append_to))
mails.append(
InboundMail(
message,
self,
frappe.safe_decode(uid),
seen_status,
append_to,
)
)
if not self.enable_incoming:
return []
@ -618,7 +644,9 @@ class EmailAccount(Document):
def send_auto_reply(self, communication, email):
"""Send auto reply if set."""
from frappe.core.doctype.communication.email import set_incoming_outgoing_accounts
from frappe.core.doctype.communication.email import (
set_incoming_outgoing_accounts,
)
if self.enable_auto_reply:
set_incoming_outgoing_accounts(communication)
@ -671,7 +699,10 @@ class EmailAccount(Document):
if not self.enable_incoming:
frappe.throw(_("Automatic Linking can be activated only if Incoming is enabled."))
if frappe.db.exists("Email Account", {"enable_automatic_linking": 1, "name": ("!=", self.name)}):
if frappe.db.exists(
"Email Account",
{"enable_automatic_linking": 1, "name": ("!=", self.name)},
):
frappe.throw(_("Automatic Linking can be activated only for one Email Account."))
def append_email_to_sent_folder(self, message):
@ -717,7 +748,9 @@ def notify_unreplied():
"""Sends email notifications if there are unreplied Communications
and `notify_if_unreplied` is set as true."""
for email_account in frappe.get_all(
"Email Account", "name", filters={"enable_incoming": 1, "notify_if_unreplied": 1}
"Email Account",
"name",
filters={"enable_incoming": 1, "notify_if_unreplied": 1},
):
email_account = frappe.get_doc("Email Account", email_account.name)
@ -774,7 +807,12 @@ def pull(now=False):
doctype = frappe.qb.DocType("Email Account")
email_accounts = (
frappe.qb.from_(doctype)
.select(doctype.name, doctype.auth_method, doctype.connected_app, doctype.connected_user)
.select(
doctype.name,
doctype.auth_method,
doctype.connected_app,
doctype.connected_user,
)
.where(doctype.enable_incoming == 1)
.where(doctype.awaiting_password == 0)
.run(as_dict=1)
@ -820,10 +858,9 @@ def pull_from_email_account(email_account):
def get_max_email_uid(email_account):
# get maximum uid of emails
max_uid = 1
"""get maximum uid of emails"""
result = frappe.get_all(
if result := frappe.get_all(
"Communication",
filters={
"communication_medium": "Email",
@ -831,12 +868,9 @@ def get_max_email_uid(email_account):
"email_account": email_account,
},
fields=["max(uid) as uid"],
)
if not result:
return 1
else:
):
return cint(result[0].get("uid", 0)) + 1
return 1
def setup_user_email_inbox(email_account, awaiting_password, email_id, enable_outgoing, used_oauth):
@ -868,7 +902,11 @@ def setup_user_email_inbox(email_account, awaiting_password, email_id, enable_ou
# check if inbox is alreay configured
user_inbox = (
frappe.db.get_value("User Email", {"email_account": email_account, "parent": user_name}, ["name"])
frappe.db.get_value(
"User Email",
{"email_account": email_account, "parent": user_name},
["name"],
)
or None
)
@ -895,7 +933,11 @@ def remove_user_email_inbox(email_account):
if not email_account:
return
users = frappe.get_all("User Email", filters={"email_account": email_account}, fields=["parent as name"])
users = frappe.get_all(
"User Email",
filters={"email_account": email_account},
fields=["parent as name"],
)
for user in users:
doc = frappe.get_doc("User", user.get("name"))

View file

@ -413,10 +413,13 @@ class TestEmailAccount(FrappeTestCase):
@patch("frappe.email.receive.EmailServer.select_imap_folder", return_value=True)
@patch("frappe.email.receive.EmailServer.logout", side_effect=lambda: None)
def mocked_get_inbound_mails(
email_account, messages={}, mocked_logout=None, mocked_select_imap_folder=None
email_account, messages=None, mocked_logout=None, mocked_select_imap_folder=None
):
from frappe.email.receive import EmailServer
if messages is None:
messages = {}
def get_mocked_messages(**kwargs):
return messages.get(kwargs["folder"], {})
@ -427,7 +430,12 @@ class TestEmailAccount(FrappeTestCase):
@patch("frappe.email.receive.EmailServer.select_imap_folder", return_value=True)
@patch("frappe.email.receive.EmailServer.logout", side_effect=lambda: None)
def mocked_email_receive(email_account, messages={}, mocked_logout=None, mocked_select_imap_folder=None):
def mocked_email_receive(
email_account, messages=None, mocked_logout=None, mocked_select_imap_folder=None
):
if messages is None:
messages = {}
def get_mocked_messages(**kwargs):
return messages.get(kwargs["folder"], {})

View file

@ -105,7 +105,7 @@ def import_from(name, doctype):
@frappe.whitelist()
def add_subscribers(name, email_list):
if not isinstance(email_list, (list, tuple)):
if not isinstance(email_list, list | tuple):
email_list = email_list.replace(",", "\n").split("\n")
template = frappe.db.get_value("Email Group", name, "welcome_email_template")

View file

@ -356,7 +356,7 @@ class FrappeClient:
def preprocess(self, params):
"""convert dicts, lists to json"""
for key, value in params.items():
if isinstance(value, (dict, list)):
if isinstance(value, dict | list):
params[key] = json.dumps(value)
return params

View file

@ -14,7 +14,7 @@ def extract(fileobj, *args, **kwargs):
module = get_module(fileobj.name)
if hasattr(module, "standard_navbar_items"):
standard_navbar_items = getattr(module, "standard_navbar_items")
standard_navbar_items = module.standard_navbar_items
for nav_item in standard_navbar_items:
if label := nav_item.get("item_label"):
item_type = nav_item.get("item_type")
@ -29,7 +29,7 @@ def extract(fileobj, *args, **kwargs):
)
if hasattr(module, "standard_help_items"):
standard_help_items = getattr(module, "standard_help_items")
standard_help_items = module.standard_help_items
for help_item in standard_help_items:
if label := help_item.get("item_label"):
item_type = nav_item.get("item_type")

View file

@ -834,10 +834,10 @@ def partial_restore(sql_file_path, verbose=False):
warn = click.style(
"Delete the tables you want to restore manually before attempting"
" partial restore operation for PostreSQL databases",
" partial restore operation for PostgreSQL databases",
fg="yellow",
)
warnings.warn(warn)
warnings.warn(warn, stacklevel=2)
else:
click.secho("Unsupported database type", fg="red")
return

View file

@ -106,7 +106,7 @@ def take_backup_to_dropbox(retry_count=0, upload_db_backup=True):
if isinstance(error_log, str):
error_message = error_log + "\n" + frappe.get_traceback()
else:
file_and_error = [" - ".join(f) for f in zip(did_not_upload, error_log)]
file_and_error = [" - ".join(f) for f in zip(did_not_upload, error_log, strict=False)]
error_message = "\n".join(file_and_error) + "\n" + frappe.get_traceback()
send_email(False, "Dropbox", "Dropbox Settings", "send_notifications_to", error_message)

View file

@ -489,7 +489,7 @@ class LDAP_TestCase:
@mock_ldap_connection
def test_get_ldap_attributes(self):
method_return = self.test_class.get_ldap_attributes()
self.assertTrue(type(method_return) is list)
self.assertTrue(isinstance(method_return, list))
@mock_ldap_connection
def test_fetch_ldap_groups(self):
@ -599,7 +599,7 @@ class LDAP_TestCase:
test_ldap_entry = self.connection.entries[0]
method_return = self.test_class.convert_ldap_entry_to_dict(test_ldap_entry)
self.assertTrue(type(method_return) is dict) # must be dict
self.assertTrue(isinstance(method_return, dict)) # must be dict
self.assertTrue(len(method_return) == 6) # there are 6 fields in mock_ldap for use

View file

@ -12,11 +12,7 @@ def frappecloud_migrator(local_site):
request = requests.get(request_url)
if request.status_code / 100 != 2:
print(
"Request exitted with Status Code: {}\nPayload: {}".format(
request.status_code, html2text(request.text)
)
)
print(f"Request exited with Status Code: {request.status_code}\nPayload: {html2text(request.text)}")
click.secho(
"Some errors occurred while recovering the migration script. Please contact us @ Frappe Cloud if this issue persists",
fg="yellow",

View file

@ -39,7 +39,7 @@ class GoogleOAuth:
self.domain = domain.lower()
self.scopes = (
" ".join(_SCOPES[self.domain])
if isinstance(_SCOPES[self.domain], (list, tuple))
if isinstance(_SCOPES[self.domain], list | tuple)
else _SCOPES[self.domain]
)

View file

@ -29,11 +29,11 @@ def send_email(success, service_name, doctype, email_field, error_status=None):
)
else:
subject = "[Warning] Backup Upload Failed"
message = """
message = f"""
<h3>Backup Upload Failed!</h3>
<p>Oops, your automated backup to {} failed.</p>
<p>Error message: {}</p>
<p>Please contact your system manager for more information.</p>""".format(service_name, error_status)
<p>Oops, your automated backup to {service_name} failed.</p>
<p>Error message: {error_status}</p>
<p>Please contact your system manager for more information.</p>"""
frappe.sendmail(recipients=recipients, subject=subject, message=message)

View file

@ -155,5 +155,5 @@ def get_json(obj):
def json_handler(obj):
if isinstance(obj, (datetime.date, datetime.timedelta, datetime.datetime)):
if isinstance(obj, datetime.date | datetime.timedelta | datetime.datetime):
return str(obj)

View file

@ -217,7 +217,7 @@ class BaseDocument:
value = self.__dict__.get(key, default)
if limit and isinstance(value, (list, tuple)) and len(value) > limit:
if limit and isinstance(value, list | tuple) and len(value) > limit:
value = value[:limit]
return value
@ -396,7 +396,7 @@ class BaseDocument:
value = None
if convert_dates_to_str and isinstance(
value, (datetime.datetime, datetime.date, datetime.time, datetime.timedelta)
value, datetime.datetime | datetime.date | datetime.time | datetime.timedelta
):
value = str(value)
@ -1189,7 +1189,7 @@ class BaseDocument:
if not doc:
doc = getattr(self, "parent_doc", None) or self
if (absolute_value or doc.get("absolute_value")) and isinstance(val, (int, float)):
if (absolute_value or doc.get("absolute_value")) and isinstance(val, int | float):
val = abs(self.get(fieldname))
return format_value(val, df=df, doc=doc, currency=currency, format=format)
@ -1297,7 +1297,7 @@ def _filter(data, filters, limit=None):
for f in filters:
fval = filters[f]
if not isinstance(fval, (tuple, list)):
if not isinstance(fval, tuple | list):
if fval is True:
fval = ("not None", fval)
elif fval is False:

View file

@ -222,15 +222,12 @@ class DatabaseQuery:
if frappe.db.db_type == "postgres" and args.order_by and args.group_by:
args = self.prepare_select_args(args)
query = (
"""select %(fields)s
from %(tables)s
%(conditions)s
%(group_by)s
%(order_by)s
%(limit)s"""
% args
)
query = """select {fields}
from {tables}
{conditions}
{group_by}
{order_by}
{limit}""".format(**args)
return frappe.db.sql(
query,
@ -1270,7 +1267,7 @@ def get_between_date_filter(value, df=None):
from_date = frappe.utils.nowdate()
to_date = frappe.utils.nowdate()
if value and isinstance(value, (list, tuple)):
if value and isinstance(value, list | tuple):
if len(value) >= 1:
from_date = value[0]
if len(value) >= 2:

View file

@ -354,8 +354,8 @@ def check_if_doc_is_dynamically_linked(doc, method="Delete"):
def raise_link_exists_exception(doc, reference_doctype, reference_docname, row=""):
doc_link = '<a href="/app/Form/{0}/{1}">{1}</a>'.format(doc.doctype, doc.name)
reference_link = '<a href="/app/Form/{0}/{1}">{1}</a>'.format(reference_doctype, reference_docname)
doc_link = f'<a href="/app/Form/{doc.doctype}/{doc.name}">{doc.name}</a>'
reference_link = f'<a href="/app/Form/{reference_doctype}/{reference_docname}">{reference_docname}</a>'
# hack to display Single doctype only once in message
if reference_doctype == reference_docname:
@ -407,12 +407,12 @@ def clear_references(
reference_name_field="reference_name",
):
frappe.db.sql(
"""update
`tab{0}`
f"""update
`tab{doctype}`
set
{1}=NULL, {2}=NULL
{reference_doctype_field}=NULL, {reference_name_field}=NULL
where
{1}=%s and {2}=%s""".format(doctype, reference_doctype_field, reference_name_field), # nosec
{reference_doctype_field}=%s and {reference_name_field}=%s""", # nosec
(reference_doctype, reference_name),
)

View file

@ -161,7 +161,7 @@ class Document(BaseDocument):
else:
get_value_kwargs = {"for_update": self.flags.for_update, "as_dict": True}
if not isinstance(self.name, (dict, list)):
if not isinstance(self.name, dict | list):
get_value_kwargs["order_by"] = None
d = frappe.db.get_value(
@ -883,7 +883,7 @@ class Document(BaseDocument):
if not missing:
return
for fieldname, msg in missing:
for idx, msg in missing: # noqa: B007
msgprint(msg)
if frappe.flags.print_messages:
@ -1252,7 +1252,7 @@ class Document(BaseDocument):
doc_to_compare = frappe.get_doc(self.doctype, amended_from)
version = frappe.new_doc("Version")
if is_useful_diff := version.update_version_info(doc_to_compare, self):
if version.update_version_info(doc_to_compare, self):
version.insert(ignore_permissions=True)
if not frappe.flags.in_migrate:
@ -1540,7 +1540,7 @@ class Document(BaseDocument):
file_lock.delete_lock(signature)
lock_exists = False
if timeout:
for i in range(timeout):
for _ in range(timeout):
time.sleep(1)
if not file_lock.lock_exists(signature):
lock_exists = False

View file

@ -147,7 +147,7 @@ class Meta(Document):
def serialize(doc):
out = {}
for key, value in doc.__dict__.items():
if isinstance(value, (list, tuple)):
if isinstance(value, list | tuple):
if not value or not isinstance(value[0], BaseDocument):
# non standard list object, skip
continue
@ -155,7 +155,7 @@ class Meta(Document):
value = [serialize(d) for d in value]
if (not no_nulls and value is None) or isinstance(
value, (str, int, float, datetime, list, tuple)
value, str | int | float | datetime | list | tuple
):
out[key] = value
@ -713,9 +713,7 @@ class Meta(Document):
module_name, "doctype", doctype, "templates", doctype + suffix + ".html"
)
if os.path.exists(template_path):
return "{module_name}/doctype/{doctype_name}/templates/{doctype_name}{suffix}.html".format(
module_name=module_name, doctype_name=doctype, suffix=suffix
)
return f"{module_name}/doctype/{doctype}/templates/{doctype}{suffix}.html"
return None
def is_nested_set(self):

View file

@ -465,12 +465,10 @@ def append_number_if_name_exists(doctype, value, fieldname="name", separator="-"
if exists:
last = frappe.db.sql(
"""SELECT `{fieldname}` FROM `tab{doctype}`
WHERE `{fieldname}` {regex_character} %s
f"""SELECT `{fieldname}` FROM `tab{doctype}`
WHERE `{fieldname}` {frappe.db.REGEX_CHARACTER} %s
ORDER BY length({fieldname}) DESC,
`{fieldname}` DESC LIMIT 1""".format(
doctype=doctype, fieldname=fieldname, regex_character=frappe.db.REGEX_CHARACTER
),
`{fieldname}` DESC LIMIT 1""",
regex,
)

View file

@ -47,7 +47,7 @@ def update_document_title(
# TODO: omit this after runtime type checking (ref: https://github.com/frappe/frappe/pull/14927)
for obj in [docname, updated_title, updated_name]:
if not isinstance(obj, (str, NoneType)):
if not isinstance(obj, str | NoneType):
frappe.throw(f"{obj=} must be of type str or None")
# handle bad API usages

View file

@ -3,7 +3,6 @@
import re
import frappe
from frappe import _
from frappe.build import html_to_js_template
from frappe.utils import cstr
from frappe.utils.caching import site_cache
@ -30,9 +29,8 @@ def set_default(doc, key):
frappe.db.set(doc, "is_default", 1)
frappe.db.sql(
"""update `tab%s` set `is_default`=0
where `%s`=%s and name!=%s"""
% (doc.doctype, key, "%s", "%s"),
"""update `tab{}` set `is_default`=0
where `{}`={} and name!={}""".format(doc.doctype, key, "%s", "%s"),
(doc.get(key), doc.name),
)
@ -62,7 +60,7 @@ def render_include(content):
content = cstr(content)
# try 5 levels of includes
for i in range(5):
for _ in range(5):
if "{% include" in content:
paths = INCLUDE_DIRECTIVE_PATTERN.findall(content)
if not paths:

View file

@ -27,9 +27,8 @@ def rename_field(doctype, old_fieldname, new_fieldname, validate=True):
if new_field.fieldtype in table_fields:
# change parentfield of table mentioned in options
frappe.db.sql(
"""update `tab%s` set parentfield=%s
where parentfield=%s"""
% (new_field.options.split("\n", 1)[0], "%s", "%s"),
"""update `tab{}` set parentfield={}
where parentfield={}""".format(new_field.options.split("\n", 1)[0], "%s", "%s"),
(new_fieldname, old_fieldname),
)
@ -142,9 +141,8 @@ def update_users_report_view_settings(doctype, ref_fieldname, new_fieldname):
if columns_modified:
frappe.db.sql(
"""update `tabDefaultValue` set defvalue=%s
where defkey=%s"""
% ("%s", "%s"),
"""update `tabDefaultValue` set defvalue={}
where defkey={}""".format("%s", "%s"),
(json.dumps(new_columns), key),
)

View file

@ -38,7 +38,7 @@ ignore_doctypes = [""]
def import_files(module, dt=None, dn=None, force=False, pre_process=None, reset_permissions=False):
if type(module) is list:
if isinstance(module, list):
return [
import_file(
m[0],
@ -209,11 +209,7 @@ def import_doc(
docdict["__islocal"] = 1
controller = get_controller(docdict["doctype"])
if (
controller
and hasattr(controller, "prepare_for_import")
and callable(getattr(controller, "prepare_for_import"))
):
if controller and hasattr(controller, "prepare_for_import") and callable(controller.prepare_for_import):
controller.prepare_for_import(docdict)
doc = frappe.get_doc(docdict)

View file

@ -143,7 +143,7 @@ def split_by_weight(work, weights, chunk_count):
chunk_no = 0
chunk_weight = 0
for task, weight in zip(work, weights):
for task, weight in zip(work, weights, strict=False):
if chunk_weight > expected_weight:
chunk_weight = 0
chunk_no += 1

View file

@ -12,7 +12,7 @@ def execute():
for record in duplicateRecords:
frappe.db.sql(
"""delete from `tabUser Permission`
where allow=%s and user=%s and for_value=%s limit {}""".format(record.count - 1),
f"""delete from `tabUser Permission`
where allow=%s and user=%s and for_value=%s limit {record.count - 1}""",
(record.allow, record.user, record.for_value),
)

View file

@ -12,8 +12,8 @@ def execute():
for user in users:
# get user_settings for each user
settings = frappe.db.sql(
"select * from `__UserSettings` \
where user={}".format(frappe.db.escape(user.user)),
f"select * from `__UserSettings` \
where user={frappe.db.escape(user.user)}",
as_dict=True,
)

View file

@ -22,7 +22,6 @@ def execute():
phone_values = []
for count, contact_detail in enumerate(contact_details):
phone_counter = 1
is_primary = 1
if contact_detail.email_id:
email_values.append(
(

View file

@ -18,7 +18,7 @@ def execute():
update_column_table_map.setdefault(field.TABLE_NAME, [])
update_column_table_map[field.TABLE_NAME].append(
"`{fieldname}`=COALESCE(`{fieldname}`, 0)".format(fieldname=field.COLUMN_NAME)
f"`{field.COLUMN_NAME}`=COALESCE(`{field.COLUMN_NAME}`, 0)"
)
for table in frappe.db.get_tables():

View file

@ -9,12 +9,12 @@ def execute():
for user in users:
user_settings = frappe.db.sql(
"""
f"""
select
* from `__UserSettings`
where
user='{user}'
""".format(user=user.user),
user='{user.user}'
""",
as_dict=True,
)

View file

@ -127,7 +127,7 @@ def has_permission(
meta = frappe.get_meta(doctype)
if doc:
if isinstance(doc, (str, int)):
if isinstance(doc, str | int):
doc = frappe.get_doc(meta.name, doc)
perm = get_doc_permissions(doc, user=user, ptype=ptype, debug=debug).get(ptype)
if not perm:

View file

@ -81,7 +81,7 @@ def get_current_stack_frames():
try:
current = inspect.currentframe()
frames = inspect.getouterframes(current, context=10)
for frame, filename, lineno, function, context, index in list(reversed(frames))[:-2]:
for frame, filename, lineno, function, context, index in list(reversed(frames))[:-2]: # noqa: B007
if "/apps/" in filename or SERVER_SCRIPT_FILE_PREFIX in filename:
yield {
"filename": TRACEBACK_PATH_PATTERN.sub("", filename),

View file

@ -257,7 +257,7 @@ def get_user_energy_and_review_points(user=None, from_date=None, as_dict=True):
values.from_date = from_date
points_list = frappe.db.sql(
"""
f"""
SELECT
SUM(CASE WHEN `type` != 'Review' THEN `points` ELSE 0 END) AS energy_points,
SUM(CASE WHEN `type` = 'Review' THEN `points` ELSE 0 END) AS review_points,
@ -271,7 +271,7 @@ def get_user_energy_and_review_points(user=None, from_date=None, as_dict=True):
{conditions}
GROUP BY `user`
ORDER BY `energy_points` DESC
""".format(conditions=conditions, given_points_condition=given_points_condition),
""",
values=values,
as_dict=1,
)

View file

@ -77,7 +77,7 @@ class EnergyPointRule(Document):
{"points": points, "user": user, "rule": rule},
self.apply_only_once,
)
except Exception as e:
except Exception:
self.log_error("Energy points failed")
def rule_condition_satisfied(self, doc):

View file

@ -215,7 +215,7 @@ def run_tests_for_doctype(
junit_xml_output=False,
):
modules = []
if not isinstance(doctypes, (list, tuple)):
if not isinstance(doctypes, list | tuple):
doctypes = [doctypes]
for doctype in doctypes:
@ -268,7 +268,7 @@ def _run_unittest(
test_suite = unittest.TestSuite()
if not isinstance(modules, (list, tuple)):
if not isinstance(modules, list | tuple):
modules = [modules]
for module in modules:

View file

@ -54,7 +54,9 @@ def patch_request_header(key, *args, **kwargs):
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, site=None):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, site=None):
if kwargs is None:
kwargs = {}
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
self.site = site or _site

View file

@ -15,7 +15,7 @@ register_with_external_service = MagicMock(return_value=True)
def request_specific_api(a: list | tuple | dict | int, b: int) -> int:
# API that takes very long to return a result
todays_value = external_service()
if not isinstance(a, (int, float)):
if not isinstance(a, int | float):
a = 1
return a**b * todays_value
@ -44,7 +44,9 @@ class TestCachingUtils(FrappeTestCase):
frappe.get_last_doc("DocType"),
frappe._dict(),
]
same_output_received = lambda: all([x for x in set(retval) if x == retval[0]])
def same_output_received():
return all([x for x in set(retval) if x == retval[0]])
# ensure that external service was called only once
# thereby return value of request_specific_api is cached

View file

@ -2,7 +2,6 @@
# License: MIT. See LICENSE
import datetime
import inspect
from math import ceil
from random import choice
from unittest.mock import patch
@ -17,7 +16,7 @@ from frappe.query_builder import Field
from frappe.query_builder.functions import Concat_ws
from frappe.tests.test_query_builder import db_type_is, run_only_if
from frappe.tests.utils import FrappeTestCase
from frappe.utils import add_days, cint, now, random_string, set_request
from frappe.utils import add_days, now, random_string, set_request
from frappe.utils.testutils import clear_custom_fields
@ -437,7 +436,7 @@ class TestDB(FrappeTestCase):
frappe.db.MAX_WRITES_PER_TRANSACTION = 1
note = frappe.get_last_doc("ToDo")
note.description = "changed"
with self.assertRaises(frappe.TooManyWritesError) as tmw:
with self.assertRaises(frappe.TooManyWritesError):
note.save()
frappe.db.MAX_WRITES_PER_TRANSACTION = Database.MAX_WRITES_PER_TRANSACTION

View file

@ -57,7 +57,11 @@ class TestDBQuery(FrappeTestCase):
db_query = DatabaseQuery("DocType")
add_custom_field("DocType", "test_tab_field", "Data")
db_query.fields = ["tabNote.creation", "test_tab_field", "tabDocType.test_tab_field"]
db_query.fields = [
"tabNote.creation",
"test_tab_field",
"tabDocType.test_tab_field",
]
db_query.extract_tables()
self.assertIn("`tabNote`", db_query.tables)
self.assertIn("`tabDocType`", db_query.tables)
@ -148,7 +152,10 @@ class TestDBQuery(FrappeTestCase):
frappe.get_doc(
doctype="Parent DocType 1",
title="test",
child=[{"title": "parent 1 child record 1"}, {"title": "parent 1 child record 2"}],
child=[
{"title": "parent 1 child record 1"},
{"title": "parent 1 child record 2"},
],
__newname="test_parent",
).insert(ignore_if_duplicate=True)
frappe.get_doc(
@ -295,7 +302,8 @@ class TestDBQuery(FrappeTestCase):
# if both from and to_date values are passed
data = DatabaseQuery("Event").execute(
filters={"starts_on": ["between", ["2016-07-06", "2016-07-07"]]}, fields=["name"]
filters={"starts_on": ["between", ["2016-07-06", "2016-07-07"]]},
fields=["name"],
)
self.assertIn({"name": event2.name}, data)
@ -316,7 +324,8 @@ class TestDBQuery(FrappeTestCase):
# test between is formatted for creation column
data = DatabaseQuery("Event").execute(
filters={"creation": ["between", ["2016-07-06", "2016-07-07"]]}, fields=["name"]
filters={"creation": ["between", ["2016-07-06", "2016-07-07"]]},
fields=["name"],
)
def test_between_filters_date_bounds(self):
@ -368,7 +377,10 @@ class TestDBQuery(FrappeTestCase):
self.assertRaises(
frappe.DataError,
DatabaseQuery("DocType").execute,
fields=["name", "issingle, IF(issingle=1, (select name from tabUser), count(name))"],
fields=[
"name",
"issingle, IF(issingle=1, (select name from tabUser), count(name))",
],
limit_start=0,
limit_page_length=1,
)
@ -392,7 +404,10 @@ class TestDBQuery(FrappeTestCase):
self.assertRaises(
frappe.DataError,
DatabaseQuery("DocType").execute,
fields=["name", "issingle, IF(issingle=1, (SELECT name from tabUser), count(*))"],
fields=[
"name",
"issingle, IF(issingle=1, (SELECT name from tabUser), count(*))",
],
limit_start=0,
limit_page_length=1,
)
@ -466,14 +481,20 @@ class TestDBQuery(FrappeTestCase):
self.assertTrue("_relevance" in data[0])
data = DatabaseQuery("DocType").execute(
fields=["name", "issingle", "date(creation) as creation"], limit_start=0, limit_page_length=1
fields=["name", "issingle", "date(creation) as creation"],
limit_start=0,
limit_page_length=1,
)
self.assertTrue("creation" in data[0])
if frappe.db.db_type != "postgres":
# datediff function does not exist in postgres
data = DatabaseQuery("DocType").execute(
fields=["name", "issingle", "datediff(modified, creation) as date_diff"],
fields=[
"name",
"issingle",
"datediff(modified, creation) as date_diff",
],
limit_start=0,
limit_page_length=1,
)
@ -481,14 +502,22 @@ class TestDBQuery(FrappeTestCase):
with self.assertRaises(frappe.DataError):
DatabaseQuery("DocType").execute(
fields=["name", "issingle", "if (issingle=1, (select name from tabUser), count(name))"],
fields=[
"name",
"issingle",
"if (issingle=1, (select name from tabUser), count(name))",
],
limit_start=0,
limit_page_length=1,
)
with self.assertRaises(frappe.DataError):
DatabaseQuery("DocType").execute(
fields=["name", "issingle", "if(issingle=1, (select name from tabUser), count(name))"],
fields=[
"name",
"issingle",
"if(issingle=1, (select name from tabUser), count(name))",
],
limit_start=0,
limit_page_length=1,
)
@ -578,7 +607,10 @@ class TestDBQuery(FrappeTestCase):
DatabaseQuery("DocType").execute,
fields=["name"],
filters={"editable_grid,": 1},
or_filters=[["DocType", "istable", "=", 1], ["DocType", "beta and 1=1", "=", 0]],
or_filters=[
["DocType", "istable", "=", 1],
["DocType", "beta and 1=1", "=", 0],
],
limit_start=0,
limit_page_length=1,
)
@ -600,13 +632,18 @@ class TestDBQuery(FrappeTestCase):
self.assertTrue("Role Permission for Page and Report" in [d["name"] for d in out])
out = DatabaseQuery("DocType").execute(
fields=["name"], filters={"track_changes": 1, "module": "Core"}, order_by="creation"
fields=["name"],
filters={"track_changes": 1, "module": "Core"},
order_by="creation",
)
self.assertTrue("File" in [d["name"] for d in out])
out = DatabaseQuery("DocType").execute(
fields=["name"],
filters=[["DocType", "ifnull(track_changes, 0)", "=", 0], ["DocType", "module", "=", "Core"]],
filters=[
["DocType", "ifnull(track_changes, 0)", "=", 0],
["DocType", "module", "=", "Core"],
],
order_by="creation",
)
self.assertTrue("DefaultValue" in [d["name"] for d in out])
@ -743,7 +780,7 @@ class TestDBQuery(FrappeTestCase):
def test_set_field_tables(self):
# Tests _in_standard_sql_methods method in test_set_field_tables
# The following query will break if the above method is broken
data = frappe.db.get_list(
frappe.db.get_list(
"Web Form",
filters=[["Web Form Field", "reqd", "=", 1]],
fields=["count(*) as count"],
@ -755,7 +792,7 @@ class TestDBQuery(FrappeTestCase):
try:
frappe.get_list("Prepared Report", ["*"])
frappe.get_list("Scheduled Job Type", ["*"])
except Exception as e:
except Exception:
print(frappe.get_traceback())
self.fail("get_list not working with virtual field")
@ -805,21 +842,30 @@ class TestDBQuery(FrappeTestCase):
def test_permlevel_fields(self):
with setup_patched_blog_post(), setup_test_user(set_user=True):
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "published"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "published"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "`published`"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "`published`"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "`tabBlog Post`.`published`"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "`tabBlog Post`.`published`"],
limit=1,
)
self.assertFalse("published" in data[0])
self.assertTrue("name" in data[0])
@ -836,13 +882,19 @@ class TestDBQuery(FrappeTestCase):
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "MAX(`published`)"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "MAX(`published`)"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "LAST(published)"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "LAST(published)"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertEqual(len(data[0]), 1)
@ -858,12 +910,20 @@ class TestDBQuery(FrappeTestCase):
self.assertEqual(len(data[0]), 2)
data = frappe.get_list(
"Blog Post", filters={"published": 1}, fields=["name", "now() abhi"], limit=1
"Blog Post",
filters={"published": 1},
fields=["name", "now() abhi"],
limit=1,
)
self.assertIsInstance(data[0]["abhi"], datetime.datetime)
self.assertEqual(len(data[0]), 2)
data = frappe.get_list("Blog Post", filters={"published": 1}, fields=["name", "'LABEL'"], limit=1)
data = frappe.get_list(
"Blog Post",
filters={"published": 1},
fields=["name", "'LABEL'"],
limit=1,
)
self.assertTrue("name" in data[0])
self.assertTrue("LABEL" in data[0].values())
self.assertEqual(len(data[0]), 2)
@ -892,7 +952,11 @@ class TestDBQuery(FrappeTestCase):
data = frappe.get_list(
"Blog Post",
fields=["name", "blogger.full_name as blogger_full_name", "blog_category.description"],
fields=[
"name",
"blogger.full_name as blogger_full_name",
"blog_category.description",
],
limit=1,
)
self.assertTrue("name" in data[0])
@ -906,7 +970,11 @@ class TestDBQuery(FrappeTestCase):
dt = new_doctype("autoinc_dt_test", autoname="autoincrement").insert(ignore_permissions=True)
query = DatabaseQuery("autoinc_dt_test").execute(
fields=["locate('1', `tabautoinc_dt_test`.`name`)", "name", "locate('1', name)"],
fields=[
"locate('1', `tabautoinc_dt_test`.`name`)",
"name",
"locate('1', name)",
],
filters={"name": 1},
run=False,
)
@ -929,7 +997,9 @@ class TestDBQuery(FrappeTestCase):
frappe.delete_doc_if_exists("DocType", "table_dt")
table_dt = new_doctype(
"table_dt", istable=1, fields=[{"label": "1field", "fieldname": "2field", "fieldtype": "Data"}]
"table_dt",
istable=1,
fields=[{"label": "1field", "fieldname": "2field", "fieldtype": "Data"}],
).insert()
dt = new_doctype(
@ -974,7 +1044,9 @@ class TestDBQuery(FrappeTestCase):
table_dt.delete()
def test_permission_query_condition(self):
from frappe.desk.doctype.dashboard_settings.dashboard_settings import create_dashboard_settings
from frappe.desk.doctype.dashboard_settings.dashboard_settings import (
create_dashboard_settings,
)
self.doctype = "Dashboard Settings"
self.user = "test'5@example.com"
@ -984,11 +1056,11 @@ class TestDBQuery(FrappeTestCase):
create_dashboard_settings(self.user)
dashboard_settings = frappe.db.sql(
"""
f"""
SELECT name
FROM `tabDashboard Settings`
WHERE {condition}
""".format(condition=permission_query_conditions),
WHERE {permission_query_conditions}
""",
as_dict=1,
)[0]
@ -1006,7 +1078,10 @@ class TestDBQuery(FrappeTestCase):
def get_list(args):
...
with patch("frappe.controllers", new={frappe.local.site: {"Virtual DocType": VirtualDocType}}):
with patch(
"frappe.controllers",
new={frappe.local.site: {"Virtual DocType": VirtualDocType}},
):
VirtualDocType.get_list = MagicMock()
frappe.get_all("Virtual DocType", filters={"name": "test"}, fields=["name"], limit=1)
@ -1120,7 +1195,11 @@ class TestReportView(FrappeTestCase):
)
list_filter_response = execute_cmd("frappe.desk.reportview.get_count")
frappe.local.form_dict = frappe._dict(
{"doctype": "DocType", "filters": {"show_title_field_in_link": 1}, "distinct": "true"}
{
"doctype": "DocType",
"filters": {"show_title_field_in_link": 1},
"distinct": "true",
}
)
dict_filter_response = execute_cmd("frappe.desk.reportview.get_count")
self.assertIsInstance(list_filter_response, int)

View file

@ -316,7 +316,7 @@ class TestOAuth20(FrappeRequestTestCase):
frappe.db.commit()
def test_openid_code_id_token(self):
client = update_client_for_auth_code_grant(self.client_id)
update_client_for_auth_code_grant(self.client_id)
nonce = frappe.generate_hash()
# Go to Authorize url

View file

@ -164,7 +164,7 @@ def check_patch_files(app):
missing_patches.append(module)
if missing_patches:
raise Exception(f"Patches missing in patch.txt: \n" + "\n".join(missing_patches))
raise Exception("Patches missing in patch.txt: \n" + "\n".join(missing_patches))
def _get_dotted_path(file: Path, app) -> str:

View file

@ -145,7 +145,7 @@ class TestPerformance(FrappeTestCase):
self.assertGreaterEqual(
rps,
EXPECTED_RPS * (1 - FAILURE_THREASHOLD),
f"Possible performance regression in basic /api/Resource list requests",
"Possible performance regression in basic /api/Resource list requests",
)
def test_homepage_resolver(self):

View file

@ -3,7 +3,6 @@
import frappe
import frappe.utils
from frappe.core.doctype.doctype.test_doctype import new_doctype
from frappe.desk.query_report import build_xlsx_data, export_query, run
from frappe.tests.utils import FrappeTestCase
from frappe.utils.xlsxutils import make_xlsx
@ -138,7 +137,7 @@ class TestQueryReport(FrappeTestCase):
{"label": "First Name", "fieldname": "first_name", "fieldtype": "Data"},
{"label": "Last Name", "fieldname": "last_name", "fieldtype": "Data"},
]
docA = frappe.get_doc(
frappe.get_doc(
{
"doctype": "DocType",
"name": "Doc A",
@ -150,7 +149,7 @@ class TestQueryReport(FrappeTestCase):
}
).insert(ignore_if_duplicate=True)
docB = frappe.get_doc(
frappe.get_doc(
{
"doctype": "DocType",
"name": "Doc B",

View file

@ -109,7 +109,7 @@ class TestRecorder(FrappeTestCase):
self.assertEqual(len(request["calls"]), len(queries))
for query, call in zip(queries, request["calls"]):
for query, call in zip(queries, request["calls"], strict=False):
self.assertEqual(
call["query"],
sqlparse.format(
@ -134,7 +134,7 @@ class TestRecorder(FrappeTestCase):
requests = frappe.recorder.get()
request = frappe.recorder.get(requests[0]["uuid"])
for query, call in zip(queries, request["calls"]):
for query, call in zip(queries, request["calls"], strict=False):
self.assertEqual(call["exact_copies"], query[1])
def test_error_page_rendering(self):

View file

@ -93,7 +93,7 @@ class TestSafeExec(FrappeTestCase):
def test_ensure_getattrable_globals(self):
def check_safe(objects):
for obj in objects:
if isinstance(obj, (types.ModuleType, types.CodeType, types.TracebackType, types.FrameType)):
if isinstance(obj, types.ModuleType | types.CodeType | types.TracebackType | types.FrameType):
self.fail(f"{obj} wont work in safe exec.")
elif isinstance(obj, dict):
check_safe(obj.values())

View file

@ -74,7 +74,7 @@ class TestTranslate(FrappeTestCase):
msg=f"Mismatched output:\nExpected: {expected_output}\nFound: {data}",
)
for extracted, expected in zip(data, expected_output):
for extracted, expected in zip(data, expected_output, strict=False):
ext_filename, ext_message, ext_context, ext_line = extracted
exp_message, exp_context, exp_line = expected
self.assertEqual(ext_filename, exp_filename)
@ -214,7 +214,7 @@ class TestTranslate(FrappeTestCase):
output = extract_messages_from_python_code(code)
self.assertEqual(len(expected_output), len(output))
for expected, actual in zip(expected_output, output):
for expected, actual in zip(expected_output, output, strict=False):
with self.subTest():
self.assertEqual(expected, actual)
@ -251,7 +251,7 @@ class TestTranslate(FrappeTestCase):
output = extract_messages_from_javascript_code(code)
self.assertEqual(len(expected_output), len(output))
for expected, actual in zip(expected_output, output):
for expected, actual in zip(expected_output, output, strict=False):
with self.subTest():
self.assertEqual(expected, actual)

View file

@ -232,7 +232,7 @@ def toggle_2fa_all_role(state=None):
"""Enable or disable 2fa for 'all' role on the system."""
all_role = frappe.get_doc("Role", "All")
state = state if state is not None else False
if type(state) != bool:
if not isinstance(state, bool):
return
all_role.two_factor_auth = cint(state)

View file

@ -46,7 +46,11 @@ from frappe.utils import (
validate_phone_number_with_country_code,
validate_url,
)
from frappe.utils.change_log import check_release_on_github, get_remote_url, parse_github_url
from frappe.utils.change_log import (
check_release_on_github,
get_remote_url,
parse_github_url,
)
from frappe.utils.data import (
add_to_date,
add_years,
@ -98,7 +102,8 @@ class TestFilters(FrappeTestCase):
def test_multiple_dict(self):
self.assertTrue(
evaluate_filters(
{"doctype": "User", "status": "Open", "name": "Test 1"}, {"status": "Open", "name": "Test 1"}
{"doctype": "User", "status": "Open", "name": "Test 1"},
{"status": "Open", "name": "Test 1"},
)
)
self.assertFalse(
@ -139,12 +144,14 @@ class TestFilters(FrappeTestCase):
def test_lt_gt(self):
self.assertTrue(
evaluate_filters(
{"doctype": "User", "status": "Open", "age": 20}, {"status": "Open", "age": (">", 10)}
{"doctype": "User", "status": "Open", "age": 20},
{"status": "Open", "age": (">", 10)},
)
)
self.assertFalse(
evaluate_filters(
{"doctype": "User", "status": "Open", "age": 20}, {"status": "Open", "age": (">", 30)}
{"doctype": "User", "status": "Open", "age": 20},
{"status": "Open", "age": (">", 30)},
)
)
@ -152,12 +159,14 @@ class TestFilters(FrappeTestCase):
# date fields
self.assertTrue(
evaluate_filters(
{"doctype": "User", "birth_date": "2023-02-28"}, [("User", "birth_date", ">", "01-04-2022")]
{"doctype": "User", "birth_date": "2023-02-28"},
[("User", "birth_date", ">", "01-04-2022")],
)
)
self.assertFalse(
evaluate_filters(
{"doctype": "User", "birth_date": "2023-02-28"}, [("User", "birth_date", "<", "28-02-2023")]
{"doctype": "User", "birth_date": "2023-02-28"},
[("User", "birth_date", "<", "28-02-2023")],
)
)
@ -176,7 +185,12 @@ class TestFilters(FrappeTestCase):
)
def test_like_not_like(self):
doc = {"doctype": "User", "username": "test_abc", "prefix": "startswith", "suffix": "endswith"}
doc = {
"doctype": "User",
"username": "test_abc",
"prefix": "startswith",
"suffix": "endswith",
}
test_cases = [
([["username", "like", "test"]], True),
@ -371,7 +385,11 @@ class TestValidationUtils(FrappeTestCase):
self.assertTrue(validate_url("ftp://frappe.cloud", valid_schemes=["https", "ftp"]))
self.assertFalse(validate_url("bolo://frappe.io", valid_schemes=("http", "https", "ftp", "ftps")))
self.assertRaises(
frappe.ValidationError, validate_url, "gopher://frappe.io", valid_schemes="https", throw=True
frappe.ValidationError,
validate_url,
"gopher://frappe.io",
valid_schemes="https",
throw=True,
)
def test_valid_email(self):
@ -391,7 +409,12 @@ class TestValidationUtils(FrappeTestCase):
self.assertFalse(validate_email_address("test@example.com test2@example.com,undisclosed-recipient"))
# Invalid with throw
self.assertRaises(frappe.InvalidEmailAddressError, validate_email_address, "someone.com", throw=True)
self.assertRaises(
frappe.InvalidEmailAddressError,
validate_email_address,
"someone.com",
throw=True,
)
self.assertEqual(validate_email_address("Some%20One@frappe.com"), "Some%20One@frappe.com")
self.assertEqual(
@ -425,7 +448,8 @@ class TestImage(FrappeTestCase):
def test_strip_exif_data(self):
original_image = Image.open(frappe.get_app_path("frappe", "tests", "data", "exif_sample_image.jpg"))
original_image_content = open(
frappe.get_app_path("frappe", "tests", "data", "exif_sample_image.jpg"), mode="rb"
frappe.get_app_path("frappe", "tests", "data", "exif_sample_image.jpg"),
mode="rb",
).read()
new_image_content = strip_exif_data(original_image_content, "image/jpeg")
@ -522,19 +546,33 @@ class TestDateUtils(FrappeTestCase):
# Monday as start of the week
with patch.object(frappe.utils.data, "get_first_day_of_the_week", return_value="Monday"):
self.assertEqual(
frappe.utils.get_first_day_of_week("2020-12-25"), frappe.utils.getdate("2020-12-21")
frappe.utils.get_first_day_of_week("2020-12-25"),
frappe.utils.getdate("2020-12-21"),
)
self.assertEqual(
frappe.utils.get_first_day_of_week("2020-12-20"), frappe.utils.getdate("2020-12-14")
frappe.utils.get_first_day_of_week("2020-12-20"),
frappe.utils.getdate("2020-12-14"),
)
# Sunday as start of the week
self.assertEqual(frappe.utils.get_first_day_of_week("2020-12-25"), frappe.utils.getdate("2020-12-20"))
self.assertEqual(frappe.utils.get_first_day_of_week("2020-12-21"), frappe.utils.getdate("2020-12-20"))
self.assertEqual(
frappe.utils.get_first_day_of_week("2020-12-25"),
frappe.utils.getdate("2020-12-20"),
)
self.assertEqual(
frappe.utils.get_first_day_of_week("2020-12-21"),
frappe.utils.getdate("2020-12-20"),
)
def test_last_day_of_week(self):
self.assertEqual(frappe.utils.get_last_day_of_week("2020-12-24"), frappe.utils.getdate("2020-12-26"))
self.assertEqual(frappe.utils.get_last_day_of_week("2020-12-28"), frappe.utils.getdate("2021-01-02"))
self.assertEqual(
frappe.utils.get_last_day_of_week("2020-12-24"),
frappe.utils.getdate("2020-12-26"),
)
self.assertEqual(
frappe.utils.get_last_day_of_week("2020-12-28"),
frappe.utils.getdate("2021-01-02"),
)
def test_is_last_day_of_the_month(self):
self.assertEqual(frappe.utils.is_last_day_of_the_month("2020-12-24"), False)
@ -674,7 +712,14 @@ class TestResponse(FrappeTestCase):
"time_types": [
date(year=2020, month=12, day=2),
datetime(
year=2020, month=12, day=2, hour=23, minute=23, second=23, microsecond=23, tzinfo=pytz.utc
year=2020,
month=12,
day=2,
hour=23,
minute=23,
second=23,
microsecond=23,
tzinfo=pytz.utc,
),
time(hour=23, minute=23, second=23, microsecond=23, tzinfo=pytz.utc),
timedelta(days=10, hours=12, minutes=120, seconds=10),
@ -699,7 +744,7 @@ class TestResponse(FrappeTestCase):
self.assertTrue(all([isinstance(x, str) for x in processed_object["time_types"]]))
self.assertTrue(all([isinstance(x, float) for x in processed_object["float"]]))
self.assertTrue(all([isinstance(x, (list, str)) for x in processed_object["iter"]]))
self.assertTrue(all([isinstance(x, list | str) for x in processed_object["iter"]]))
self.assertIsInstance(processed_object["string"], str)
with self.assertRaises(TypeError):
json.dumps(BAD_OBJECT, default=json_handler)
@ -711,13 +756,17 @@ class TestTimeDeltaUtils(FrappeTestCase):
self.assertEqual(format_timedelta(timedelta(hours=10)), "10:00:00")
self.assertEqual(format_timedelta(timedelta(hours=100)), "100:00:00")
self.assertEqual(format_timedelta(timedelta(seconds=100, microseconds=129)), "0:01:40.000129")
self.assertEqual(format_timedelta(timedelta(seconds=100, microseconds=12212199129)), "3:25:12.199129")
self.assertEqual(
format_timedelta(timedelta(seconds=100, microseconds=12212199129)),
"3:25:12.199129",
)
def test_parse_timedelta(self):
self.assertEqual(parse_timedelta("0:0:0"), timedelta(seconds=0))
self.assertEqual(parse_timedelta("10:0:0"), timedelta(hours=10))
self.assertEqual(
parse_timedelta("7 days, 0:32:18.192221"), timedelta(days=7, seconds=1938, microseconds=192221)
parse_timedelta("7 days, 0:32:18.192221"),
timedelta(days=7, seconds=1938, microseconds=192221),
)
self.assertEqual(parse_timedelta("7 days, 0:32:18"), timedelta(days=7, seconds=1938))
@ -982,7 +1031,7 @@ class TestMiscUtils(FrappeTestCase):
class TestTypingValidations(FrappeTestCase):
ERR_REGEX = f"^Argument '.*' should be of type '.*' but got '.*' instead.$"
ERR_REGEX = "^Argument '.*' should be of type '.*' but got '.*' instead.$"
def test_validate_whitelisted_api(self):
from inspect import signature
@ -1009,9 +1058,9 @@ class TestTypingValidations(FrappeTestCase):
class TestTBSanitization(FrappeTestCase):
def test_traceback_sanitzation(self):
try:
password = "42"
args = {"password": "42", "pwd": "42", "safe": "safe_value"}
args = frappe._dict({"password": "42", "pwd": "42", "safe": "safe_value"})
password = "42" # noqa: F841
args = {"password": "42", "pwd": "42", "safe": "safe_value"} # noqa: F841
args = frappe._dict({"password": "42", "pwd": "42", "safe": "safe_value"}) # noqa: F841
raise Exception
except Exception:
traceback = frappe.get_traceback(with_context=True)
@ -1100,7 +1149,10 @@ class TestRounding(FrappeTestCase):
self.assertEqual(flt(3.35, 1, rounding_method=rounding_method), 3.4)
@change_settings("System Settings", {"rounding_method": "Commercial Rounding"})
@given(st.decimals(min_value=-1e8, max_value=1e8), st.integers(min_value=-2, max_value=4))
@given(
st.decimals(min_value=-1e8, max_value=1e8),
st.integers(min_value=-2, max_value=4),
)
def test_normal_rounding_property(self, number, precision):
with localcontext() as ctx:
ctx.rounding = ROUND_HALF_UP
@ -1165,7 +1217,10 @@ class TestRounding(FrappeTestCase):
self.assertEqual(flt(-3.35, 1, rounding_method=rounding_method), -3.4)
@change_settings("System Settings", {"rounding_method": "Banker's Rounding"})
@given(st.decimals(min_value=-1e8, max_value=1e8), st.integers(min_value=-2, max_value=4))
@given(
st.decimals(min_value=-1e8, max_value=1e8),
st.integers(min_value=-2, max_value=4),
)
def test_bankers_rounding_property(self, number, precision):
self.assertEqual(Decimal(str(flt(float(number), precision))), round(number, precision))
@ -1173,10 +1228,13 @@ class TestRounding(FrappeTestCase):
self.assertEqual(frappe.get_system_settings("rounding_method"), "Banker's Rounding")
class TestTypingValidations(FrappeTestCase):
class TestArgumentTypingValidations(FrappeTestCase):
def test_validate_argument_types(self):
from frappe.core.doctype.doctype.doctype import DocType
from frappe.utils.typing_validations import FrappeTypeError, validate_argument_types
from frappe.utils.typing_validations import (
FrappeTypeError,
validate_argument_types,
)
@validate_argument_types
def test_simple_types(a: int, b: float, c: bool):

View file

@ -60,7 +60,7 @@ class FrappeTestCase(unittest.TestCase):
if isinstance(value, list):
actual_child_docs = actual.get(field)
self.assertEqual(len(value), len(actual_child_docs), msg=f"{field} length should be same")
for exp_child, actual_child in zip(value, actual_child_docs):
for exp_child, actual_child in zip(value, actual_child_docs, strict=False):
self.assertDocumentEqual(exp_child, actual_child)
else:
self._compare_field(value, actual.get(field), actual, field)
@ -73,7 +73,7 @@ class FrappeTestCase(unittest.TestCase):
self.assertAlmostEqual(
expected, actual, places=precision, msg=f"{field} should be same to {precision} digits"
)
elif isinstance(expected, (bool, int)):
elif isinstance(expected, bool | int):
self.assertEqual(expected, cint(actual), msg=msg)
elif isinstance(expected, datetime_like_types):
self.assertEqual(str(expected), str(actual), msg=msg)

View file

@ -204,9 +204,7 @@ def get_translation_dict_from_file(path, lang, app, throw=False) -> dict[str, st
elif len(item) in [2, 3]:
translation_map[item[0]] = strip(item[1])
elif item:
msg = "Bad translation in '{app}' for language '{lang}': {values}".format(
app=app, lang=lang, values=cstr(item)
)
msg = f"Bad translation in '{app}' for language '{lang}': {cstr(item)}"
frappe.log_error(message=msg, title="Error in translation file")
if throw:
frappe.throw(msg, title="Error in translation file")
@ -545,7 +543,7 @@ def get_all_messages_from_js_files(app_name=None):
messages = []
for app in [app_name] if app_name else frappe.get_installed_apps(_ensure_on_bench=True):
if os.path.exists(frappe.get_app_path(app, "public")):
for basepath, folders, files in os.walk(frappe.get_app_path(app, "public")):
for basepath, folders, files in os.walk(frappe.get_app_path(app, "public")): # noqa: B007
if "frappe/public/js/lib" in basepath:
continue
@ -761,6 +759,7 @@ def update_translations(lang, untranslated_file, translated_file, app="_ALL_APPS
for key, value in zip(
frappe.get_file_items(untranslated_file, ignore_empty_lines=False),
frappe.get_file_items(translated_file, ignore_empty_lines=False),
strict=False,
):
# undo hack in get_untranslated
translation_dict[restore_newlines(key)] = restore_newlines(value)

Some files were not shown because too many files have changed in this diff Show more