seitime-frappe/frappe/utils/__init__.py
2016-12-20 12:35:59 +05:30

547 lines
14 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
# util __init__.py
from __future__ import unicode_literals
from werkzeug.test import Client
import os, re, urllib, sys, json, md5, requests, traceback
import bleach, bleach_whitelist
from html5lib.sanitizer import HTMLSanitizer
from markdown2 import markdown as _markdown
import frappe
from frappe.utils.identicon import Identicon
# utility functions like cint, int, flt, etc.
from frappe.utils.data import *
# Field names listed as defaults for documents.
# NOTE(review): presumably the standard metadata columns every record carries -- confirm.
default_fields = [
    'doctype',
    'name',
    'owner',
    'creation',
    'modified',
    'modified_by',
    'parent',
    'parentfield',
    'parenttype',
    'idx',
    'docstatus',
]
# used in import_docs.py
# TODO: deprecate it
def getCSVelement(v):
    """
    Return `v` rendered as a single CSV field.

    The value is converted to text with `cstr`. A field that contains a comma,
    a newline or a double quote is wrapped in double quotes, with every
    embedded double quote doubled. For example:
        * apple stays apple
        * hi"there becomes "hi""there"
    Empty values yield an empty string.
    """
    text = cstr(v)
    if not text:
        return ''

    needs_quoting = any(special in text for special in (',', '\n', '"'))
    if not needs_quoting:
        return text

    return '"' + text.replace('"', '""') + '"'
def get_fullname(user=None):
    """
    Return the full name (first name + last name) of `user` from the User doctype.

    Defaults to the session user when `user` is not given. Results are cached
    in `frappe.local.fullnames`; when no User record or no name parts are
    found, the user id itself is used as the name.
    """
    user = user or frappe.session.user

    if not hasattr(frappe.local, "fullnames"):
        frappe.local.fullnames = {}
    cache = frappe.local.fullnames

    if not cache.get(user):
        record = frappe.db.get_value("User", user, ["first_name", "last_name"], as_dict=True)
        if record:
            name_parts = [part for part in (record.get('first_name'), record.get('last_name')) if part]
            cache[user] = " ".join(name_parts) or user
        else:
            cache[user] = user

    return cache.get(user)
def get_formatted_email(user):
    """
    Return the user's address formatted as `John Doe <johndoe@example.com>`.

    "Administrator" is returned unchanged.
    """
    from email.utils import formataddr

    if user == "Administrator":
        return user

    return formataddr((get_fullname(user), user))
def extract_email_id(email):
    """
    Return only the address part of an email string.

    `"John Doe <john@example.com>"` yields `"john@example.com"`. A byte-string
    result is decoded as UTF-8 (undecodable bytes are ignored) so the caller
    always receives text.
    """
    from email.utils import parseaddr

    email_id = parseaddr(email)[1]
    # `bytes` is `str` on Python 2, so this is the same check as
    # "basestring but not unicode" there, and it also works on Python 3.
    if isinstance(email_id, bytes):
        email_id = email_id.decode("utf-8", "ignore")
    return email_id
def validate_email_add(email_str, throw=False):
    """
    Validate an email string.

    Returns False for empty input, for input containing a space but no `<`,
    and for input whose address part does not match the email pattern.
    Otherwise returns the (lower-cased) text matched by the pattern.

    With `throw=True`, `frappe.throw` is called with
    `frappe.InvalidEmailAddressError` when the address does not match, or when
    the pattern matches only a prefix of the address.

    NOTE(review): with `throw=False` a prefix-only match is still returned as a
    truthy value (existing behaviour, kept for backward compatibility) --
    confirm whether callers rely on it.
    """
    email_str = (email_str or "").strip()

    if not email_str:
        return False

    if " " in email_str and "<" not in email_str:
        # example: "test@example.com test2@example.com" will return "test@example.comtest2" after parseaddr!!!
        return False

    email = extract_email_id(email_str)
    lowered = email.lower()
    # raw string: `\.` must reach the regex engine as-is
    match = re.match(
        r"[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?",
        lowered)

    if not match:
        if throw:
            frappe.throw(frappe._("{0} is not a valid email id").format(email),
                frappe.InvalidEmailAddressError)
        return False

    matched = match.group(0)
    if matched != lowered and throw:
        # the pattern matched only the beginning of the address
        frappe.throw(frappe._("{0} is not a valid email id").format(email),
            frappe.InvalidEmailAddressError)

    return matched
def split_emails(txt):
    """
    Split `txt` into a list of non-empty, stripped email strings.

    Entries are separated by a comma or a newline; the lookahead in the
    pattern keeps separators inside double quotes from splitting.
    """
    # emails can be separated by comma or newline
    pieces = re.split('''[,\\n](?=(?:[^"]|"[^"]*")*$)''', cstr(txt))
    cleaned = (strip(cstr(piece)) for piece in pieces)
    return [piece for piece in cleaned if piece]
def random_string(length):
    """
    Generate a random string of `length` ASCII letters and digits.

    Uses `random.choice`, which is not a cryptographically secure source.
    """
    import string
    from random import choice

    # `string.letters` is locale-dependent and does not exist on Python 3;
    # `ascii_letters` is available on both.
    alphabet = string.ascii_letters + string.digits
    return ''.join([choice(alphabet) for _ in range(length)])
def has_gravatar(email):
    '''
    Return the gravatar url if the user has set an avatar at gravatar.com,
    otherwise an empty string.

    No lookup is made (and '' is returned) while the `in_import`, `in_install`
    or `in_test` flags are set. A connection error also yields ''.
    '''
    import hashlib

    if (frappe.flags.in_import
        or frappe.flags.in_install
        or frappe.flags.in_test):
        # no gravatar if via upload
        # since querying gravatar for every item will be slow
        return ''

    # hashlib needs bytes: encode explicitly so non-ASCII addresses do not fail
    hexdigest = hashlib.md5(frappe.as_unicode(email).encode("utf-8")).hexdigest()

    # d=404 makes gravatar answer 404 instead of serving a default image
    gravatar_url = "https://secure.gravatar.com/avatar/{hash}?d=404&s=200".format(hash=hexdigest)
    try:
        res = requests.get(gravatar_url)
    except requests.exceptions.ConnectionError:
        return ''

    if res.status_code == 200:
        return gravatar_url
    return ''
def get_gravatar_url(email):
    """
    Return the gravatar url (200px, `mm` fallback image) for `email`.

    Text input is encoded as UTF-8 before hashing; byte strings are hashed
    as given.
    """
    import hashlib

    if not isinstance(email, bytes):
        email = email.encode("utf-8")
    return "https://secure.gravatar.com/avatar/{hash}?d=mm&s=200".format(hash=hashlib.md5(email).hexdigest())
def get_gravatar(email):
    """
    Return the gravatar url for `email` when `has_gravatar` finds one,
    otherwise the base64 output of an `Identicon` built from the email.
    """
    return has_gravatar(email) or Identicon(email).base64()
def get_traceback():
    """
    Return the formatted traceback of the exception currently being handled,
    as a single string.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    return "".join([cstr(line) for line in formatted_lines])
def log(event, details):
    """
    Write `details` to the frappe logger at INFO level.

    `event` is accepted but not used.
    """
    logger = frappe.logger()
    logger.info(details)
def dict_to_str(args, sep='&'):
    """
    Convert a dictionary to a `key=value` string joined by `sep`.

    Values are passed through `str` and URL-quoted; falsy values (None, '', 0)
    are rendered as an empty value. Keys are passed through `str` only.
    """
    try:
        from urllib.parse import quote  # Python 3
    except ImportError:
        from urllib import quote  # Python 2

    return sep.join(
        str(key) + '=' + quote(str(value or ''))
        for key, value in args.items()
    )
# Get Defaults
# ==============================================================================
def get_defaults(key=None):
    """
    Return whatever `frappe.db.get_defaults(key)` returns: the defaults
    dictionary, or a single value when `key` is passed.
    """
    defaults = frappe.db.get_defaults(key)
    return defaults
def set_default(key, val):
    """
    Set / add a default value by delegating to `frappe.db.set_default`,
    returning its result.
    """
    result = frappe.db.set_default(key, val)
    return result
def remove_blanks(d):
    """
    Remove entries whose value is '' or None from `d` in place and return `d`.

    Other falsy values (0, False, empty containers) are kept.
    """
    # collect the keys first: deleting while iterating a dict raises RuntimeError
    empty_keys = [key for key, value in d.items() if value == '' or value is None]
    for key in empty_keys:
        del d[key]
    return d
def strip_html_tags(text):
    """Remove html tags (anything between `<` and the next `>`) from text"""
    # raw string; `<` and `>` need no escaping in a regular expression
    return re.sub(r"<[^>]*>", "", text)
def get_file_timestamp(fn):
    """
    Return the modification timestamp of the given file as a string of whole
    seconds, or None when the file does not exist.

    Any other OSError is re-raised.
    """
    import errno
    from frappe.utils import cint

    try:
        return str(cint(os.stat(fn).st_mtime))
    except OSError as e:
        # only "no such file or directory" is treated as "no timestamp"
        if e.errno != errno.ENOENT:
            raise
        return None
# to be deprecated
def make_esc(esc_chars):
    """
    Build and return a function that escapes special characters: every
    character of its argument that occurs in `esc_chars` gets a backslash
    put in front of it.
    """
    def escape(s):
        return ''.join(['\\' + ch if ch in esc_chars else ch for ch in s])

    return escape
# esc / unescape characters -- used for command line
def esc(s, esc_chars):
    """
    Escape special characters: put a backslash in front of every occurrence
    in `s` of each character in `esc_chars`. A falsy `s` yields "".
    """
    if not s:
        return ""

    escaped = s
    for special in esc_chars:
        escaped = escaped.replace(special, '\\' + special)
    return escaped
def unesc(s, esc_chars):
    """
    UnEscape special characters: replace each backslash-prefixed character
    from `esc_chars` in `s` with the bare character.
    """
    unescaped = s
    for special in esc_chars:
        unescaped = unescaped.replace('\\' + special, special)
    return unescaped
def execute_in_shell(cmd, verbose=0):
    """
    Run `cmd` through the shell, wait for it to finish and return its output.

    Returns a tuple `(err, out)` -- note the order: stderr first, then stdout --
    with the raw contents captured from each stream. When `verbose` is truthy,
    non-empty stderr and stdout are also printed.

    SECURITY: `cmd` is executed with `shell=True`; callers must not build it
    from untrusted input.
    """
    # using Popen instead of os.system - as recommended by python docs
    from subprocess import Popen
    import tempfile

    # temporary files (rather than pipes) capture the two output streams
    with tempfile.TemporaryFile() as stdout, tempfile.TemporaryFile() as stderr:
        p = Popen(cmd, shell=True, stdout=stdout, stderr=stderr)
        p.wait()

        stdout.seek(0)
        out = stdout.read()

        stderr.seek(0)
        err = stderr.read()

    if verbose:
        # single-argument print(...) behaves the same on Python 2 and 3
        if err:
            print(err)
        if out:
            print(out)

    return err, out
def get_path(*path, **kwargs):
    """
    Join the `path` components onto a base directory.

    The base is the `base` keyword argument when given and truthy, otherwise
    `frappe.local.site_path`.
    """
    base_dir = kwargs.get('base') or frappe.local.site_path
    return os.path.join(base_dir, *path)
def get_site_base_path(sites_dir=None, hostname=None):
    """
    Return `frappe.local.site_path`.

    `sites_dir` and `hostname` are accepted but not used.
    """
    site_path = frappe.local.site_path
    return site_path
def get_site_path(*path):
    """Join the `path` components onto the site base path."""
    site_base = get_site_base_path()
    return get_path(*path, base=site_base)
def get_files_path(*path, **kwargs):
    """
    Return the site path of the "files" folder, with `path` components appended.

    The folder lives under "private" when the `is_private` keyword argument is
    truthy, otherwise under "public".
    """
    visibility = "private" if kwargs.get("is_private") else "public"
    return get_site_path(visibility, "files", *path)
def get_bench_path():
    """Return the real path three directory levels above the frappe package directory."""
    package_dir = os.path.dirname(frappe.__file__)
    three_levels_up = os.path.join(package_dir, '..', '..', '..')
    return os.path.realpath(three_levels_up)
def get_backups_path():
    """Return the site path of the "private/backups" folder."""
    backups_path = get_site_path("private", "backups")
    return backups_path
def get_request_site_address(full_address=False):
    """Return the result of `get_url`, forwarding `full_address` to it."""
    site_address = get_url(full_address=full_address)
    return site_address
def encode_dict(d, encoding="utf-8"):
    """
    Encode every text value of `d` to bytes using `encoding`, in place.

    Non-text values are left untouched. Returns `d`.
    """
    # `unicode` on Python 2, `str` on Python 3
    text_type = type(u"")
    for key in d:
        if isinstance(d[key], text_type):
            d[key] = d[key].encode(encoding)
    return d
def decode_dict(d, encoding="utf-8"):
    """
    Decode every byte-string value of `d` to text using `encoding`, in place.

    Undecodable bytes are ignored; other values are left untouched.
    Returns `d`.
    """
    for key in d:
        # `bytes` is `str` on Python 2, i.e. "basestring but not unicode"
        if isinstance(d[key], bytes):
            d[key] = d[key].decode(encoding, "ignore")
    return d
def get_site_name(hostname):
    """Return the part of `hostname` before the first ':' (i.e. without a port)."""
    site_name, _colon, _rest = hostname.partition(':')
    return site_name
def get_disk_usage():
    """
    Get disk usage of the files folder, as reported by `du -hsm`.

    Returns 0 when the folder does not exist; otherwise the first
    tab-separated column of the second-to-last output line, converted
    with `cint`.
    """
    files_path = get_files_path()
    if not os.path.exists(files_path):
        return 0

    command = "du -hsm {files_path}".format(files_path=files_path)
    err, out = execute_in_shell(command)
    # [-2]: the last element is the empty string after the trailing newline
    summary_line = out.split("\n")[-2]
    return cint(summary_line.split("\t")[0])
def touch_file(path):
    """
    Open `path` in append mode (creating it if missing), reset its access and
    modification times to now, and return `path`.
    """
    with open(path, 'a') as _handle:
        # times=None means "use the current time"
        os.utime(path, None)

    return path
def get_test_client():
    """Return a werkzeug test `Client` wrapping the frappe WSGI application."""
    from frappe.app import application as wsgi_application
    return Client(wsgi_application)
def get_hook_method(hook_name, fallback=None):
    """
    Return the first method registered for the hook `hook_name`.

    When nothing is registered, return `fallback` if it is truthy,
    otherwise None.
    """
    registered = frappe.get_hooks().get(hook_name)
    if registered:
        return frappe.get_attr(registered[0])

    return fallback or None
def call_hook_method(hook, *args, **kwargs):
    """
    Call the methods registered for `hook`, in order, until one returns a
    truthy value, and return that value.

    Methods after the first truthy result are not called. If no method
    returns a truthy value, the result of the last call is returned (None
    when nothing is registered).
    """
    result = None
    for method_name in frappe.get_hooks(hook):
        if not result:
            result = frappe.get_attr(method_name)(*args, **kwargs)

    return result
def update_progress_bar(txt, i, l):
    """
    Redraw a 40-character progress bar on stdout for step `i` (0-based)
    of `l` steps, labelled with `txt` padded to 36 characters.

    Nothing is printed when `frappe.local` has a truthy `request`.
    """
    if getattr(frappe.local, 'request', None):
        return

    label_length = len(txt)
    if label_length < 36:
        txt = txt + " " * (36 - label_length)

    complete = int(float(i + 1) / l * 40)
    done, remaining = "=" * complete, " " * (40 - complete)
    # "\r" returns to the start of the line so the bar is redrawn in place
    sys.stdout.write("\r{0}: [{1}{2}]".format(txt, done, remaining))
    sys.stdout.flush()
def get_html_format(print_path):
    """
    Read the html format at `print_path` and expand its
    `{% include "..." %}` directives.

    Each directive is replaced by the contents of the first installed app's
    file found at the included path; directives with no matching file are
    left as they are. Returns None when `print_path` does not exist.
    """
    if not os.path.exists(print_path):
        return None

    with open(print_path, "r") as format_file:
        html_format = format_file.read()

    include_pattern = """({% include ['"]([^'"]*)['"] %})"""
    for include_directive, path in re.findall(include_pattern, html_format):
        path_parts = path.split(os.path.sep)
        for app_name in frappe.get_installed_apps():
            include_path = frappe.get_app_path(app_name, *path_parts)
            if os.path.exists(include_path):
                with open(include_path, "r") as include_file:
                    html_format = html_format.replace(include_directive, include_file.read())
                # first app providing the file wins
                break

    return html_format
def is_markdown(text):
    """
    Guess whether `text` is markdown.

    An explicit `<!-- markdown -->` marker means yes and an explicit
    `<!-- html -->` marker means no (the markdown marker is checked first).
    Without a marker, text is taken as markdown unless it contains a `<p>`
    or `<br>` tag.
    """
    if "<!-- markdown -->" in text:
        return True

    if "<!-- html -->" in text:
        return False

    # raw string so `\s` reaches the regex engine unchanged
    return not re.search(r"<p[\s]*>|<br[\s]*>", text)
def get_sites(sites_path=None):
    """
    Return the sorted names of the site folders inside `sites_path`.

    `sites_path` defaults to `frappe.local.sites_path`, or '.' when that is
    not set. An entry counts as a site when it is a directory, is not a
    symlink and contains a `site_config.json`.
    """
    sites_path = sites_path or getattr(frappe.local, 'sites_path', None) or '.'

    def is_site(entry):
        # is a dir and has site_config.json
        entry_path = os.path.join(sites_path, entry)
        return (os.path.isdir(entry_path)
            and not os.path.islink(entry_path)
            and os.path.exists(os.path.join(entry_path, 'site_config.json')))

    return sorted(entry for entry in os.listdir(sites_path) if is_site(entry))
def get_request_session(max_retries=3):
    """
    Return a `requests.Session` whose http:// and https:// adapters retry
    failed requests.

    `max_retries` is the total number of retries allowed per request;
    responses with HTTP status 500 are also retried.

    NOTE(review): `max_retries` used to be ignored in favour of a hard-coded
    total of 5, so callers relying on the default now get 3 retries.
    """
    from requests.packages.urllib3.util import Retry

    session = requests.Session()
    for scheme in ("http://", "https://"):
        retry_policy = Retry(total=max_retries, status_forcelist=[500])
        session.mount(scheme, requests.adapters.HTTPAdapter(max_retries=retry_policy))
    return session
def watch(path, handler=None, debug=True):
    """
    Watch `path` recursively and call `handler(src_path, event_type)` for
    every file system event, until interrupted with Ctrl-C.

    With `debug` set, each event is printed. When no `handler` is given, a
    notice is printed for each event instead. This call blocks.
    """
    import time
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    class Handler(FileSystemEventHandler):
        def on_any_event(self, event):
            # single-argument print(...) behaves the same on Python 2 and 3
            if debug:
                print("File {0}: {1}".format(event.event_type, event.src_path))

            if not handler:
                print("No handler specified")
                return

            handler(event.src_path, event.event_type)

    event_handler = Handler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        # keep the calling thread alive while the observer runs
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
def sanitize_html(html, linkify=False):
    """
    Sanitize HTML tags, attributes and style to prevent XSS attacks
    Based on bleach clean, bleach whitelist and HTML5lib's Sanitizer defaults

    Does not sanitize JSON, as it could lead to future problems

    Values that are not strings, and strings that parse as JSON, are returned
    unchanged. When `linkify` is set, the cleaned html is additionally passed
    through `bleach.linkify` using a sanitizer configured with the same
    whitelist.
    """
    if not isinstance(html, basestring):
        return html

    elif is_json(html):
        return html

    # whitelist: html5lib's acceptable html + svg elements, plus a few extra tags
    tags = (HTMLSanitizer.acceptable_elements + HTMLSanitizer.svg_elements
        + ["html", "head", "meta", "link", "body", "iframe", "style", "o:p"])
    attributes = {"*": HTMLSanitizer.acceptable_attributes, "svg": HTMLSanitizer.svg_attributes}
    styles = bleach_whitelist.all_styles
    # html comments are kept
    strip_comments = False

    # returns html with escaped tags, escaped orphan >, <, etc.
    escaped_html = bleach.clean(html, tags=tags, attributes=attributes, styles=styles, strip_comments=strip_comments)

    if linkify:
        # based on bleach.clean
        # tokenizer carrying the same whitelist, so linkify does not undo the cleaning
        # NOTE(review): relies on bleach.BleachSanitizer's class attributes -- confirm against the installed bleach version
        class s(bleach.BleachSanitizer):
            allowed_elements = tags
            allowed_attributes = attributes
            allowed_css_properties = styles
            strip_disallowed_elements = False
            strip_html_comments = strip_comments

        escaped_html = bleach.linkify(escaped_html, tokenizer=s)

    return escaped_html
def is_json(text):
    """Return True when `json.loads` can parse `text`, False when it raises ValueError."""
    try:
        json.loads(text)
        return True
    except ValueError:
        return False
def markdown(text, sanitize=True, linkify=True):
    """
    Convert markdown `text` to html.

    With `sanitize` set (the default), the `<!-- markdown -->` marker is
    removed and the html is passed through `sanitize_html`, forwarding
    `linkify`.
    """
    html = _markdown(text)
    if not sanitize:
        return html

    without_marker = html.replace("<!-- markdown -->", "")
    return sanitize_html(without_marker, linkify=linkify)
def sanitize_email(emails):
    """
    Return the valid addresses from `emails` as one ", "-joined string.

    The input is split with `split_emails`; entries rejected by
    `validate_email_add` are dropped and the rest are re-formatted via
    `parseaddr` / `formataddr`.
    """
    from email.utils import parseaddr, formataddr

    def normalise(entry):
        fullname, email_id = parseaddr(entry)
        return formataddr((fullname, email_id))

    valid_entries = [entry for entry in split_emails(emails) if validate_email_add(entry)]
    return ", ".join([normalise(entry) for entry in valid_entries])
def get_installed_apps_info():
    """
    Return a list of `{'app_name': ..., 'version': ...}` dicts, one per
    installed app. The version is the app module's `__version__`, or
    'Unknown' when the module has none.
    """
    return [
        {
            'app_name': app,
            'version': getattr(frappe.get_module(app), '__version__', 'Unknown')
        }
        for app in frappe.get_installed_apps()
    ]
def get_site_info():
    """
    Collect a summary of this site: installed apps, system users, selected
    System Settings values and usage figures.

    Methods registered under the `get_site_info` hook receive the summary and
    may return a dict of extra keys to merge in. The result is round-tripped
    through JSON before being returned.
    """
    from frappe.utils.user import get_system_managers
    from frappe.core.doctype.user.user import STANDARD_USERS
    from frappe.email.queue import get_emails_sent_this_month

    # only get system users
    user_filters = {'user_type': 'System User', 'name': ('not in', STANDARD_USERS)}
    user_fields = ['name', 'enabled', 'last_login', 'last_active', 'language', 'time_zone']
    users = frappe.get_all('User', filters=user_filters, fields=user_fields)

    system_managers = get_system_managers(only_name=True)
    for user in users:
        # tag system managers
        user.is_system_manager = 1 if user.name in system_managers else 0
        user.full_name = get_fullname(user.name)
        # expose the user id under 'email' instead of 'name'
        user.email = user.name
        del user['name']

    system_settings = frappe.db.get_singles_dict('System Settings')
    limits = frappe.local.conf.limits or {}
    space_usage = frappe._dict(limits.get('space_usage', {}))

    site_info = {
        'installed_apps': get_installed_apps_info(),
        'users': users,
        'country': system_settings.country,
        'language': system_settings.language or 'english',
        'time_zone': system_settings.time_zone,
        'setup_complete': cint(system_settings.setup_complete),
        'scheduler_enabled': system_settings.enable_scheduler,
    }

    # usage
    site_info.update({
        'emails_sent': get_emails_sent_this_month(),
        'space_used': flt((space_usage.total or 0) / 1024.0, 2),
        'database_size': space_usage.database_size,
        'backup_size': space_usage.backup_size,
        'files_size': space_usage.files_size
    })

    # from other apps
    for method_name in frappe.get_hooks('get_site_info'):
        extra_info = frappe.get_attr(method_name)(site_info)
        site_info.update(extra_info or {})

    # dumps -> loads to prevent datatype conflicts
    return json.loads(frappe.as_json(site_info))