fix: ruff fixes

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang 2025-12-22 17:48:19 +05:30
parent e0cad22cdd
commit 8723a2b6ee
No known key found for this signature in database
GPG key ID: 9DCC61E211BF645F
29 changed files with 66 additions and 74 deletions

View file

@ -73,8 +73,8 @@ if TYPE_CHECKING: # pragma: no cover
controllers: dict[str, type] = {}
lazy_controllers: dict[str, type] = {}
local = Local()
cache: Optional["RedisWrapper"] = None
client_cache: Optional["ClientCache"] = None
cache: "RedisWrapper" | None = None
client_cache: "ClientCache" | None = None
STANDARD_USERS = ("Guest", "Administrator")
# this global may be subsequently changed by frappe.tests.utils.toggle_test_mode()
@ -100,10 +100,8 @@ ResponseDict: TypeAlias = _dict[str, Any] # type: ignore[no-any-explicit]
FlagsDict: TypeAlias = _dict[str, Any] # type: ignore[no-any-explicit]
FormDict: TypeAlias = _dict[str, str]
db: LocalProxy[Union["PyMariaDBDatabase", "MariaDBDatabase", "PostgresDatabase", "SQLiteDatabase"]] = local(
"db"
)
qb: LocalProxy[Union["MariaDB", "Postgres", "SQLite"]] = local("qb")
db: LocalProxy["PyMariaDBDatabase" | "MariaDBDatabase" | "PostgresDatabase" | "SQLiteDatabase"] = local("db")
qb: LocalProxy["MariaDB" | "Postgres" | "SQLite"] = local("qb")
conf: LocalProxy[ConfType] = local("conf")
form_dict: LocalProxy[FormDict] = local("form_dict")
form = form_dict
@ -675,7 +673,7 @@ def is_table(doctype: str) -> bool:
def get_precision(
doctype: str, fieldname: str, currency: str | None = None, doc: Optional["Document"] = None
doctype: str, fieldname: str, currency: str | None = None, doc: "Document" | None = None
) -> int:
"""Get precision for a given field"""
from frappe.model.meta import get_field_precision

View file

@ -1140,10 +1140,10 @@ def validate_empty_name(dt, autoname):
frappe.toast(_("Warning: Naming is not set"), indicator="yellow")
def validate_autoincrement_autoname(dt: Union[DocType, "CustomizeForm"]) -> bool:
def validate_autoincrement_autoname(dt: DocType | "CustomizeForm") -> bool:
"""Checks if can doctype can change to/from autoincrement autoname"""
def get_autoname_before_save(dt: Union[DocType, "CustomizeForm"]) -> str:
def get_autoname_before_save(dt: DocType | "CustomizeForm") -> str:
if dt.doctype == "Customize Form":
property_value = frappe.db.get_value(
"Property Setter", {"doc_type": dt.doc_type, "property": "autoname"}, "value"

View file

@ -54,7 +54,7 @@ def get_extension(
filename,
extn: str | None = None,
content: bytes | None = None,
response: Optional["Response"] = None,
response: "Response" | None = None,
) -> str:
mimetype = None
@ -426,7 +426,7 @@ def decode_file_content(content: bytes) -> bytes:
return safe_b64decode(content)
def find_file_by_url(path: str, name: str | None = None) -> Optional["File"]:
def find_file_by_url(path: str, name: str | None = None) -> "File" | None:
filters = {"file_url": str(path)}
if name:
filters["name"] = str(name)

View file

@ -43,9 +43,9 @@ def make_perm_log(doc, method=None):
def insert_perm_log(
doc: Document,
doc_before_save: Document = None,
for_doctype: Optional["str"] = None,
for_document: Optional["str"] = None,
fields: Optional["list | tuple"] = None,
for_doctype: "str" | None = None,
for_document: "str" | None = None,
fields: "list | tuple" | None = None,
):
if frappe.flags.in_install or frappe.flags.in_migrate:
# no need to log changes when migrating or installing app/site

View file

@ -109,7 +109,6 @@ def serialize_worker(worker: Worker) -> frappe._dict:
def compute_utilization(worker: Worker) -> float:
with suppress(Exception):
total_time = (
datetime.datetime.now(datetime.timezone.utc)
- worker.birth_date.replace(tzinfo=datetime.timezone.utc)
datetime.datetime.now(datetime.UTC) - worker.birth_date.replace(tzinfo=datetime.UTC)
).total_seconds()
return worker.total_working_time / total_time * 100

View file

@ -12,7 +12,7 @@ if TYPE_CHECKING:
queue_prefix = "insert_queue_for_"
def deferred_insert(doctype: str, records: list[Union[dict, "Document"]] | str):
def deferred_insert(doctype: str, records: list[dict | "Document"] | str):
if isinstance(records, dict | list):
_records = json.dumps(records)
else:
@ -48,7 +48,7 @@ def save_to_db():
frappe.db.commit()
def insert_record(record: Union[dict, "Document"], doctype: str):
def insert_record(record: dict | "Document", doctype: str):
try:
record.update({"doctype": doctype})
frappe.get_doc(record).insert()

View file

@ -10,7 +10,7 @@ from frappe.utils.data import convert_utc_to_system_timezone
def get_time(path: Path):
return convert_utc_to_system_timezone(
datetime.datetime.fromtimestamp(path.stat().st_mtime, tz=datetime.timezone.utc)
datetime.datetime.fromtimestamp(path.stat().st_mtime, tz=datetime.UTC)
).strftime("%a %b %d %H:%M %Y")

View file

@ -3,9 +3,10 @@
import json
import re
from typing import TypedDict
from typing_extensions import NotRequired # not required in 3.11+
from typing import (
NotRequired, # not required in 3.11+
TypedDict,
)
import frappe

View file

@ -148,7 +148,7 @@ def sendmail(
email_read_tracker_url=None,
x_priority: Literal[1, 3, 5] = 3,
email_headers=None,
) -> Optional["EmailQueue"]:
) -> "EmailQueue" | None:
"""Send email using user's default **Email Account** or global default **Email Account**.

View file

@ -403,7 +403,7 @@ class Email:
if self.mail["Date"]:
try:
utc = email.utils.mktime_tz(email.utils.parsedate_tz(self.mail["Date"]))
utc_dt = datetime.datetime.fromtimestamp(utc, tz=datetime.timezone.utc)
utc_dt = datetime.datetime.fromtimestamp(utc, tz=datetime.UTC)
self.date = convert_utc_to_system_timezone(utc_dt).strftime("%Y-%m-%d %H:%M:%S")
except Exception:
self.date = now()

View file

@ -74,8 +74,8 @@ class TokenCache(Document):
def get_expires_in(self):
system_timezone = ZoneInfo(get_system_timezone())
modified: datetime.datetime = get_datetime(self.modified).replace(tzinfo=system_timezone)
expiry_utc = modified.astimezone(datetime.timezone.utc) + datetime.timedelta(seconds=self.expires_in)
now_utc = datetime.datetime.now(datetime.timezone.utc)
expiry_utc = modified.astimezone(datetime.UTC) + datetime.timedelta(seconds=self.expires_in)
now_utc = datetime.datetime.now(datetime.UTC)
return cint((expiry_utc - now_utc).total_seconds())
def is_expired(self):

View file

@ -9,9 +9,8 @@ from collections.abc import Generator, Iterable
from contextlib import contextmanager
from functools import wraps
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Literal, Optional, TypeAlias, Union, overload
from typing import TYPE_CHECKING, Any, Literal, Optional, Self, TypeAlias, Union, overload, override
from typing_extensions import Self, override
from werkzeug.exceptions import NotFound
import frappe
@ -34,7 +33,7 @@ from frappe.utils.data import get_absolute_url, get_datetime, get_timedelta, get
from frappe.utils.global_search import update_global_search
if TYPE_CHECKING:
from typing_extensions import Self
from typing import Self
from frappe.core.doctype.docfield.docfield import DocField
@ -614,7 +613,7 @@ class Document(BaseDocument):
for df in self.meta.get_table_fields():
self.update_child_table(df.fieldname, df)
def update_child_table(self, fieldname: str, df: Optional["DocField"] = None):
def update_child_table(self, fieldname: str, df: "DocField" | None = None):
"""sync child table for given fieldname"""
df: DocField = df or self.meta.get_field(fieldname)
if df.is_virtual:
@ -1994,7 +1993,7 @@ def bulk_insert(
def _document_values_generator(
documents: Iterable["Document"],
columns: list[str],
) -> Generator[tuple[Any], None, None]:
) -> Generator[tuple[Any]]:
for doc in documents:
doc.creation = doc.modified = now()
doc.owner = doc.modified_by = frappe.session.user
@ -2140,7 +2139,7 @@ def copy_doc(doc: "Document", ignore_no_copy: bool = True) -> "Document":
def new_doc(
doctype: str,
*,
parent_doc: Optional["Document"] = None,
parent_doc: "Document" | None = None,
parentfield: str | None = None,
as_dict: bool = False,
**kwargs,

View file

@ -200,7 +200,7 @@ def set_new_name(doc):
doc.name = validate_name(doc.doctype, doc.name)
def is_autoincremented(doctype: str, meta: Optional["Meta"] = None) -> bool:
def is_autoincremented(doctype: str, meta: "Meta" | None = None) -> bool:
"""Checks if the doctype has autoincrement autoname set"""
if not meta:
@ -328,7 +328,7 @@ def _generate_random_string(length=10):
def parse_naming_series(
parts: list[str] | str,
doctype=None,
doc: Optional["Document"] = None,
doc: "Document" | None = None,
number_generator: Callable[[str, int], str] | None = None,
) -> str:
"""Parse the naming series and get next name.

View file

@ -40,7 +40,7 @@ def get_workflow_name(doctype):
@frappe.whitelist()
def get_transitions(
doc: Union["Document", str, dict], workflow: "Workflow" = None, raise_exception: bool = False
doc: "Document" | str | dict, workflow: "Workflow" = None, raise_exception: bool = False
) -> list[dict]:
"""Return list of possible transitions for the given doc"""
from frappe.model.document import Document

View file

@ -301,9 +301,7 @@ def get_app_publisher(module: str) -> str:
return frappe.get_hooks(hook="app_publisher", app_name=app)[0]
def make_boilerplate(
template: str, doc: Union["Document", "frappe._dict"], opts: Union[dict, "frappe._dict"] = None
):
def make_boilerplate(template: str, doc: "Document" | "frappe._dict", opts: dict | "frappe._dict" = None):
target_path = get_doc_path(doc.module, doc.doctype, doc.name)
template_name = template.replace("controller", scrub(doc.name))
if template_name.endswith("._py"):

View file

@ -52,7 +52,7 @@ class Monitor:
self.data = frappe._dict(
{
"site": frappe.local.site,
"timestamp": datetime.datetime.now(datetime.timezone.utc),
"timestamp": datetime.datetime.now(datetime.UTC),
"transaction_type": transaction_type,
"uuid": str(uuid.uuid4()),
}
@ -85,7 +85,7 @@ class Monitor:
if job := rq.get_current_job():
self.data.job_id = job.id
waitdiff = self.data.timestamp - job.enqueued_at.replace(tzinfo=datetime.timezone.utc)
waitdiff = self.data.timestamp - job.enqueued_at.replace(tzinfo=datetime.UTC)
self.data.job.wait = int(waitdiff.total_seconds() * 1000000)
def add_custom_data(self, **kwargs):
@ -94,7 +94,7 @@ class Monitor:
def dump(self, response=None):
try:
timediff = datetime.datetime.now(datetime.timezone.utc) - self.data.timestamp
timediff = datetime.datetime.now(datetime.UTC) - self.data.timestamp
# Obtain duration in microseconds
self.data.duration = int(timediff.total_seconds() * 1000000)

View file

@ -1,5 +1,5 @@
import hashlib
from datetime import datetime, timezone
from datetime import UTC, datetime, timezone
import frappe
@ -83,7 +83,7 @@ def get_frappe_version() -> str:
def utc_iso() -> str:
return datetime.now(timezone.utc).isoformat()
return datetime.now(UTC).isoformat()
def get_app_version(app_name: str) -> str:

View file

@ -8,7 +8,7 @@ permission, homepage, default variables, system defaults etc
"""
import json
from datetime import datetime, timezone
from datetime import UTC, datetime, timezone
from urllib.parse import unquote
import redis
@ -370,7 +370,7 @@ class Session:
if self.time_diff > expiry or (
(session_end := session_data.get("session_end"))
and datetime.now(tz=timezone.utc) > datetime.fromisoformat(session_end)
and datetime.now(tz=UTC) > datetime.fromisoformat(session_end)
):
self._delete_session()
data = None

View file

@ -165,7 +165,7 @@ class TestAuth(IntegrationTestCase):
client = FrappeClient(self.HOST_NAME, self.test_user_email, self.test_user_password)
expiry_time = next(x for x in client.session.cookies if x.name == "sid").expires
current_time = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
current_time = datetime.datetime.now(tz=datetime.UTC).timestamp()
self.assertAlmostEqual(get_expiry_in_seconds(), expiry_time - current_time, delta=60 * 60)

View file

@ -5,7 +5,7 @@ import io
import json
import os
import sys
from datetime import date, datetime, time, timedelta, timezone
from datetime import UTC, date, datetime, time, timedelta, timezone
from decimal import ROUND_HALF_UP, Decimal, localcontext
from enum import Enum
from io import StringIO
@ -961,9 +961,9 @@ class TestResponse(IntegrationTestCase):
minute=23,
second=23,
microsecond=23,
tzinfo=timezone.utc,
tzinfo=UTC,
),
time(hour=23, minute=23, second=23, microsecond=23, tzinfo=timezone.utc),
time(hour=23, minute=23, second=23, microsecond=23, tzinfo=UTC),
timedelta(days=10, hours=12, minutes=120, seconds=10),
],
"float": [

View file

@ -137,9 +137,7 @@ def load_test_records_for(index_doctype) -> dict[str, Any]:
# Test record generation
def _generate_all_records_towards(
index_doctype, reset=False, commit=False
) -> Generator[tuple[str, int], None, None]:
def _generate_all_records_towards(index_doctype, reset=False, commit=False) -> Generator[tuple[str, int]]:
"""Generate test records for the given doctype and its dependencies."""
# NOTE: visited excludes dependency discovery of any index doctype which
@ -155,7 +153,7 @@ def _generate_all_records_towards(
def _generate_records_for(
index_doctype: str, reset: bool = False, commit: bool = False, initial_doctype: str | None = None
) -> Generator[tuple[str, "Document"], None, None]:
) -> Generator[tuple[str, "Document"]]:
"""Create and yield test records for a specific doctype."""
test_module: ModuleType
@ -204,7 +202,7 @@ test_record_manager_instance = None
def _sync_records(
index_doctype: str, test_records: dict[str, list], reset: bool = False, commit: bool = False
) -> Generator[tuple[str, "Document"], None, None]:
) -> Generator[tuple[str, "Document"]]:
"""Generate test objects for a register doctype from provided records, with caching and persistence."""
# NOTE: This method is called in roughly these situations:
# 1. First sync of a index doctype's records

View file

@ -4,10 +4,9 @@ from collections.abc import Generator, Iterable, Mapping, Sequence
from datetime import date, datetime
from itertools import groupby
from operator import attrgetter
from typing import Any, NamedTuple, TypeAlias, TypeGuard, TypeVar, cast
from typing import Any, NamedTuple, Self, TypeAlias, TypeGuard, TypeVar, cast, override
from pypika import Column
from typing_extensions import Self, override
Doct: TypeAlias = str
Fld: TypeAlias = str
@ -254,7 +253,7 @@ class Filters(list[FilterTuple]):
optimized.extend(filters)
else:
def _values() -> Generator[_Value, None, None]:
def _values() -> Generator[_Value]:
for f in filters:
# f.value is already narrowed to Val when we optimize over fully initialized FilterTuple
yield cast(_Value, f.value) # = operator only is allowed to have _Value

View file

@ -1,12 +1,12 @@
from collections.abc import Iterable, Mapping
from typing import (
TYPE_CHECKING,
Self,
TypeVar,
overload,
override,
)
from typing_extensions import Self, override
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")

View file

@ -925,7 +925,7 @@ def get_safe_filters(filters):
return filters
def create_batch(iterable: Iterable, size: int) -> Generator[Iterable, None, None]:
def create_batch(iterable: Iterable, size: int) -> Generator[Iterable]:
"""Convert an iterable to multiple batches of constant size of batch_size.
Args:

View file

@ -116,7 +116,7 @@ def is_invalid_date_string(date_string: str) -> bool:
def getdate(
string_date: Optional["DateTimeLikeObject"] = None, parse_day_first: bool = False
string_date: "DateTimeLikeObject" | None = None, parse_day_first: bool = False
) -> datetime.date | None:
"""
Convert string date (yyyy-mm-dd) to datetime.date object.
@ -148,7 +148,7 @@ def getdate(
def get_datetime(
datetime_str: Optional["DateTimeLikeObject"] | tuple | list = None,
datetime_str: "DateTimeLikeObject" | None | tuple | list = None,
) -> datetime.datetime | None:
"""Return the below mentioned values based on the given `datetime_str`:
@ -373,7 +373,7 @@ def now_datetime() -> datetime.datetime:
return datetime.datetime.now(ZoneInfo(get_system_timezone())).replace(tzinfo=None)
def get_timestamp(date: Optional["DateTimeLikeObject"] = None) -> float:
def get_timestamp(date: "DateTimeLikeObject" | None = None) -> float:
"""Return the Unix timestamp (seconds since Epoch) for the given `date`.
If `date` is None, the current timestamp is returned.
"""
@ -402,7 +402,7 @@ def convert_utc_to_timezone(utc_timestamp: datetime.datetime, time_zone: str) ->
def get_datetime_in_timezone(time_zone: str) -> datetime.datetime:
"""Return the current datetime in the given timezone (e.g. 'Asia/Kolkata')."""
utc_timestamp = datetime.datetime.now(datetime.timezone.utc)
utc_timestamp = datetime.datetime.now(datetime.UTC)
return convert_utc_to_timezone(utc_timestamp, time_zone)
@ -2404,7 +2404,7 @@ def to_markdown(html: str) -> str:
pass
def md_to_html(markdown_text: str) -> Optional["UnicodeWithAttrs"]:
def md_to_html(markdown_text: str) -> "UnicodeWithAttrs" | None:
"""Convert the given markdown text to HTML and returns it."""
from markdown2 import MarkdownError
from markdown2 import markdown as _markdown
@ -2424,7 +2424,7 @@ def md_to_html(markdown_text: str) -> Optional["UnicodeWithAttrs"]:
pass
def markdown(markdown_text: str) -> Optional["UnicodeWithAttrs"]:
def markdown(markdown_text: str) -> "UnicodeWithAttrs" | None:
"""Convert the given markdown text to HTML and returns it."""
return md_to_html(markdown_text)

View file

@ -171,7 +171,7 @@ class CDPSocketClient:
event = event[1]
try:
self.loop.run_until_complete(asyncio.wait_for(event, timeout))
except asyncio.TimeoutError:
except TimeoutError:
frappe.log_error(title="Timeout waiting for event", message=f"{frappe.get_traceback()}")
def remove_listener(self, method, event):

View file

@ -84,7 +84,7 @@ def sleep_duration(tick):
# This makes scheduler aligned with real clock,
# so event scheduled at 12:00 happen at 12:00 and not 12:00:35.
minutes = tick // 60
now = datetime.datetime.now(datetime.timezone.utc)
now = datetime.datetime.now(datetime.UTC)
left_minutes = minutes - now.minute % minutes
next_execution = now.replace(second=0) + datetime.timedelta(minutes=left_minutes)

View file

@ -151,7 +151,7 @@ class WebsiteTheme(Document):
return [{"name": app, "title": values["title"]} for app, values in apps.items()]
def get_active_theme() -> Optional["WebsiteTheme"]:
def get_active_theme() -> "WebsiteTheme" | None:
if website_theme := frappe.get_website_settings("website_theme"):
try:
return frappe.client_cache.get_doc("Website Theme", website_theme)

View file

@ -118,7 +118,7 @@ def get_context(context) -> PrintContext:
}
def get_print_format_doc(print_format_name: str, meta: "Meta") -> Optional["PrintFormat"]:
def get_print_format_doc(print_format_name: str, meta: "Meta") -> "PrintFormat" | None:
"""Return print format document."""
if not print_format_name:
print_format_name = frappe.form_dict.format or meta.default_print_format or "Standard"
@ -135,7 +135,7 @@ def get_print_format_doc(print_format_name: str, meta: "Meta") -> Optional["Prin
def get_rendered_template(
doc: "Document",
print_format: Optional["PrintFormat"] = None,
print_format: "PrintFormat" | None = None,
meta: "Meta" = None,
no_letterhead: bool | None = None,
letterhead: str | None = None,
@ -281,7 +281,7 @@ def set_link_titles(doc: "Document") -> None:
def set_title_values_for_link_and_dynamic_link_fields(
meta: "Meta", doc: "Document", parent_doc: Optional["Document"] = None
meta: "Meta", doc: "Document", parent_doc: "Document" | None = None
) -> None:
if parent_doc and not parent_doc.get("__link_titles"):
setattr(parent_doc, "__link_titles", {})
@ -586,7 +586,7 @@ def has_value(df: "DocField", doc: "Document") -> bool:
def get_print_style(
style: str | None = None, print_format: Optional["PrintFormat"] = None, for_legacy: bool = False
style: str | None = None, print_format: "PrintFormat" | None = None, for_legacy: bool = False
) -> str:
print_settings = frappe.get_doc("Print Settings")
@ -618,7 +618,7 @@ def get_print_style(
def get_font(
print_settings: "PrintSettings", print_format: Optional["PrintFormat"] = None, for_legacy=False
print_settings: "PrintSettings", print_format: "PrintFormat" | None = None, for_legacy=False
) -> str:
default = """
"InterVariable", "Inter", -apple-system", "BlinkMacSystemFont",