seitime-frappe/frappe/utils/__init__.py

1202 lines
30 KiB
Python

# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import functools
import io
import os
import shutil
import sys
import traceback
from collections import deque
from collections.abc import (
Callable,
Container,
Generator,
Iterable,
MutableMapping,
MutableSequence,
Sequence,
)
from email.header import decode_header, make_header
from email.utils import formataddr, getaddresses, parseaddr
from typing import Any, Generic, TypeAlias, TypedDict
import orjson
from werkzeug.test import Client
from frappe.deprecation_dumpster import gzip_compress, gzip_decompress, make_esc
# utility functions like cint, int, flt, etc.
from frappe.utils.data import *
from frappe.utils.html_utils import sanitize_html
EMAIL_NAME_PATTERN = re.compile(r"[^A-Za-z0-9\u00C0-\u024F\/\_\' ]+")
EMAIL_STRING_PATTERN = re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)")
NON_MD_HTML_PATTERN = re.compile(r"<p[\s]*>|<br[\s]*>")
HTML_TAGS_PATTERN = re.compile(r"\<[^>]*\>")
INCLUDE_DIRECTIVE_PATTERN = re.compile("""({% include ['"]([^'"]*)['"] %})""")
PHONE_NUMBER_PATTERN = re.compile(r"([0-9\ \+\_\-\,\.\*\#\(\)]){1,20}$")
PERSON_NAME_PATTERN = re.compile(r"^[\w][\w\'\-]*( \w[\w\'\-]*)*$")
WHITESPACE_PATTERN = re.compile(r"[\t\n\r]")
MULTI_EMAIL_STRING_PATTERN = re.compile(r'[,\n](?=(?:[^"]|"[^"]*")*$)')
EMAIL_MATCH_PATTERN = re.compile(
r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?",
re.IGNORECASE,
)
UNSET = object()
PropertyType: TypeAlias = property | functools.cached_property
def get_fullname(user=None):
"""get the full name (first name + last name) of the user from User"""
if not user:
user = frappe.session.user
if not hasattr(frappe.local, "fullnames"):
frappe.local.fullnames = {}
if not frappe.local.fullnames.get(user):
p = frappe.db.get_value("User", user, ["first_name", "last_name"], as_dict=True)
if p:
frappe.local.fullnames[user] = (
" ".join(filter(None, [p.get("first_name"), p.get("last_name")])) or user
)
else:
frappe.local.fullnames[user] = user
return frappe.local.fullnames.get(user)
def get_email_address(user=None):
"""get the email address of the user from User"""
if not user:
user = frappe.session.user
return frappe.db.get_value("User", user, "email")
def get_formatted_email(user, mail=None):
"""get Email Address of user formatted as: `John Doe <johndoe@example.com>`"""
fullname = get_fullname(user)
method = get_hook_method("get_sender_details")
if method:
sender_name, mail = method()
# if method exists but sender_name is ""
fullname = sender_name or fullname
if not mail:
mail = get_email_address(user) or validate_email_address(user)
if not mail:
return ""
else:
return cstr(make_header(decode_header(formataddr((fullname, mail)))))
def extract_email_id(email: str) -> str:
"""fetch only the email part of the Email Address"""
if not email:
return ""
return cstr(parse_addr(email)[1])
def validate_phone_number_with_country_code(phone_number: str, fieldname: str) -> None:
from phonenumbers import NumberParseException, is_valid_number, parse
from frappe import _
if not phone_number:
return
valid_number = False
error_message = _("Phone Number {0} set in field {1} is not valid.")
error_title = _("Invalid Phone Number")
try:
if valid_number := is_valid_number(parse(phone_number)):
return True
except NumberParseException as e:
if e.error_type == NumberParseException.INVALID_COUNTRY_CODE:
error_message = _("Please select a country code for field {1}.")
error_title = _("Country Code Required")
finally:
if not valid_number:
frappe.throw(
error_message.format(frappe.bold(phone_number), frappe.bold(fieldname)),
title=error_title,
exc=frappe.InvalidPhoneNumberError,
)
def validate_phone_number(phone_number, throw=False):
"""Return True if valid phone number."""
if not phone_number:
return False
if not isinstance(phone_number, str):
phone_number = str(phone_number)
phone_number = phone_number.strip()
match = PHONE_NUMBER_PATTERN.match(phone_number)
if not match and throw:
frappe.throw(
frappe._("{0} is not a valid Phone Number").format(phone_number), frappe.InvalidPhoneNumberError
)
return bool(match)
def validate_name(name, throw=False):
"""Return True if the name is valid
* valid names may have unicode and ascii characters, dash, quotes, numbers
* anything else is considered invalid
Note: "Name" here is name of a person, not the primary key in Frappe doctypes.
"""
if not name:
return False
name = name.strip()
match = PERSON_NAME_PATTERN.match(name)
if not match and throw:
frappe.throw(frappe._("{0} is not a valid Name").format(name), frappe.InvalidNameError)
return bool(match)
def validate_email_address(email_str, throw=False):
"""Validates the email string"""
email_str = (email_str or "").strip()
out = []
# Replace newlines with commas so getaddresses can handle them
# getaddresses expects comma-separated values
email_str = email_str.replace("\n", ",").replace("\r", ",")
# Parse using stdlib (handles commas in display names correctly)
addresses = getaddresses([email_str])
for name, addr in addresses:
if not addr:
if throw:
frappe.throw(
frappe._("{0} is not a valid Email Address").format(
frappe.utils.escape_html(name or email_str)
),
frappe.InvalidEmailAddressError,
)
continue
# Skip undisclosed recipients
if "undisclosed-recipient" in addr:
continue
match = EMAIL_MATCH_PATTERN.match(addr)
if not match:
if throw:
frappe.throw(
frappe._("{0} is not a valid Email Address").format(frappe.utils.escape_html(addr)),
frappe.InvalidEmailAddressError,
)
continue
out.append(addr)
return ", ".join(out)
def split_emails(txt):
email_list = []
# emails can be separated by comma or newline
s = WHITESPACE_PATTERN.sub(" ", cstr(txt))
for email in MULTI_EMAIL_STRING_PATTERN.split(s):
email = strip(cstr(email))
if email:
email_list.append(email)
return email_list
def validate_url(
txt: str,
throw: bool = False,
valid_schemes: str | Container[str] | None = None,
) -> bool:
"""
Return True if `txt` represents a valid URL.
Args:
throw: throws a validationError if URL is not valid
valid_schemes: if provided checks the given URL's scheme against this
"""
url = urlparse(txt)
is_valid = bool(url.scheme and (url.netloc or url.path)) or bool(txt and txt.startswith("/"))
# Handle scheme validation
if isinstance(valid_schemes, str):
is_valid = is_valid and (url.scheme == valid_schemes)
elif isinstance(valid_schemes, list | tuple | set):
is_valid = is_valid and (url.scheme in valid_schemes)
if not is_valid and throw:
frappe.throw(frappe._("'{0}' is not a valid URL").format(frappe.bold(txt)))
return is_valid
def validate_iban(iban: str, throw: bool = False) -> bool:
from frappe import _
valid = is_valid_iban(iban)
if not valid and throw:
frappe.throw(frappe._("'{0}' is not a valid IBAN").format(frappe.bold(iban)))
return valid
def is_valid_iban(iban: str) -> bool:
"""
Algorithm: https://en.wikipedia.org/wiki/International_Bank_Account_Number#Validating_the_IBAN
"""
if not iban:
return False
def encode_char(c):
# Position in the alphabet (A=1, B=2, ...) plus nine
return str(9 + ord(c) - 64)
# remove whitespaces, upper case to get the right number from ord()
iban = iban.replace(" ", "").upper()
# Move country code and checksum from the start to the end
flipped = iban[4:] + iban[:4]
# Encode characters as numbers
encoded = [encode_char(c) if ord(c) >= 65 and ord(c) <= 90 else c for c in flipped]
try:
to_check = int("".join(encoded))
except ValueError:
return False
return to_check % 97 == 1
def random_string(length: int) -> str:
"""generate a random string"""
import secrets
import string
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for i in range(length))
def has_gravatar(email: str) -> str:
"""Return gravatar url if user has set an avatar at gravatar.com."""
import requests
if frappe.flags.in_import or frappe.flags.in_install or frappe.in_test:
# no gravatar if via upload
# since querying gravatar for every item will be slow
return ""
gravatar_url = get_gravatar_url(email, "404")
try:
res = requests.get(gravatar_url, timeout=5)
if res.status_code == 200:
return gravatar_url
else:
return ""
except requests.exceptions.RequestException:
return ""
def get_gravatar_url(email: str, default: Literal["mm", "404"] = "mm") -> str:
"""Return gravatar URL for the given email.
If `default` is set to "404", gravatar URL will return 404 if no avatar is found.
If `default` is set to "mm", a placeholder image will be returned.
"""
hexdigest = hashlib.md5(frappe.as_unicode(email).encode("utf-8"), usedforsecurity=False).hexdigest()
return f"https://secure.gravatar.com/avatar/{hexdigest}?d={default}&s=200"
def get_gravatar(email: str) -> str:
"""Return gravatar URL if user has set an avatar at gravatar.com.
Else return identicon image (base64)."""
from frappe.utils.identicon import Identicon
return has_gravatar(email) or Identicon(email).base64()
def get_traceback(with_context: bool = False) -> str:
"""Return the traceback of the Exception."""
from traceback_with_variables import iter_exc_lines
exc_type, exc_value, exc_tb = sys.exc_info()
if not any([exc_type, exc_value, exc_tb]):
return ""
if with_context:
trace_list = iter_exc_lines(fmt=_get_traceback_sanitizer())
tb = "\n".join(trace_list)
else:
trace_list = traceback.format_exception(exc_type, exc_value, exc_tb)
tb = "".join(cstr(t) for t in trace_list)
bench_path = get_bench_path() + "/"
return tb.replace(bench_path, "")
@functools.lru_cache(maxsize=1)
def _get_traceback_sanitizer():
from traceback_with_variables import Format
blocklist = [
"password",
"passwd",
"secret",
"token",
"key",
"pwd",
]
placeholder = "********"
def dict_printer(v: dict) -> str:
from copy import deepcopy
v = deepcopy(v)
for key in blocklist:
if key in v:
v[key] = placeholder
return str(v)
# Adapted from https://github.com/andy-landy/traceback_with_variables/blob/master/examples/format_customized.py
# Reused under MIT license: https://github.com/andy-landy/traceback_with_variables/blob/master/LICENSE
return Format(
custom_var_printers=[
# redact variables
*[(variable_name, lambda *a, **kw: placeholder) for variable_name in blocklist],
# redact dictionary keys
(["_secret", dict, lambda *a, **kw: False], dict_printer),
(["_secret", frappe._dict, lambda *a, **kw: False], dict_printer),
],
)
def log(event, details):
frappe.logger(event).info(details)
def dict_to_str(args: dict[str, Any], sep: str = "&") -> str:
"""Convert a dictionary to URL."""
return sep.join(f"{k!s}=" + quote(str(args[k] or "")) for k in list(args))
def list_to_str(seq, sep=", "):
"""Convert a sequence into a string using seperator.
Same as str.join, but does type conversion and strip extra spaces.
"""
return sep.join(map(str.strip, map(str, seq)))
# Get Defaults
# ==============================================================================
def get_defaults(key=None):
"""
Get dictionary of default values from the defaults, or a value if key is passed
"""
return frappe.db.get_defaults(key)
def set_default(key, val):
"""
Set / add a default value to defaults`
"""
return frappe.db.set_default(key, val)
def remove_blanks(d: dict) -> dict:
"""Return d with empty ('' or None) values stripped. Mutates inplace."""
for k, v in tuple(d.items()):
if not v:
del d[k]
return d
def strip_html_tags(text: str) -> str:
"""Remove html tags from the given `text`."""
return HTML_TAGS_PATTERN.sub("", text)
def get_file_timestamp(fn):
"""Return timestamp of the given file."""
from frappe.utils import cint
try:
return str(cint(os.stat(fn).st_mtime))
except OSError as e:
if e.args[0] != 2:
raise
else:
return None
# esc / unescape characters -- used for command line
def esc(s, esc_chars):
"""
Escape special characters
"""
if not s:
return ""
for c in esc_chars:
esc_str = "\\" + c
s = s.replace(c, esc_str)
return s
def unesc(s, esc_chars):
"""
UnEscape special characters
"""
for c in esc_chars:
esc_str = "\\" + c
s = s.replace(esc_str, c)
return s
def execute_in_shell(cmd, verbose=False, low_priority=False, check_exit_code=False):
# using Popen instead of os.system - as recommended by python docs
import shlex
import tempfile
from subprocess import Popen
if isinstance(cmd, list):
# ensure it's properly escaped; only a single string argument executes via shell
cmd = shlex.join(cmd)
with tempfile.TemporaryFile() as stdout, tempfile.TemporaryFile() as stderr:
kwargs = {
"shell": True,
"stdout": stdout,
"stderr": stderr,
"executable": shutil.which("bash") or "/bin/bash",
}
if low_priority:
kwargs["preexec_fn"] = lambda: os.nice(10)
p = Popen(cmd, **kwargs)
exit_code = p.wait()
stdout.seek(0)
out = stdout.read()
stderr.seek(0)
err = stderr.read()
failed = check_exit_code and exit_code
if verbose or failed:
if err:
print(err)
if out:
print(out)
if failed:
raise frappe.CommandFailedError(
"Command failed", out.decode(errors="replace"), err.decode(errors="replace")
)
return err, out
def get_path(*path, **kwargs):
base = kwargs.get("base")
if not base:
base = frappe.local.site_path
return os.path.join(base, *path)
def get_site_base_path():
return frappe.local.site_path
def get_site_path(*path):
return get_path(*path, base=get_site_base_path())
def get_files_path(*path, **kwargs):
return get_site_path("private" if kwargs.get("is_private") else "public", "files", *path)
def get_bench_path():
return os.environ.get("FRAPPE_BENCH_ROOT") or os.path.realpath(
os.path.join(os.path.dirname(frappe.__file__), "..", "..", "..")
)
def get_bench_id():
return frappe.get_conf().get("bench_id", get_bench_path().strip("/").replace("/", "-"))
def get_site_id(site=None):
return f"{site or frappe.local.site}@{get_bench_id()}"
def get_backups_path():
return get_site_path("private", "backups")
def get_request_site_address(full_address=False):
return get_url(full_address=full_address)
def get_site_url(site):
conf = frappe.get_conf(site)
if conf.host_name:
return conf.host_name
return f"http://{site}:{conf.webserver_port}"
def encode_dict(d, encoding="utf-8"):
for key in d:
if isinstance(d[key], str):
d[key] = d[key].encode(encoding)
return d
def decode_dict(d, encoding="utf-8"):
for key in d:
if isinstance(d[key], str) and not isinstance(d[key], str):
d[key] = d[key].decode(encoding, "ignore")
return d
@functools.lru_cache
def get_site_name(hostname):
return hostname.split(":", 1)[0]
def get_disk_usage():
"""get disk usage of files folder"""
files_path = get_files_path()
if not os.path.exists(files_path):
return 0
_err, out = execute_in_shell(f"du -hsm {files_path}")
return cint(out.split("\n")[-2].split("\t")[0])
def touch_file(path):
with open(path, "a"):
os.utime(path, None)
return path
def get_test_client(use_cookies=True) -> Client:
"""Return an test instance of the Frappe WSGI."""
from frappe.app import application
return Client(application, use_cookies=use_cookies)
def get_hook_method(hook_name, fallback=None):
method = frappe.get_hooks().get(hook_name)
if method:
method = frappe.get_attr(method[0])
return method
if fallback:
return fallback
def call_hook_method(hook, *args, **kwargs):
out = None
for method_name in frappe.get_hooks(hook):
out = out or frappe.get_attr(method_name)(*args, **kwargs)
return out
def is_cli() -> bool:
"""Return True if current instance is being run via a terminal."""
invoked_from_terminal = False
try:
invoked_from_terminal = bool(os.get_terminal_size())
except Exception:
invoked_from_terminal = sys.stdin and sys.stdin.isatty()
return invoked_from_terminal
def update_progress_bar(txt, i, l, absolute=False):
if os.environ.get("CI"):
if i == 0:
sys.stdout.write(txt)
sys.stdout.write(".")
sys.stdout.flush()
return
if not getattr(frappe.local, "request", None) or is_cli(): # pragma: no cover
lt = len(txt)
try:
col = 40 if os.get_terminal_size().columns > 80 else 20
except OSError:
# in case function isn't being called from a terminal
col = 40
if lt < 36:
txt = txt + " " * (36 - lt)
complete = int(float(i + 1) / l * col)
completion_bar = ("=" * complete).ljust(col, " ")
percent_complete = f"{int(float(i + 1) / l * 100)!s}%"
status = f"{i} of {l}" if absolute else percent_complete
sys.stdout.write(f"\r{txt}: [{completion_bar}] {status}")
sys.stdout.flush()
def get_html_format(print_path):
html_format = None
if os.path.exists(print_path):
with open(print_path) as f:
html_format = f.read()
for include_directive, path in INCLUDE_DIRECTIVE_PATTERN.findall(html_format):
for app_name in frappe.get_installed_apps():
include_path = frappe.get_app_path(app_name, *path.split(os.path.sep))
if os.path.exists(include_path):
with open(include_path) as f:
html_format = html_format.replace(include_directive, f.read())
break
return html_format
def is_markdown(text):
if "<!-- markdown -->" in text:
return True
elif "<!-- html -->" in text:
return False
else:
return not NON_MD_HTML_PATTERN.search(text)
def is_a_property(x) -> bool:
"""Get properties (@property, @cached_property) in a controller class"""
return isinstance(x, PropertyType)
def get_sites(sites_path=None):
if not sites_path:
sites_path = getattr(frappe.local, "sites_path", None) or "."
sites = []
for site in os.listdir(sites_path):
path = os.path.join(sites_path, site)
if (
os.path.isdir(path)
and not os.path.islink(path)
and os.path.exists(os.path.join(path, "site_config.json"))
):
# is a dir and has site_config.json
sites.append(site)
return sorted(sites)
def get_request_session(max_retries=5):
import requests
from requests.adapters import HTTPAdapter, Retry
session = requests.Session()
http_adapter = HTTPAdapter(max_retries=Retry(total=max_retries, status_forcelist=[500]))
session.mount("http://", http_adapter)
session.mount("https://", http_adapter)
return session
def markdown(text, sanitize=True, linkify=True):
html = text if is_html(text) else frappe.utils.md_to_html(text)
if sanitize:
html = html.replace("<!-- markdown -->", "")
html = sanitize_html(html, linkify=linkify)
return html
def sanitize_email(emails):
sanitized = []
for e in split_emails(emails):
if not validate_email_address(e):
continue
full_name, email_id = parse_addr(e)
sanitized.append(formataddr((full_name, email_id)))
return ", ".join(sanitized)
def parse_addr(email_string):
"""
Return email_id and user_name based on email string
Raise error if email string is not valid
"""
name, email = parseaddr(email_string)
if check_format(email):
name = get_name_from_email_string(email_string, email, name)
return (name, email)
else:
email_list = EMAIL_STRING_PATTERN.findall(email_string)
if len(email_list) > 0 and check_format(email_list[0]):
# take only first email address
email = email_list[0]
name = get_name_from_email_string(email_string, email, name)
return (name, email)
return (None, email)
def check_format(email_id):
"""
Check if email_id is valid. valid email:text@example.com
String check ensures that email_id contains both '.' and
'@' and index of '@' is less than '.'
"""
is_valid = False
try:
pos = email_id.rindex("@")
is_valid = pos > 0 and (email_id.rindex(".") > pos) and (len(email_id) - pos > 4)
except Exception:
# print(e)
pass
return is_valid
def get_name_from_email_string(email_string, email_id, name):
name = email_string.replace(email_id, "")
name = EMAIL_NAME_PATTERN.sub("", name).strip()
if not name:
name = email_id
return name
def get_installed_apps_info():
out = []
from frappe.utils.change_log import get_versions
out.extend(
{
"app_name": app,
"version": version_details.get("branch_version") or version_details.get("version"),
"branch": version_details.get("branch"),
}
for app, version_details in get_versions().items()
)
return out
def get_site_info():
from frappe.email.queue import get_emails_sent_this_month
from frappe.utils.user import get_system_managers
# only get system users
users = frappe.get_all(
"User",
filters={"user_type": "System User", "name": ("not in", frappe.STANDARD_USERS)},
fields=["name", "enabled", "last_login", "last_active", "language", "time_zone"],
)
system_managers = get_system_managers(only_name=True)
for u in users:
# tag system managers
u.is_system_manager = 1 if u.name in system_managers else 0
u.full_name = get_fullname(u.name)
u.email = u.name
del u["name"]
system_settings = frappe.db.get_singles_dict("System Settings")
space_usage = frappe._dict((frappe.local.conf.limits or {}).get("space_usage", {}))
kwargs = {
"fields": ["user", "creation", "full_name"],
"filters": {"operation": "Login", "status": "Success"},
"limit": 10,
}
site_info = {
"installed_apps": get_installed_apps_info(),
"users": users,
"country": system_settings.country,
"language": system_settings.language or "english",
"time_zone": system_settings.time_zone,
"setup_complete": frappe.is_setup_complete(),
"scheduler_enabled": system_settings.enable_scheduler,
# usage
"emails_sent": get_emails_sent_this_month(),
"space_used": flt((space_usage.total or 0) / 1024.0, 2),
"database_size": space_usage.database_size,
"backup_size": space_usage.backup_size,
"files_size": space_usage.files_size,
"last_logins": frappe.get_all("Activity Log", **kwargs),
}
# from other apps
for method_name in frappe.get_hooks("get_site_info"):
site_info.update(frappe.get_attr(method_name)(site_info) or {})
# dumps -> loads to prevent datatype conflicts
return orjson.loads(frappe.as_json(site_info))
def get_db_count(*args):
"""
Pass a doctype or a series of doctypes to get the count of docs in them.
Parameters:
*args: Variable length argument list of doctype names whose doc count you need
Return:
dict: A dict with the count values.
Example:
via terminal:
bench --site erpnext.local execute frappe.utils.get_db_count --args "['DocType', 'Communication']"
"""
db_count = {}
for doctype in args:
db_count[doctype] = frappe.db.count(doctype)
return orjson.loads(frappe.as_json(db_count))
def call(fn, *args, **kwargs):
"""
Pass a doctype or a series of doctypes to get the count of docs in them
Parameters:
fn: frappe function to be called
Return:
based on the function you call: output of the function you call
Example:
via terminal:
bench --site erpnext.local execute frappe.utils.call --args '''["frappe.get_all", "Activity Log"]''' --kwargs '''{"fields": ["user", "creation", "full_name"], "filters":{"Operation": "Login", "Status": "Success"}, "limit": "10"}'''
"""
return orjson.loads(frappe.as_json(frappe.call(fn, *args, **kwargs)))
def get_safe_filters(filters):
try:
parsed = orjson.loads(filters)
except (TypeError, ValueError):
# not a string, or not valid json
return filters
# numeric JSON is ambiguous: docnames like "3E002" parse as floats and
# would be corrupted by stringifying back, so keep the original string
if isinstance(parsed, int | float) and not isinstance(parsed, bool):
return filters
return parsed
def create_batch(iterable: Iterable, size: int) -> Generator[Iterable]:
"""Convert an iterable to multiple batches of constant size of batch_size.
Args:
iterable (Iterable): Iterable object which is subscriptable
size (int): Maximum size of batches to be generated
Yields:
Generator[List]: Batched iterable of maximum length `size`
"""
total_count = len(iterable)
for i in range(0, total_count, size):
yield iterable[i : min(i + size, total_count)]
def set_request(**kwargs):
from werkzeug.test import EnvironBuilder
from werkzeug.wrappers import Request
builder = EnvironBuilder(**kwargs)
frappe.local.request = Request(builder.get_environ())
def get_html_for_route(route):
from frappe.website.serve import get_response
set_request(method="GET", path=route)
response = get_response()
return frappe.safe_decode(response.get_data())
def get_file_size(path, format=False):
num = os.path.getsize(path)
if not format:
return num
suffix = "B"
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024
return "{:.1f}{}{}".format(num, "Yi", suffix)
def get_build_version():
try:
return str(os.path.getmtime(os.path.join(frappe.local.sites_path, "assets/assets.json")))
except OSError:
# .build can sometimes not exist
# this is not a major problem so send fallback
return frappe.utils.random_string(8)
def get_assets_json():
def _get_assets():
# get merged assets.json and assets-rtl.json
assets = frappe.parse_json(frappe.read_file("assets/assets.json"))
if assets_rtl := frappe.read_file("assets/assets-rtl.json"):
assets.update(frappe.parse_json(assets_rtl))
return assets
if not frappe.conf.developer_mode:
return frappe.client_cache.get_value(
"assets_json",
shared=True,
generator=_get_assets,
)
else:
return _get_assets()
def get_bench_relative_path(file_path):
"""Fix paths relative to the bench root directory if exists and return the absolute path.
Args:
file_path (str, Path): Path of a file that exists on the file system
Return:
str: Absolute path of the file_path
"""
if not os.path.exists(file_path):
base_path = ".."
elif file_path.startswith(os.sep):
base_path = os.sep
else:
base_path = "."
file_path = os.path.join(base_path, file_path)
if not os.path.exists(file_path):
print(f"Invalid path {file_path[3:]}")
sys.exit(1)
return os.path.abspath(file_path)
def groupby_metric(iterable: dict[str, list], key: str):
"""Group records by a metric.
Usecase: Lets assume we got country wise players list with the ranking given for each player(multiple players in a country can have same ranking aswell).
We can group the players by ranking(can be any other metric) using this function.
>>> d = {
'india': [{'id':1, 'name': 'iplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'iplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'iplayer-3'}],
'Aus': [{'id':1, 'name': 'aplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'aplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'aplayer-3'}]
}
>>> groupby(d, key="ranking")
{1: {'Aus': [{'id': 1, 'name': 'aplayer-1', 'ranking': 1},
{'id': 2, 'name': 'aplayer-2', 'ranking': 1}],
'india': [{'id': 1, 'name': 'iplayer-1', 'ranking': 1},
{'id': 2, 'name': 'iplayer-2', 'ranking': 1}]},
2: {'Aus': [{'id': 2, 'name': 'aplayer-3', 'ranking': 2}],
'india': [{'id': 2, 'name': 'iplayer-3', 'ranking': 2}]}}
"""
records = {}
for category, items in iterable.items():
for item in items:
records.setdefault(item[key], {}).setdefault(category, []).append(item)
return records
def get_table_name(table_name: str, wrap_in_backticks: bool = False) -> str:
name = f"tab{table_name}" if not table_name.startswith("__") else table_name
if wrap_in_backticks:
return f"`{name}`"
return name
def squashify(what):
if isinstance(what, Sequence) and len(what) == 1:
return what[0]
return what
def safe_json_loads(*args):
results = []
for arg in args:
try:
arg = orjson.loads(arg)
except Exception:
pass
results.append(arg)
return squashify(results)
def dictify(arg):
if isinstance(arg, MutableSequence):
for i, a in enumerate(arg):
arg[i] = dictify(a)
elif isinstance(arg, MutableMapping):
arg = frappe._dict(arg)
return arg
class _UserInfo(TypedDict):
fullname: str
image: str
name: str
email: str
time_zone: str
def add_user_info(user: str | list[str] | set[str], user_info: dict[str, _UserInfo]) -> None:
if not user:
return
if isinstance(user, str):
user = [user]
missing_users = [u for u in user if u not in user_info]
if not missing_users:
return
missing_info = frappe.get_all(
"User",
{"name": ("in", missing_users)},
["full_name", "user_image", "name", "email", "time_zone"],
)
for info in missing_info:
user_info.setdefault(info.name, frappe._dict()).update(
fullname=info.full_name or info.name,
image=info.user_image,
name=info.name,
email=info.email,
time_zone=info.time_zone,
)
def is_git_url(url: str) -> bool:
# modified to allow without the tailing .git from https://github.com/jonschlinkert/is-git-url.git
pattern = r"(?:git|ssh|https?|\w*@[-\w.]+):(\/\/)?(.*?)(\.git)?(\/?|\#[-\d\w._]+?)$"
return bool(re.match(pattern, url))
class CallbackManager:
"""Manage callbacks.
```
# Capture callacks
callbacks = CallbackManager()
# Put a function call in queue
callbacks.add(func)
# Run all pending functions in queue
callbacks.run()
# Reset queue
callbacks.reset()
```
Example usage: frappe.db.after_commit
"""
__slots__ = ("_functions",)
def __init__(self) -> None:
self._functions = deque()
def add(self, func: Callable) -> None:
"""Add a function to queue, functions are executed in order of addition."""
self._functions.append(func)
def __call__(self, func: Callable) -> None:
self.add(func)
def run(self):
"""Run all functions in queue"""
while self._functions:
_func = self._functions.popleft()
_func()
def reset(self):
self._functions.clear()
def safe_eval(code, eval_globals=None, eval_locals=None):
"""A safer `eval`"""
from frappe.utils.safe_exec import safe_eval
return safe_eval(code, eval_globals, eval_locals)
def create_folder(path, with_init=False):
"""Create a folder in the given path and add an `__init__.py` file (optional).
:param path: Folder path.
:param with_init: Create `__init__.py` in the new folder."""
from frappe.utils import touch_file
if not os.path.exists(path):
os.makedirs(path)
if with_init:
touch_file(os.path.join(path, "__init__.py"))
cached_property = functools.cached_property
def get_frappe_version() -> str:
return getattr(frappe, "__version__", "unknown")
def get_app_version(app_name: str) -> str:
try:
return frappe.get_attr(app_name + ".__version__")
except Exception:
return "0.0.1"