seitime-frappe/frappe/utils/__init__.py
2021-05-01 00:34:19 +05:30

836 lines
23 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
import functools
import hashlib
import io
import json
import os
import re
import sys
import traceback
import typing
from email.header import decode_header, make_header
from email.utils import formataddr, parseaddr
from gzip import GzipFile
from typing import Generator, Iterable
from urllib.parse import quote, urlparse
from werkzeug.test import Client
import frappe
# utility functions like cint, int, flt, etc.
from frappe.utils.data import *
from frappe.utils.html_utils import sanitize_html
default_fields = ['doctype', 'name', 'owner', 'creation', 'modified', 'modified_by',
'parent', 'parentfield', 'parenttype', 'idx', 'docstatus']
def get_fullname(user=None):
"""get the full name (first name + last name) of the user from User"""
if not user:
user = frappe.session.user
if not hasattr(frappe.local, "fullnames"):
frappe.local.fullnames = {}
if not frappe.local.fullnames.get(user):
p = frappe.db.get_value("User", user, ["first_name", "last_name"], as_dict=True)
if p:
frappe.local.fullnames[user] = " ".join(filter(None,
[p.get('first_name'), p.get('last_name')])) or user
else:
frappe.local.fullnames[user] = user
return frappe.local.fullnames.get(user)
def get_email_address(user=None):
"""get the email address of the user from User"""
if not user:
user = frappe.session.user
return frappe.db.get_value("User", user, "email")
def get_formatted_email(user, mail=None):
"""get Email Address of user formatted as: `John Doe <johndoe@example.com>`"""
fullname = get_fullname(user)
if not mail:
mail = get_email_address(user) or validate_email_address(user)
if not mail:
return ''
else:
return cstr(make_header(decode_header(formataddr((fullname, mail)))))
def extract_email_id(email):
"""fetch only the email part of the Email Address"""
email_id = parse_addr(email)[1]
if email_id and isinstance(email_id, bytes):
email_id = email_id.decode("utf-8", "ignore")
return email_id
def validate_phone_number(phone_number, throw=False):
"""Returns True if valid phone number"""
if not phone_number:
return False
phone_number = phone_number.strip()
match = re.match(r"([0-9\ \+\_\-\,\.\*\#\(\)]){1,20}$", phone_number)
if not match and throw:
frappe.throw(frappe._("{0} is not a valid Phone Number").format(phone_number), frappe.InvalidPhoneNumberError)
return bool(match)
def validate_name(name, throw=False):
"""Returns True if the name is valid
valid names may have unicode and ascii characters, dash, quotes, numbers
anything else is considered invalid
"""
if not name:
return False
name = name.strip()
match = re.match(r"^[\w][\w\'\-]*([ \w][\w\'\-]+)*$", name)
if not match and throw:
frappe.throw(frappe._("{0} is not a valid Name").format(name), frappe.InvalidNameError)
return bool(match)
def validate_email_address(email_str, throw=False):
"""Validates the email string"""
email = email_str = (email_str or "").strip()
def _check(e):
_valid = True
if not e:
_valid = False
if 'undisclosed-recipient' in e:
return False
elif " " in e and "<" not in e:
# example: "test@example.com test2@example.com" will return "test@example.comtest2" after parseaddr!!!
_valid = False
else:
email_id = extract_email_id(e)
match = re.match("[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?", email_id.lower()) if email_id else None
if not match:
_valid = False
else:
matched = match.group(0)
if match:
match = matched==email_id.lower()
if not _valid:
if throw:
invalid_email = frappe.utils.escape_html(e)
frappe.throw(frappe._("{0} is not a valid Email Address").format(invalid_email),
frappe.InvalidEmailAddressError)
return None
else:
return matched
out = []
for e in email_str.split(','):
email = _check(e.strip())
if email:
out.append(email)
return ', '.join(out)
def split_emails(txt):
email_list = []
# emails can be separated by comma or newline
s = re.sub(r'[\t\n\r]', ' ', cstr(txt))
for email in re.split('''[,\\n](?=(?:[^"]|"[^"]*")*$)''', s):
email = strip(cstr(email))
if email:
email_list.append(email)
return email_list
def validate_url(txt, throw=False, valid_schemes=None):
"""
Tests wether the `txt` is a valid URL
Parameters:
throw (`bool`): throws a validationError if URL is not valid
valid_schemes (`str` or `list`): if provided checks the given URL's scheme against this
Returns:
bool: if `txt` represents a valid URL
"""
url = urlparse(txt)
is_valid = bool(url.netloc)
# Handle scheme validation
if isinstance(valid_schemes, str):
is_valid = is_valid and (url.scheme == valid_schemes)
elif isinstance(valid_schemes, (list, tuple, set)):
is_valid = is_valid and (url.scheme in valid_schemes)
if not is_valid and throw:
frappe.throw(
frappe._("'{0}' is not a valid URL").format(frappe.bold(txt))
)
return is_valid
def random_string(length):
"""generate a random string"""
import string
from random import choice
return ''.join([choice(string.ascii_letters + string.digits) for i in range(length)])
def has_gravatar(email):
'''Returns gravatar url if user has set an avatar at gravatar.com'''
import requests
if (frappe.flags.in_import
or frappe.flags.in_install
or frappe.flags.in_test):
# no gravatar if via upload
# since querying gravatar for every item will be slow
return ''
hexdigest = hashlib.md5(frappe.as_unicode(email).encode('utf-8')).hexdigest()
gravatar_url = "https://secure.gravatar.com/avatar/{hash}?d=404&s=200".format(hash=hexdigest)
try:
res = requests.get(gravatar_url)
if res.status_code==200:
return gravatar_url
else:
return ''
except requests.exceptions.ConnectionError:
return ''
def get_gravatar_url(email):
return "https://secure.gravatar.com/avatar/{hash}?d=mm&s=200".format(hash=hashlib.md5(email.encode('utf-8')).hexdigest())
def get_gravatar(email):
from frappe.utils.identicon import Identicon
gravatar_url = has_gravatar(email)
if not gravatar_url:
gravatar_url = Identicon(email).base64()
return gravatar_url
def get_traceback():
"""
Returns the traceback of the Exception
"""
exc_type, exc_value, exc_tb = sys.exc_info()
trace_list = traceback.format_exception(exc_type, exc_value, exc_tb)
body = "".join(cstr(t) for t in trace_list)
return body
def log(event, details):
frappe.logger().info(details)
def dict_to_str(args, sep = '&'):
"""
Converts a dictionary to URL
"""
t = []
for k in list(args):
t.append(str(k)+'='+quote(str(args[k] or '')))
return sep.join(t)
def list_to_str(seq, sep = ', '):
"""Convert a sequence into a string using seperator.
Same as str.join, but does type conversion and strip extra spaces.
"""
return sep.join(map(str.strip, map(str, seq)))
# Get Defaults
# ==============================================================================
def get_defaults(key=None):
"""
Get dictionary of default values from the defaults, or a value if key is passed
"""
return frappe.db.get_defaults(key)
def set_default(key, val):
"""
Set / add a default value to defaults`
"""
return frappe.db.set_default(key, val)
def remove_blanks(d):
"""
Returns d with empty ('' or None) values stripped
"""
empty_keys = []
for key in d:
if d[key]=='' or d[key]==None:
# del d[key] raises runtime exception, using a workaround
empty_keys.append(key)
for key in empty_keys:
del d[key]
return d
def strip_html_tags(text):
"""Remove html tags from text"""
return re.sub("\<[^>]*\>", "", text)
def get_file_timestamp(fn):
"""
Returns timestamp of the given file
"""
from frappe.utils import cint
try:
return str(cint(os.stat(fn).st_mtime))
except OSError as e:
if e.args[0]!=2:
raise
else:
return None
# to be deprecated
def make_esc(esc_chars):
"""
Function generator for Escaping special characters
"""
return lambda s: ''.join(['\\' + c if c in esc_chars else c for c in s])
# esc / unescape characters -- used for command line
def esc(s, esc_chars):
"""
Escape special characters
"""
if not s:
return ""
for c in esc_chars:
esc_str = '\\' + c
s = s.replace(c, esc_str)
return s
def unesc(s, esc_chars):
"""
UnEscape special characters
"""
for c in esc_chars:
esc_str = '\\' + c
s = s.replace(esc_str, c)
return s
def execute_in_shell(cmd, verbose=0, low_priority=False):
# using Popen instead of os.system - as recommended by python docs
import tempfile
from subprocess import Popen
with tempfile.TemporaryFile() as stdout:
with tempfile.TemporaryFile() as stderr:
kwargs = {
"shell": True,
"stdout": stdout,
"stderr": stderr
}
if low_priority:
kwargs["preexec_fn"] = lambda: os.nice(10)
p = Popen(cmd, **kwargs)
p.wait()
stdout.seek(0)
out = stdout.read()
stderr.seek(0)
err = stderr.read()
if verbose:
if err: print(err)
if out: print(out)
return err, out
def get_path(*path, **kwargs):
base = kwargs.get('base')
if not base:
base = frappe.local.site_path
return os.path.join(base, *path)
def get_site_base_path(sites_dir=None, hostname=None):
return frappe.local.site_path
def get_site_path(*path):
return get_path(base=get_site_base_path(), *path)
def get_files_path(*path, **kwargs):
return get_site_path("private" if kwargs.get("is_private") else "public", "files", *path)
def get_bench_path():
return os.path.realpath(os.path.join(os.path.dirname(frappe.__file__), '..', '..', '..'))
def get_backups_path():
return get_site_path("private", "backups")
def get_request_site_address(full_address=False):
return get_url(full_address=full_address)
def get_site_url(site):
return 'http://{site}:{port}'.format(
site=site,
port=frappe.get_conf(site).webserver_port
)
def encode_dict(d, encoding="utf-8"):
for key in d:
if isinstance(d[key], str):
d[key] = d[key].encode(encoding)
return d
def decode_dict(d, encoding="utf-8"):
for key in d:
if isinstance(d[key], bytes):
d[key] = d[key].decode(encoding, "ignore")
return d
@functools.lru_cache()
def get_site_name(hostname):
return hostname.split(':')[0]
def get_disk_usage():
"""get disk usage of files folder"""
files_path = get_files_path()
if not os.path.exists(files_path):
return 0
err, out = execute_in_shell("du -hsm {files_path}".format(files_path=files_path))
return cint(out.split("\n")[-2].split("\t")[0])
def touch_file(path):
with open(path, 'a'):
os.utime(path, None)
return path
def get_test_client():
from frappe.app import application
return Client(application)
def get_hook_method(hook_name, fallback=None):
method = (frappe.get_hooks().get(hook_name))
if method:
method = frappe.get_attr(method[0])
return method
if fallback:
return fallback
def call_hook_method(hook, *args, **kwargs):
out = None
for method_name in frappe.get_hooks(hook):
out = out or frappe.get_attr(method_name)(*args, **kwargs)
return out
def update_progress_bar(txt, i, l):
if os.environ.get("CI"):
if i == 0:
sys.stdout.write(txt)
sys.stdout.write(".")
sys.stdout.flush()
return
if not getattr(frappe.local, 'request', None):
lt = len(txt)
try:
col = 40 if os.get_terminal_size().columns > 80 else 20
except OSError:
# in case function isn't being called from a terminal
col = 40
if lt < 36:
txt = txt + " "*(36-lt)
complete = int(float(i+1) / l * col)
completion_bar = ("=" * complete).ljust(col, ' ')
percent_complete = str(int(float(i+1) / l * 100))
sys.stdout.write("\r{0}: [{1}] {2}%".format(txt, completion_bar, percent_complete))
sys.stdout.flush()
def get_html_format(print_path):
html_format = None
if os.path.exists(print_path):
with open(print_path, "r") as f:
html_format = f.read()
for include_directive, path in re.findall("""({% include ['"]([^'"]*)['"] %})""", html_format):
for app_name in frappe.get_installed_apps():
include_path = frappe.get_app_path(app_name, *path.split(os.path.sep))
if os.path.exists(include_path):
with open(include_path, "r") as f:
html_format = html_format.replace(include_directive, f.read())
break
return html_format
def is_markdown(text):
if "<!-- markdown -->" in text:
return True
elif "<!-- html -->" in text:
return False
else:
return not re.search("<p[\s]*>|<br[\s]*>", text)
def get_sites(sites_path=None):
if not sites_path:
sites_path = getattr(frappe.local, 'sites_path', None) or '.'
sites = []
for site in os.listdir(sites_path):
path = os.path.join(sites_path, site)
if (os.path.isdir(path)
and not os.path.islink(path)
and os.path.exists(os.path.join(path, 'site_config.json'))):
# is a dir and has site_config.json
sites.append(site)
return sorted(sites)
def get_request_session(max_retries=3):
import requests
from urllib3.util import Retry
session = requests.Session()
session.mount("http://", requests.adapters.HTTPAdapter(max_retries=Retry(total=5, status_forcelist=[500])))
session.mount("https://", requests.adapters.HTTPAdapter(max_retries=Retry(total=5, status_forcelist=[500])))
return session
def watch(path, handler=None, debug=True):
import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class Handler(FileSystemEventHandler):
def on_any_event(self, event):
if debug:
print("File {0}: {1}".format(event.event_type, event.src_path))
if not handler:
print("No handler specified")
return
handler(event.src_path, event.event_type)
event_handler = Handler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
def markdown(text, sanitize=True, linkify=True):
html = text if is_html(text) else frappe.utils.md_to_html(text)
if sanitize:
html = html.replace("<!-- markdown -->", "")
html = sanitize_html(html, linkify=linkify)
return html
def sanitize_email(emails):
sanitized = []
for e in split_emails(emails):
if not validate_email_address(e):
continue
full_name, email_id = parse_addr(e)
sanitized.append(formataddr((full_name, email_id)))
return ", ".join(sanitized)
def parse_addr(email_string):
"""
Return email_id and user_name based on email string
Raise error if email string is not valid
"""
name, email = parseaddr(email_string)
if check_format(email):
name = get_name_from_email_string(email_string, email, name)
return (name, email)
else:
email_regex = re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)")
email_list = re.findall(email_regex, email_string)
if len(email_list) > 0 and check_format(email_list[0]):
#take only first email address
email = email_list[0]
name = get_name_from_email_string(email_string, email, name)
return (name, email)
return (None, email)
def check_format(email_id):
"""
Check if email_id is valid. valid email:text@example.com
String check ensures that email_id contains both '.' and
'@' and index of '@' is less than '.'
"""
is_valid = False
try:
pos = email_id.rindex("@")
is_valid = pos > 0 and (email_id.rindex(".") > pos) and (len(email_id) - pos > 4)
except Exception:
#print(e)
pass
return is_valid
def get_name_from_email_string(email_string, email_id, name):
name = email_string.replace(email_id, '')
name = re.sub('[^A-Za-z0-9\u00C0-\u024F\/\_\' ]+', '', name).strip()
if not name:
name = email_id
return name
def get_installed_apps_info():
out = []
from frappe.utils.change_log import get_versions
for app, version_details in iteritems(get_versions()):
out.append({
'app_name': app,
'version': version_details.get('branch_version') or version_details.get('version'),
'branch': version_details.get('branch')
})
return out
def get_site_info():
from frappe.core.doctype.user.user import STANDARD_USERS
from frappe.email.queue import get_emails_sent_this_month
from frappe.utils.user import get_system_managers
# only get system users
users = frappe.get_all('User', filters={'user_type': 'System User', 'name': ('not in', STANDARD_USERS)},
fields=['name', 'enabled', 'last_login', 'last_active', 'language', 'time_zone'])
system_managers = get_system_managers(only_name=True)
for u in users:
# tag system managers
u.is_system_manager = 1 if u.name in system_managers else 0
u.full_name = get_fullname(u.name)
u.email = u.name
del u['name']
system_settings = frappe.db.get_singles_dict('System Settings')
space_usage = frappe._dict((frappe.local.conf.limits or {}).get('space_usage', {}))
kwargs = {"fields": ["user", "creation", "full_name"], "filters":{"Operation": "Login", "Status": "Success"}, "limit": "10"}
site_info = {
'installed_apps': get_installed_apps_info(),
'users': users,
'country': system_settings.country,
'language': system_settings.language or 'english',
'time_zone': system_settings.time_zone,
'setup_complete': cint(system_settings.setup_complete),
'scheduler_enabled': system_settings.enable_scheduler,
# usage
'emails_sent': get_emails_sent_this_month(),
'space_used': flt((space_usage.total or 0) / 1024.0, 2),
'database_size': space_usage.database_size,
'backup_size': space_usage.backup_size,
'files_size': space_usage.files_size,
'last_logins': frappe.get_all("Activity Log", **kwargs)
}
# from other apps
for method_name in frappe.get_hooks('get_site_info'):
site_info.update(frappe.get_attr(method_name)(site_info) or {})
# dumps -> loads to prevent datatype conflicts
return json.loads(frappe.as_json(site_info))
def parse_json(val):
"""
Parses json if string else return
"""
if isinstance(val, str):
val = json.loads(val)
if isinstance(val, dict):
val = frappe._dict(val)
return val
def get_db_count(*args):
"""
Pass a doctype or a series of doctypes to get the count of docs in them
Parameters:
*args: Variable length argument list of doctype names whose doc count you need
Returns:
dict: A dict with the count values.
Example:
via terminal:
bench --site erpnext.local execute frappe.utils.get_db_count --args "['DocType', 'Communication']"
"""
db_count = {}
for doctype in args:
db_count[doctype] = frappe.db.count(doctype)
return json.loads(frappe.as_json(db_count))
def call(fn, *args, **kwargs):
"""
Pass a doctype or a series of doctypes to get the count of docs in them
Parameters:
fn: frappe function to be called
Returns:
based on the function you call: output of the function you call
Example:
via terminal:
bench --site erpnext.local execute frappe.utils.call --args '''["frappe.get_all", "Activity Log"]''' --kwargs '''{"fields": ["user", "creation", "full_name"], "filters":{"Operation": "Login", "Status": "Success"}, "limit": "10"}'''
"""
return json.loads(frappe.as_json(frappe.call(fn, *args, **kwargs)))
# Following methods are aken as-is from Python 3 codebase
# since gzip.compress and gzip.decompress are not available in Python 2.7
def gzip_compress(data, compresslevel=9):
"""Compress data in one shot and return the compressed string.
Optional argument is the compression level, in range of 0-9.
"""
buf = io.BytesIO()
with GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel) as f:
f.write(data)
return buf.getvalue()
def gzip_decompress(data):
"""Decompress a gzip compressed string in one shot.
Return the decompressed string.
"""
with GzipFile(fileobj=io.BytesIO(data)) as f:
return f.read()
def get_safe_filters(filters):
try:
filters = json.loads(filters)
if isinstance(filters, (integer_types, float)):
filters = frappe.as_unicode(filters)
except (TypeError, ValueError):
# filters are not passed, not json
pass
return filters
def create_batch(iterable: Iterable, size: int) -> Generator[Iterable, None, None]:
"""Convert an iterable to multiple batches of constant size of batch_size
Args:
iterable (Iterable): Iterable object which is subscriptable
size (int): Maximum size of batches to be generated
Yields:
Generator[List]: Batched iterable of maximum length `size`
"""
total_count = len(iterable)
for i in range(0, total_count, size):
yield iterable[i : min(i + size, total_count)]
def set_request(**kwargs):
from werkzeug.test import EnvironBuilder
from werkzeug.wrappers import Request
builder = EnvironBuilder(**kwargs)
frappe.local.request = Request(builder.get_environ())
def get_html_for_route(route):
from frappe.website import render
set_request(method='GET', path=route)
response = render.render()
html = frappe.safe_decode(response.get_data())
return html
def get_file_size(path, format=False):
num = os.path.getsize(path)
if not format:
return num
suffix = 'B'
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
if abs(num) < 1024:
return "{0:3.1f}{1}{2}".format(num, unit, suffix)
num /= 1024
return "{0:.1f}{1}{2}".format(num, 'Yi', suffix)
def get_build_version():
try:
return str(os.path.getmtime(os.path.join(frappe.local.sites_path, '.build')))
except OSError:
# .build can sometimes not exist
# this is not a major problem so send fallback
return frappe.utils.random_string(8)
def get_bench_relative_path(file_path):
"""Fixes paths relative to the bench root directory if exists and returns the absolute path
Args:
file_path (str, Path): Path of a file that exists on the file system
Returns:
str: Absolute path of the file_path
"""
if not os.path.exists(file_path):
base_path = '..'
elif file_path.startswith(os.sep):
base_path = os.sep
else:
base_path = '.'
file_path = os.path.join(base_path, file_path)
if not os.path.exists(file_path):
print('Invalid path {0}'.format(file_path[3:]))
sys.exit(1)
return os.path.abspath(file_path)
def groupby_metric(iterable: typing.Dict[str, list], key: str):
""" Group records by a metric.
Usecase: Lets assume we got country wise players list with the ranking given for each player(multiple players in a country can have same ranking aswell).
We can group the players by ranking(can be any other metric) using this function.
>>> d = {
'india': [{'id':1, 'name': 'iplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'iplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'iplayer-3'}],
'Aus': [{'id':1, 'name': 'aplayer-1', 'ranking': 1}, {'id': 2, 'ranking': 1, 'name': 'aplayer-2'}, {'id': 2, 'ranking': 2, 'name': 'aplayer-3'}]
}
>>> groupby(d, key='ranking')
{1: {'Aus': [{'id': 1, 'name': 'aplayer-1', 'ranking': 1},
{'id': 2, 'name': 'aplayer-2', 'ranking': 1}],
'india': [{'id': 1, 'name': 'iplayer-1', 'ranking': 1},
{'id': 2, 'name': 'iplayer-2', 'ranking': 1}]},
2: {'Aus': [{'id': 2, 'name': 'aplayer-3', 'ranking': 2}],
'india': [{'id': 2, 'name': 'iplayer-3', 'ranking': 2}]}}
"""
records = {}
for category, items in iterable.items():
for item in items:
records.setdefault(item[key], {}).setdefault(category, []).append(item)
return records