# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import functools import io import os import shutil import sys import traceback from collections import deque from collections.abc import ( Callable, Container, Generator, Iterable, MutableMapping, MutableSequence, Sequence, ) from email.header import decode_header, make_header from email.utils import formataddr, parseaddr from typing import Any, Generic, TypeAlias, TypedDict from werkzeug.test import Client from frappe.deprecation_dumpster import gzip_compress, gzip_decompress, make_esc # utility functions like cint, int, flt, etc. from frappe.utils.data import * from frappe.utils.html_utils import sanitize_html EMAIL_NAME_PATTERN = re.compile(r"[^A-Za-z0-9\u00C0-\u024F\/\_\' ]+") EMAIL_STRING_PATTERN = re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)") NON_MD_HTML_PATTERN = re.compile(r"|") HTML_TAGS_PATTERN = re.compile(r"\<[^>]*\>") INCLUDE_DIRECTIVE_PATTERN = re.compile("""({% include ['"]([^'"]*)['"] %})""") PHONE_NUMBER_PATTERN = re.compile(r"([0-9\ \+\_\-\,\.\*\#\(\)]){1,20}$") PERSON_NAME_PATTERN = re.compile(r"^[\w][\w\'\-]*( \w[\w\'\-]*)*$") WHITESPACE_PATTERN = re.compile(r"[\t\n\r]") MULTI_EMAIL_STRING_PATTERN = re.compile(r'[,\n](?=(?:[^"]|"[^"]*")*$)') EMAIL_MATCH_PATTERN = re.compile( r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?", re.IGNORECASE, ) UNSET = object() PropertyType: TypeAlias = property | functools.cached_property if sys.version_info < (3, 11): def exception(): _exc_type, exc_value, _exc_traceback = sys.exc_info() return exc_value sys.exception = exception def get_fullname(user=None): """get the full name (first name + last name) of the user from User""" if not user: user = frappe.session.user if not hasattr(frappe.local, "fullnames"): frappe.local.fullnames = {} if not frappe.local.fullnames.get(user): p = frappe.db.get_value("User", user, ["first_name", "last_name"], as_dict=True) if p: frappe.local.fullnames[user] = ( " ".join(filter(None, [p.get("first_name"), p.get("last_name")])) or user ) else: frappe.local.fullnames[user] = user return frappe.local.fullnames.get(user) def get_email_address(user=None): """get the email address of the user from User""" if not user: user = frappe.session.user return frappe.db.get_value("User", user, "email") def get_formatted_email(user, mail=None): """get Email Address of user formatted as: `John Doe `""" fullname = get_fullname(user) method = get_hook_method("get_sender_details") if method: sender_name, mail = method() # if method exists but sender_name is "" fullname = sender_name or fullname if not mail: mail = get_email_address(user) or validate_email_address(user) if not mail: return "" else: return cstr(make_header(decode_header(formataddr((fullname, mail))))) def extract_email_id(email): """fetch only the email part of the Email Address""" return cstr(parse_addr(email)[1]) def validate_phone_number_with_country_code(phone_number: str, fieldname: str) -> None: from phonenumbers import NumberParseException, is_valid_number, parse from frappe import _ if not phone_number: return valid_number = False error_message = _("Phone Number {0} set in field {1} is not valid.") error_title = _("Invalid Phone Number") try: if valid_number := is_valid_number(parse(phone_number)): return True except NumberParseException as e: if e.error_type == NumberParseException.INVALID_COUNTRY_CODE: error_message = _("Please select a country code for field {1}.") error_title = _("Country Code Required") finally: if not valid_number: frappe.throw( error_message.format(frappe.bold(phone_number), frappe.bold(fieldname)), title=error_title, exc=frappe.InvalidPhoneNumberError, ) def validate_phone_number(phone_number, throw=False): """Return True if valid phone number.""" if not phone_number: return False phone_number = phone_number.strip() match = PHONE_NUMBER_PATTERN.match(phone_number) if not match and throw: frappe.throw( frappe._("{0} is not a valid Phone Number").format(phone_number), frappe.InvalidPhoneNumberError ) return bool(match) def validate_name(name, throw=False): """Return True if the name is valid * valid names may have unicode and ascii characters, dash, quotes, numbers * anything else is considered invalid Note: "Name" here is name of a person, not the primary key in Frappe doctypes. """ if not name: return False name = name.strip() match = PERSON_NAME_PATTERN.match(name) if not match and throw: frappe.throw(frappe._("{0} is not a valid Name").format(name), frappe.InvalidNameError) return bool(match) def validate_email_address(email_str, throw=False): """Validates the email string""" email = email_str = (email_str or "").strip() def _check(e): _valid = True if not e: _valid = False if "undisclosed-recipient" in e: return False elif " " in e and "<" not in e: # example: "test@example.com test2@example.com" will return "test@example.comtest2" after parseaddr!!! _valid = False else: email_id = extract_email_id(e) match = EMAIL_MATCH_PATTERN.match(email_id) if email_id else None if not match: _valid = False if not _valid: if throw: invalid_email = frappe.utils.escape_html(e) frappe.throw( frappe._("{0} is not a valid Email Address").format(invalid_email), frappe.InvalidEmailAddressError, ) return None else: return email_id out = [] for e in email_str.split(","): if not e: continue email = _check(e.strip()) if email: out.append(email) return ", ".join(out) def split_emails(txt): email_list = [] # emails can be separated by comma or newline s = WHITESPACE_PATTERN.sub(" ", cstr(txt)) for email in MULTI_EMAIL_STRING_PATTERN.split(s): email = strip(cstr(email)) if email: email_list.append(email) return email_list def validate_url( txt: str, throw: bool = False, valid_schemes: str | Container[str] | None = None, ) -> bool: """ Return True if `txt` represents a valid URL. Args: throw: throws a validationError if URL is not valid valid_schemes: if provided checks the given URL's scheme against this """ url = urlparse(txt) is_valid = bool(url.scheme and (url.netloc or url.path)) or bool(txt and txt.startswith("/")) # Handle scheme validation if isinstance(valid_schemes, str): is_valid = is_valid and (url.scheme == valid_schemes) elif isinstance(valid_schemes, list | tuple | set): is_valid = is_valid and (url.scheme in valid_schemes) if not is_valid and throw: frappe.throw(frappe._("'{0}' is not a valid URL").format(frappe.bold(txt))) return is_valid def random_string(length: int) -> str: """generate a random string""" import string from random import choice return "".join(choice(string.ascii_letters + string.digits) for i in range(length)) def has_gravatar(email: str) -> str: """Return gravatar url if user has set an avatar at gravatar.com.""" import requests if frappe.flags.in_import or frappe.flags.in_install or frappe.in_test: # no gravatar if via upload # since querying gravatar for every item will be slow return "" gravatar_url = get_gravatar_url(email, "404") try: res = requests.get(gravatar_url, timeout=5) if res.status_code == 200: return gravatar_url else: return "" except requests.exceptions.RequestException: return "" def get_gravatar_url(email: str, default: Literal["mm", "404"] = "mm") -> str: """Return gravatar URL for the given email. If `default` is set to "404", gravatar URL will return 404 if no avatar is found. If `default` is set to "mm", a placeholder image will be returned. """ hexdigest = hashlib.md5(frappe.as_unicode(email).encode("utf-8"), usedforsecurity=False).hexdigest() return f"https://secure.gravatar.com/avatar/{hexdigest}?d={default}&s=200" def get_gravatar(email: str) -> str: """Return gravatar URL if user has set an avatar at gravatar.com. Else return identicon image (base64).""" from frappe.utils.identicon import Identicon return has_gravatar(email) or Identicon(email).base64() def get_traceback(with_context: bool = False) -> str: """Return the traceback of the Exception.""" from traceback_with_variables import iter_exc_lines exc_type, exc_value, exc_tb = sys.exc_info() if not any([exc_type, exc_value, exc_tb]): return "" if with_context: trace_list = iter_exc_lines(fmt=_get_traceback_sanitizer()) tb = "\n".join(trace_list) else: trace_list = traceback.format_exception(exc_type, exc_value, exc_tb) tb = "".join(cstr(t) for t in trace_list) bench_path = get_bench_path() + "/" return tb.replace(bench_path, "") @functools.lru_cache(maxsize=1) def _get_traceback_sanitizer(): from traceback_with_variables import Format blocklist = [ "password", "passwd", "secret", "token", "key", "pwd", ] placeholder = "********" def dict_printer(v: dict) -> str: from copy import deepcopy v = deepcopy(v) for key in blocklist: if key in v: v[key] = placeholder return str(v) # Adapted from https://github.com/andy-landy/traceback_with_variables/blob/master/examples/format_customized.py # Reused under MIT license: https://github.com/andy-landy/traceback_with_variables/blob/master/LICENSE return Format( custom_var_printers=[ # redact variables *[(variable_name, lambda *a, **kw: placeholder) for variable_name in blocklist], # redact dictionary keys (["_secret", dict, lambda *a, **kw: False], dict_printer), (["_secret", frappe._dict, lambda *a, **kw: False], dict_printer), ], ) def log(event, details): frappe.logger(event).info(details) def dict_to_str(args: dict[str, Any], sep: str = "&") -> str: """Convert a dictionary to URL.""" return sep.join(f"{k!s}=" + quote(str(args[k] or "")) for k in list(args)) def list_to_str(seq, sep=", "): """Convert a sequence into a string using seperator. Same as str.join, but does type conversion and strip extra spaces. """ return sep.join(map(str.strip, map(str, seq))) # Get Defaults # ============================================================================== def get_defaults(key=None): """ Get dictionary of default values from the defaults, or a value if key is passed """ return frappe.db.get_defaults(key) def set_default(key, val): """ Set / add a default value to defaults` """ return frappe.db.set_default(key, val) def remove_blanks(d: dict) -> dict: """Return d with empty ('' or None) values stripped. Mutates inplace.""" for k, v in tuple(d.items()): if not v: del d[k] return d def strip_html_tags(text: str) -> str: """Remove html tags from the given `text`.""" return HTML_TAGS_PATTERN.sub("", text) def get_file_timestamp(fn): """Return timestamp of the given file.""" from frappe.utils import cint try: return str(cint(os.stat(fn).st_mtime)) except OSError as e: if e.args[0] != 2: raise else: return None # esc / unescape characters -- used for command line def esc(s, esc_chars): """ Escape special characters """ if not s: return "" for c in esc_chars: esc_str = "\\" + c s = s.replace(c, esc_str) return s def unesc(s, esc_chars): """ UnEscape special characters """ for c in esc_chars: esc_str = "\\" + c s = s.replace(esc_str, c) return s def execute_in_shell(cmd, verbose=False, low_priority=False, check_exit_code=False): # using Popen instead of os.system - as recommended by python docs import shlex import tempfile from subprocess import Popen if isinstance(cmd, list): # ensure it's properly escaped; only a single string argument executes via shell cmd = shlex.join(cmd) with tempfile.TemporaryFile() as stdout, tempfile.TemporaryFile() as stderr: kwargs = { "shell": True, "stdout": stdout, "stderr": stderr, "executable": shutil.which("bash") or "/bin/bash", } if low_priority: kwargs["preexec_fn"] = lambda: os.nice(10) p = Popen(cmd, **kwargs) exit_code = p.wait() stdout.seek(0) out = stdout.read() stderr.seek(0) err = stderr.read() failed = check_exit_code and exit_code if verbose or failed: if err: print(err) if out: print(out) if failed: raise frappe.CommandFailedError( "Command failed", out.decode(errors="replace"), err.decode(errors="replace") ) return err, out def get_path(*path, **kwargs): base = kwargs.get("base") if not base: base = frappe.local.site_path return os.path.join(base, *path) def get_site_base_path(): return frappe.local.site_path def get_site_path(*path): return get_path(*path, base=get_site_base_path()) def get_files_path(*path, **kwargs): return get_site_path("private" if kwargs.get("is_private") else "public", "files", *path) def get_bench_path(): return os.environ.get("FRAPPE_BENCH_ROOT") or os.path.realpath( os.path.join(os.path.dirname(frappe.__file__), "..", "..", "..") ) def get_bench_id(): return frappe.get_conf().get("bench_id", get_bench_path().strip("/").replace("/", "-")) def get_site_id(site=None): return f"{site or frappe.local.site}@{get_bench_id()}" def get_backups_path(): return get_site_path("private", "backups") def get_request_site_address(full_address=False): return get_url(full_address=full_address) def get_site_url(site): conf = frappe.get_conf(site) if conf.host_name: return conf.host_name return f"http://{site}:{conf.webserver_port}" def encode_dict(d, encoding="utf-8"): for key in d: if isinstance(d[key], str) and isinstance(d[key], str): d[key] = d[key].encode(encoding) return d def decode_dict(d, encoding="utf-8"): for key in d: if isinstance(d[key], str) and not isinstance(d[key], str): d[key] = d[key].decode(encoding, "ignore") return d @functools.lru_cache def get_site_name(hostname): return hostname.split(":", 1)[0] def get_disk_usage(): """get disk usage of files folder""" files_path = get_files_path() if not os.path.exists(files_path): return 0 err, out = execute_in_shell(f"du -hsm {files_path}") return cint(out.split("\n")[-2].split("\t")[0]) def touch_file(path): with open(path, "a"): os.utime(path, None) return path def get_test_client(use_cookies=True) -> Client: """Return an test instance of the Frappe WSGI.""" from frappe.app import application return Client(application, use_cookies=use_cookies) def get_hook_method(hook_name, fallback=None): method = frappe.get_hooks().get(hook_name) if method: method = frappe.get_attr(method[0]) return method if fallback: return fallback def call_hook_method(hook, *args, **kwargs): out = None for method_name in frappe.get_hooks(hook): out = out or frappe.get_attr(method_name)(*args, **kwargs) return out def is_cli() -> bool: """Return True if current instance is being run via a terminal.""" invoked_from_terminal = False try: invoked_from_terminal = bool(os.get_terminal_size()) except Exception: invoked_from_terminal = sys.stdin and sys.stdin.isatty() return invoked_from_terminal def update_progress_bar(txt, i, l, absolute=False): if os.environ.get("CI"): if i == 0: sys.stdout.write(txt) sys.stdout.write(".") sys.stdout.flush() return if not getattr(frappe.local, "request", None) or is_cli(): # pragma: no cover lt = len(txt) try: col = 40 if os.get_terminal_size().columns > 80 else 20 except OSError: # in case function isn't being called from a terminal col = 40 if lt < 36: txt = txt + " " * (36 - lt) complete = int(float(i + 1) / l * col) completion_bar = ("=" * complete).ljust(col, " ") percent_complete = f"{int(float(i + 1) / l * 100)!s}%" status = f"{i} of {l}" if absolute else percent_complete sys.stdout.write(f"\r{txt}: [{completion_bar}] {status}") sys.stdout.flush() def get_html_format(print_path): html_format = None if os.path.exists(print_path): with open(print_path) as f: html_format = f.read() for include_directive, path in INCLUDE_DIRECTIVE_PATTERN.findall(html_format): for app_name in frappe.get_installed_apps(): include_path = frappe.get_app_path(app_name, *path.split(os.path.sep)) if os.path.exists(include_path): with open(include_path) as f: html_format = html_format.replace(include_directive, f.read()) break return html_format def is_markdown(text): if "" in text: return True elif "" in text: return False else: return not NON_MD_HTML_PATTERN.search(text) def is_a_property(x) -> bool: """Get properties (@property, @cached_property) in a controller class""" return isinstance(x, PropertyType) def get_sites(sites_path=None): if not sites_path: sites_path = getattr(frappe.local, "sites_path", None) or "." sites = [] for site in os.listdir(sites_path): path = os.path.join(sites_path, site) if ( os.path.isdir(path) and not os.path.islink(path) and os.path.exists(os.path.join(path, "site_config.json")) ): # is a dir and has site_config.json sites.append(site) return sorted(sites) def get_request_session(max_retries=5): import requests from requests.adapters import HTTPAdapter, Retry session = requests.Session() http_adapter = HTTPAdapter(max_retries=Retry(total=max_retries, status_forcelist=[500])) session.mount("http://", http_adapter) session.mount("https://", http_adapter) return session def markdown(text, sanitize=True, linkify=True): html = text if is_html(text) else frappe.utils.md_to_html(text) if sanitize: html = html.replace("", "") html = sanitize_html(html, linkify=linkify) return html def sanitize_email(emails): sanitized = [] for e in split_emails(emails): if not validate_email_address(e): continue full_name, email_id = parse_addr(e) sanitized.append(formataddr((full_name, email_id))) return ", ".join(sanitized) def parse_addr(email_string): """ Return email_id and user_name based on email string Raise error if email string is not valid """ name, email = parseaddr(email_string) if check_format(email): name = get_name_from_email_string(email_string, email, name) return (name, email) else: email_list = EMAIL_STRING_PATTERN.findall(email_string) if len(email_list) > 0 and check_format(email_list[0]): # take only first email address email = email_list[0] name = get_name_from_email_string(email_string, email, name) return (name, email) return (None, email) def check_format(email_id): """ Check if email_id is valid. valid email:text@example.com String check ensures that email_id contains both '.' and '@' and index of '@' is less than '.' """ is_valid = False try: pos = email_id.rindex("@") is_valid = pos > 0 and (email_id.rindex(".") > pos) and (len(email_id) - pos > 4) except Exception: # print(e) pass return is_valid def get_name_from_email_string(email_string, email_id, name): name = email_string.replace(email_id, "") name = EMAIL_NAME_PATTERN.sub("", name).strip() if not name: name = email_id return name def get_installed_apps_info(): out = [] from frappe.utils.change_log import get_versions out.extend( { "app_name": app, "version": version_details.get("branch_version") or version_details.get("version"), "branch": version_details.get("branch"), } for app, version_details in get_versions().items() ) return out def get_site_info(): from frappe.email.queue import get_emails_sent_this_month from frappe.utils.user import get_system_managers # only get system users users = frappe.get_all( "User", filters={"user_type": "System User", "name": ("not in", frappe.STANDARD_USERS)}, fields=["name", "enabled", "last_login", "last_active", "language", "time_zone"], ) system_managers = get_system_managers(only_name=True) for u in users: # tag system managers u.is_system_manager = 1 if u.name in system_managers else 0 u.full_name = get_fullname(u.name) u.email = u.name del u["name"] system_settings = frappe.db.get_singles_dict("System Settings") space_usage = frappe._dict((frappe.local.conf.limits or {}).get("space_usage", {})) kwargs = { "fields": ["user", "creation", "full_name"], "filters": {"operation": "Login", "status": "Success"}, "limit": "10", } site_info = { "installed_apps": get_installed_apps_info(), "users": users, "country": system_settings.country, "language": system_settings.language or "english", "time_zone": system_settings.time_zone, "setup_complete": frappe.is_setup_complete(), "scheduler_enabled": system_settings.enable_scheduler, # usage "emails_sent": get_emails_sent_this_month(), "space_used": flt((space_usage.total or 0) / 1024.0, 2), "database_size": space_usage.database_size, "backup_size": space_usage.backup_size, "files_size": space_usage.files_size, "last_logins": frappe.get_all("Activity Log", **kwargs), } # from other apps for method_name in frappe.get_hooks("get_site_info"): site_info.update(frappe.get_attr(method_name)(site_info) or {}) # dumps -> loads to prevent datatype conflicts return json.loads(frappe.as_json(site_info)) def parse_json(val: str): """ Parses json if string else return """ if isinstance(val, str): val = json.loads(val) if isinstance(val, dict): val = frappe._dict(val) return val def get_db_count(*args): """ Pass a doctype or a series of doctypes to get the count of docs in them. Parameters: *args: Variable length argument list of doctype names whose doc count you need Return: dict: A dict with the count values. Example: via terminal: bench --site erpnext.local execute frappe.utils.get_db_count --args "['DocType', 'Communication']" """ db_count = {} for doctype in args: db_count[doctype] = frappe.db.count(doctype) return json.loads(frappe.as_json(db_count)) def call(fn, *args, **kwargs): """ Pass a doctype or a series of doctypes to get the count of docs in them Parameters: fn: frappe function to be called Return: based on the function you call: output of the function you call Example: via terminal: bench --site erpnext.local execute frappe.utils.call --args '''["frappe.get_all", "Activity Log"]''' --kwargs '''{"fields": ["user", "creation", "full_name"], "filters":{"Operation": "Login", "Status": "Success"}, "limit": "10"}''' """ return json.loads(frappe.as_json(frappe.call(fn, *args, **kwargs))) def get_safe_filters(filters): try: filters = json.loads(filters) if isinstance(filters, int | float): filters = frappe.as_unicode(filters) except (TypeError, ValueError): # filters are not passed, not json pass return filters def create_batch(iterable: Iterable, size: int) -> Generator[Iterable, None, None]: """Convert an iterable to multiple batches of constant size of batch_size. Args: iterable (Iterable): Iterable object which is subscriptable size (int): Maximum size of batches to be generated Yields: Generator[List]: Batched iterable of maximum length `size` """ total_count = len(iterable) for i in range(0, total_count, size): yield iterable[i : min(i + size, total_count)] def set_request(**kwargs): from werkzeug.test import EnvironBuilder from werkzeug.wrappers import Request builder = EnvironBuilder(**kwargs) frappe.local.request = Request(builder.get_environ()) def get_html_for_route(route): from frappe.website.serve import get_response set_request(method="GET", path=route) response = get_response() return frappe.safe_decode(response.get_data()) def get_file_size(path, format=False): num = os.path.getsize(path) if not format: return num suffix = "B" for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024: return f"{num:3.1f}{unit}{suffix}" num /= 1024 return "{:.1f}{}{}".format(num, "Yi", suffix) def get_build_version(): try: return str(os.path.getmtime(os.path.join(frappe.local.sites_path, "assets/assets.json"))) except OSError: # .build can sometimes not exist # this is not a major problem so send fallback return frappe.utils.random_string(8) def get_assets_json(): def _get_assets(): # get merged assets.json and assets-rtl.json assets = frappe.parse_json(frappe.read_file("assets/assets.json")) if assets_rtl := frappe.read_file("assets/assets-rtl.json"): assets.update(frappe.parse_json(assets_rtl)) return assets if not frappe.conf.developer_mode: return frappe.client_cache.get_value( "assets_json", shared=True, generator=_get_assets, ) else: return _get_assets() def get_bench_relative_path(file_path): """Fix paths relative to the bench root directory if exists and return the absolute path. Args: file_path (str, Path): Path of a file that exists on the file system Return: str: Absolute path of the file_path """ if not os.path.exists(file_path): base_path = ".." elif file_path.startswith(os.sep): base_path = os.sep else: base_path = "." file_path = os.path.join(base_path, file_path) if not os.path.exists(file_path): print(f"Invalid path {file_path[3:]}") sys.exit(1) return os.path.abspath(file_path) def groupby_metric(iterable: dict[str, list], key: str): """Group records by a metric. Usecase: Lets assume we got country wise players list with the ranking given for each player(multiple players in a country can have same ranking aswell). We can group the players by ranking(can be any other metric) using this function. >>> d = { 'india': [{'id':1, 'name': 'iplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'iplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'iplayer-3'}], 'Aus': [{'id':1, 'name': 'aplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'aplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'aplayer-3'}] } >>> groupby(d, key="ranking") {1: {'Aus': [{'id': 1, 'name': 'aplayer-1', 'ranking': 1}, {'id': 2, 'name': 'aplayer-2', 'ranking': 1}], 'india': [{'id': 1, 'name': 'iplayer-1', 'ranking': 1}, {'id': 2, 'name': 'iplayer-2', 'ranking': 1}]}, 2: {'Aus': [{'id': 2, 'name': 'aplayer-3', 'ranking': 2}], 'india': [{'id': 2, 'name': 'iplayer-3', 'ranking': 2}]}} """ records = {} for category, items in iterable.items(): for item in items: records.setdefault(item[key], {}).setdefault(category, []).append(item) return records def get_table_name(table_name: str, wrap_in_backticks: bool = False) -> str: name = f"tab{table_name}" if not table_name.startswith("__") else table_name if wrap_in_backticks: return f"`{name}`" return name def squashify(what): if isinstance(what, Sequence) and len(what) == 1: return what[0] return what def safe_json_loads(*args): results = [] for arg in args: try: arg = json.loads(arg) except Exception: pass results.append(arg) return squashify(results) def dictify(arg): if isinstance(arg, MutableSequence): for i, a in enumerate(arg): arg[i] = dictify(a) elif isinstance(arg, MutableMapping): arg = frappe._dict(arg) return arg class _UserInfo(TypedDict): fullname: str image: str name: str email: str time_zone: str def add_user_info(user: str | list[str] | set[str], user_info: dict[str, _UserInfo]) -> None: if not user: return if isinstance(user, str): user = [user] missing_users = [u for u in user if u not in user_info] if not missing_users: return missing_info = frappe.get_all( "User", {"name": ("in", missing_users)}, ["full_name", "user_image", "name", "email", "time_zone"], ) for info in missing_info: user_info.setdefault(info.name, frappe._dict()).update( fullname=info.full_name or info.name, image=info.user_image, name=info.name, email=info.email, time_zone=info.time_zone, ) def is_git_url(url: str) -> bool: # modified to allow without the tailing .git from https://github.com/jonschlinkert/is-git-url.git pattern = r"(?:git|ssh|https?|\w*@[-\w.]+):(\/\/)?(.*?)(\.git)?(\/?|\#[-\d\w._]+?)$" return bool(re.match(pattern, url)) class CallbackManager: """Manage callbacks. ``` # Capture callacks callbacks = CallbackManager() # Put a function call in queue callbacks.add(func) # Run all pending functions in queue callbacks.run() # Reset queue callbacks.reset() ``` Example usage: frappe.db.after_commit """ __slots__ = ("_functions",) def __init__(self) -> None: self._functions = deque() def add(self, func: Callable) -> None: """Add a function to queue, functions are executed in order of addition.""" self._functions.append(func) def __call__(self, func: Callable) -> None: self.add(func) def run(self): """Run all functions in queue""" while self._functions: _func = self._functions.popleft() _func() def reset(self): self._functions.clear() def safe_eval(code, eval_globals=None, eval_locals=None): """A safer `eval`""" from frappe.utils.safe_exec import safe_eval return safe_eval(code, eval_globals, eval_locals) cached_property = functools.cached_property if sys.version_info.minor < 12: T = TypeVar("T") class cached_property(functools.cached_property, Generic[T]): """ A simpler `functools.cached_property` implementation without locks. This isn't needed in Python 3.12+, since lock was removed in newer versions. Hence, in those versions, it returns the `functools.cached_property` object. This does not prevent a possible race condition in multi-threaded usage. The getter function could run more than once on the same instance, with the latest run setting the cached value. If the cached property is idempotent or otherwise not harmful to run more than once on an instance, this is fine. If synchronization is needed, implement the necessary locking inside the decorated getter function or around the cached property access. """ def __init__(self, func: Callable[[Any], T]): self.func = func self.attrname = None self.__doc__ = func.__doc__ self.__module__ = func.__module__ def __set_name__(self, owner, name): if self.attrname is None: self.attrname = name elif name != self.attrname: raise TypeError( "Cannot assign the same cached_property to two different names " f"({self.attrname!r} and {name!r})." ) def __get__(self, instance, owner=None) -> T: if instance is None: return self value = self.func(instance) instance.__dict__[self.attrname] = value return value # end: custom cached_property implementation