558 lines
15 KiB
Python
558 lines
15 KiB
Python
# Copyright (c) 2013, Web Notes Technologies Pvt. Ltd. and Contributors
|
|
# MIT License. See license.txt
|
|
"""
|
|
globals attached to webnotes module
|
|
+ some utility functions that should probably be moved
|
|
"""
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
from werkzeug.local import Local
|
|
from werkzeug.exceptions import NotFound
|
|
from MySQLdb import ProgrammingError as SQLError
|
|
|
|
import os, sys, importlib
|
|
import json
|
|
import semantic_version
|
|
|
|
local = Local()
|
|
|
|
class _dict(dict):
|
|
"""dict like object that exposes keys as attributes"""
|
|
def __getattr__(self, key):
|
|
return self.get(key)
|
|
def __setattr__(self, key, value):
|
|
self[key] = value
|
|
def __getstate__(self):
|
|
return self
|
|
def __setstate__(self, d):
|
|
self.update(d)
|
|
def update(self, d):
|
|
"""update and return self -- the missing dict feature in python"""
|
|
super(_dict, self).update(d)
|
|
return self
|
|
def copy(self):
|
|
return _dict(dict(self).copy())
|
|
|
|
def __getattr__(self, key):
|
|
return local.get("key", None)
|
|
|
|
def _(msg):
|
|
"""translate object in current lang, if exists"""
|
|
if hasattr(local, 'translations'):
|
|
return local.translations.get(lang, {}).get(msg, msg)
|
|
|
|
return msg
|
|
|
|
def set_user_lang(user, user_language=None):
|
|
from webnotes.translate import get_lang_dict
|
|
|
|
if not user_language:
|
|
user_language = conn.get_value("Profile", user, "language")
|
|
|
|
if user_language:
|
|
lang_dict = get_lang_dict()
|
|
if user_language in lang_dict:
|
|
local.lang = lang_dict[user_language]
|
|
|
|
def load_translations(module, doctype, name):
|
|
from webnotes.translate import load_doc_messages
|
|
load_doc_messages(module, doctype, name)
|
|
|
|
|
|
# local-globals
|
|
conn = local("conn")
|
|
conf = local("conf")
|
|
form = form_dict = local("form_dict")
|
|
request = local("request")
|
|
request_method = local("request_method")
|
|
response = local("response")
|
|
_response = local("_response")
|
|
session = local("session")
|
|
user = local("user")
|
|
flags = local("flags")
|
|
|
|
error_log = local("error_log")
|
|
debug_log = local("debug_log")
|
|
message_log = local("message_log")
|
|
|
|
lang = local("lang")
|
|
|
|
def init(site, sites_path=None):
|
|
if getattr(local, "initialised", None):
|
|
return
|
|
|
|
if not sites_path:
|
|
sites_path = '.'
|
|
|
|
local.error_log = []
|
|
local.site = site
|
|
local.sites_path = sites_path
|
|
local.site_path = os.path.join(sites_path, site)
|
|
local.message_log = []
|
|
local.debug_log = []
|
|
local.response = _dict({})
|
|
local.lang = "en"
|
|
local.request_method = request.method if request else None
|
|
local.conf = _dict(get_site_config())
|
|
local.initialised = True
|
|
local.flags = _dict({})
|
|
local.rollback_observers = []
|
|
|
|
setup_module_map()
|
|
|
|
def get_site_config():
|
|
site_filepath = os.path.join(local.site_path, "site_config.json")
|
|
if os.path.exists(site_filepath):
|
|
with open(site_filepath, 'r') as f:
|
|
return json.load(f)
|
|
else:
|
|
return _dict()
|
|
|
|
def destroy():
|
|
"""closes connection and releases werkzeug local"""
|
|
if conn:
|
|
conn.close()
|
|
|
|
from werkzeug.local import release_local
|
|
release_local(local)
|
|
|
|
_memc = None
|
|
test_objects = {}
|
|
|
|
# memcache
|
|
def cache():
|
|
global _memc
|
|
if not _memc:
|
|
from webnotes.memc import MClient
|
|
_memc = MClient(['localhost:11211'])
|
|
return _memc
|
|
|
|
class DuplicateEntryError(Exception): pass
|
|
class ValidationError(Exception): pass
|
|
class AuthenticationError(Exception): pass
|
|
class PermissionError(Exception): pass
|
|
class UnknownDomainError(Exception): pass
|
|
class SessionStopped(Exception): pass
|
|
class MappingMismatchError(ValidationError): pass
|
|
class InvalidStatusError(ValidationError): pass
|
|
class DoesNotExistError(ValidationError): pass
|
|
class MandatoryError(ValidationError): pass
|
|
class InvalidSignatureError(ValidationError): pass
|
|
|
|
def getTraceback():
|
|
import utils
|
|
return utils.getTraceback()
|
|
|
|
def errprint(msg):
|
|
from utils import cstr
|
|
if not request:
|
|
print cstr(msg)
|
|
|
|
error_log.append(cstr(msg))
|
|
|
|
def log(msg):
|
|
if not request:
|
|
import conf
|
|
if conf.get("logging") or False:
|
|
print repr(msg)
|
|
|
|
from utils import cstr
|
|
debug_log.append(cstr(msg))
|
|
|
|
def msgprint(msg, small=0, raise_exception=0, as_table=False):
|
|
def _raise_exception():
|
|
if raise_exception:
|
|
if flags.rollback_on_exception:
|
|
conn.rollback()
|
|
import inspect
|
|
if inspect.isclass(raise_exception) and issubclass(raise_exception, Exception):
|
|
raise raise_exception, msg
|
|
else:
|
|
raise ValidationError, msg
|
|
|
|
if flags.mute_messages:
|
|
_raise_exception()
|
|
return
|
|
|
|
from utils import cstr
|
|
if as_table and type(msg) in (list, tuple):
|
|
msg = '<table border="1px" style="border-collapse: collapse" cellpadding="2px">' + ''.join(['<tr>'+''.join(['<td>%s</td>' % c for c in r])+'</tr>' for r in msg]) + '</table>'
|
|
|
|
if flags.print_messages:
|
|
print "Message: " + repr(msg)
|
|
|
|
message_log.append((small and '__small:' or '')+cstr(msg or ''))
|
|
_raise_exception()
|
|
|
|
def throw(msg, exc=ValidationError):
|
|
msgprint(msg, raise_exception=exc)
|
|
|
|
def create_folder(path):
|
|
if not os.path.exists(path): os.makedirs(path)
|
|
|
|
def connect(site=None, db_name=None):
|
|
from db import Database
|
|
if site:
|
|
init(site)
|
|
local.conn = Database(user=db_name)
|
|
local.response = _dict()
|
|
local.form_dict = _dict()
|
|
local.session = _dict()
|
|
set_user("Administrator")
|
|
|
|
def set_user(username):
|
|
import webnotes.profile
|
|
local.session["user"] = username
|
|
local.user = webnotes.profile.Profile(username)
|
|
|
|
def get_request_header(key, default=None):
|
|
return request.headers.get(key, default)
|
|
|
|
logger = None
|
|
whitelisted = []
|
|
guest_methods = []
|
|
def whitelist(allow_guest=False, allow_roles=None):
|
|
"""
|
|
decorator for whitelisting a function
|
|
|
|
Note: if the function is allowed to be accessed by a guest user,
|
|
it must explicitly be marked as allow_guest=True
|
|
|
|
for specific roles, set allow_roles = ['Administrator'] etc.
|
|
"""
|
|
def innerfn(fn):
|
|
global whitelisted, guest_methods
|
|
whitelisted.append(fn)
|
|
|
|
if allow_guest:
|
|
guest_methods.append(fn)
|
|
|
|
if allow_roles:
|
|
roles = get_roles()
|
|
allowed = False
|
|
for role in allow_roles:
|
|
if role in roles:
|
|
allowed = True
|
|
break
|
|
|
|
if not allowed:
|
|
raise PermissionError, "Method not allowed"
|
|
|
|
return fn
|
|
|
|
return innerfn
|
|
|
|
def clear_cache(user=None, doctype=None):
|
|
"""clear cache"""
|
|
from webnotes.sessions import clear_cache
|
|
if doctype:
|
|
from webnotes.model.doctype import clear_cache
|
|
clear_cache(doctype)
|
|
reset_metadata_version()
|
|
elif user:
|
|
clear_cache(user)
|
|
else: # everything
|
|
clear_cache()
|
|
reset_metadata_version()
|
|
|
|
def get_roles(username=None):
|
|
import webnotes.profile
|
|
if not username or username==session.user:
|
|
return user.get_roles()
|
|
else:
|
|
return webnotes.profile.Profile(username).get_roles()
|
|
|
|
def check_admin_or_system_manager():
|
|
if ("System Manager" not in get_roles()) and \
|
|
(session.user!="Administrator"):
|
|
msgprint("Only Allowed for Role System Manager or Administrator", raise_exception=True)
|
|
|
|
def has_permission(doctype, ptype="read", refdoc=None):
|
|
"""check if user has permission"""
|
|
from webnotes.utils import cint
|
|
|
|
if session.user=="Administrator" or conn.get_value("DocType", doctype, "istable")==1:
|
|
return True
|
|
|
|
meta = get_doctype(doctype)
|
|
|
|
# get user permissions
|
|
user_roles = get_roles()
|
|
perms = [p for p in meta.get({"doctype": "DocPerm"})
|
|
if cint(p.get(ptype))==1 and cint(p.permlevel)==0 and (p.role=="All" or p.role in user_roles)]
|
|
|
|
if refdoc:
|
|
return has_match(meta, perms, refdoc)
|
|
else:
|
|
return perms and True or False
|
|
|
|
def has_match(meta, perms, refdoc):
|
|
from webnotes.defaults import get_user_default_as_list
|
|
|
|
if isinstance(refdoc, basestring):
|
|
refdoc = doc(meta[0].name, refdoc)
|
|
|
|
match_failed = {}
|
|
for p in perms:
|
|
if p.match:
|
|
if ":" in p.match:
|
|
keys = p.match.split(":")
|
|
else:
|
|
keys = [p.match, p.match]
|
|
|
|
if refdoc.fields.get(keys[0],"[No Value]") in get_user_default_as_list(keys[1]):
|
|
return True
|
|
else:
|
|
match_failed[keys[0]] = refdoc.fields.get(keys[0],"[No Value]")
|
|
else:
|
|
# found a permission without a match
|
|
return True
|
|
|
|
# no valid permission found
|
|
if match_failed:
|
|
msg = _("Not allowed for: ")
|
|
for key in match_failed:
|
|
msg += "\n" + (meta.get_field(key) and meta.get_label(key) or key) \
|
|
+ " = " + (match_failed[key] or "None")
|
|
msgprint(msg)
|
|
|
|
return False
|
|
|
|
def generate_hash():
|
|
"""Generates random hash for session id"""
|
|
import hashlib, time
|
|
return hashlib.sha224(str(time.time())).hexdigest()
|
|
|
|
def reset_metadata_version():
|
|
v = generate_hash()
|
|
cache().set_value("metadata_version", v)
|
|
return v
|
|
|
|
def get_obj(dt = None, dn = None, doc=None, doclist=[], with_children = True):
|
|
from webnotes.model.code import get_obj
|
|
return get_obj(dt, dn, doc, doclist, with_children)
|
|
|
|
def doc(doctype=None, name=None, fielddata=None):
|
|
from webnotes.model.doc import Document
|
|
return Document(doctype, name, fielddata)
|
|
|
|
def new_doc(doctype, parent_doc=None, parentfield=None):
|
|
from webnotes.model.create_new import get_new_doc
|
|
return get_new_doc(doctype, parent_doc, parentfield)
|
|
|
|
def new_bean(doctype):
|
|
from webnotes.model.create_new import get_new_doc
|
|
return bean([get_new_doc(doctype)])
|
|
|
|
def doclist(lst=None):
|
|
from webnotes.model.doclist import DocList
|
|
return DocList(lst)
|
|
|
|
def bean(doctype=None, name=None, copy=None):
|
|
"""return an instance of the object, wrapped as a Bean (webnotes.model.bean)"""
|
|
from webnotes.model.bean import Bean
|
|
if copy:
|
|
return Bean(copy_doclist(copy))
|
|
else:
|
|
return Bean(doctype, name)
|
|
|
|
def set_value(doctype, docname, fieldname, value):
|
|
import webnotes.client
|
|
return webnotes.client.set_value(doctype, docname, fieldname, value)
|
|
|
|
def get_doclist(doctype, name=None):
|
|
return bean(doctype, name).doclist
|
|
|
|
def get_doctype(doctype, processed=False):
|
|
import webnotes.model.doctype
|
|
return webnotes.model.doctype.get(doctype, processed)
|
|
|
|
def delete_doc(doctype=None, name=None, doclist = None, force=0, ignore_doctypes=None, for_reload=False, ignore_permissions=False):
|
|
import webnotes.model.utils
|
|
|
|
if not ignore_doctypes:
|
|
ignore_doctypes = []
|
|
|
|
if isinstance(name, list):
|
|
for n in name:
|
|
webnotes.model.utils.delete_doc(doctype, n, doclist, force, ignore_doctypes, for_reload, ignore_permissions)
|
|
else:
|
|
webnotes.model.utils.delete_doc(doctype, name, doclist, force, ignore_doctypes, for_reload, ignore_permissions)
|
|
|
|
def clear_perms(doctype):
|
|
conn.sql("""delete from tabDocPerm where parent=%s""", doctype)
|
|
|
|
def reset_perms(doctype):
|
|
clear_perms(doctype)
|
|
reload_doc(conn.get_value("DocType", doctype, "module"), "DocType", doctype, force=True)
|
|
|
|
def reload_doc(module, dt=None, dn=None, force=False):
|
|
import webnotes.modules
|
|
return webnotes.modules.reload_doc(module, dt, dn, force=force)
|
|
|
|
def rename_doc(doctype, old, new, debug=0, force=False, merge=False):
|
|
from webnotes.model.rename_doc import rename_doc
|
|
return rename_doc(doctype, old, new, force=force, merge=merge)
|
|
|
|
def insert(doclist):
|
|
import webnotes.model
|
|
return webnotes.model.insert(doclist)
|
|
|
|
def get_module(modulename):
|
|
return importlib.import_module(modulename)
|
|
|
|
def scrub(txt):
|
|
return txt.replace(' ','_').replace('-', '_').replace('/', '_').lower()
|
|
|
|
def get_module_path(module):
|
|
module = scrub(module)
|
|
return get_pymodule_path(local.module_app[module] + "." + module)
|
|
|
|
def get_pymodule_path(modulename, *joins):
|
|
return os.path.join(os.path.dirname(get_module(modulename).__file__), *joins)
|
|
|
|
def get_module_list(app_name):
|
|
return get_file_items(os.path.join(os.path.dirname(get_module(app_name).__file__), "modules.txt"))
|
|
|
|
def get_app_list(with_webnotes=False):
|
|
apps = get_file_items(os.path.join(local.sites_path, "apps.txt"))
|
|
if with_webnotes:
|
|
apps.insert(0, 'webnotes')
|
|
return apps
|
|
|
|
def get_installed_apps():
|
|
def load_installed_apps():
|
|
return json.loads(conn.get_global("installed_apps") or "[]")
|
|
return cache().get_value("installed_apps", load_installed_apps)
|
|
|
|
def get_hooks():
|
|
def load_app_hooks():
|
|
hooks = {}
|
|
for app in get_installed_apps():
|
|
for item in get_file_items(get_pymodule_path(app, "hooks.txt")):
|
|
key, value = item.split(None, 1)
|
|
hooks.setdefault(key, [])
|
|
hooks[key].append(value)
|
|
return hooks
|
|
|
|
return _dict(cache().get_value("app_hooks", load_app_hooks))
|
|
|
|
def setup_module_map():
|
|
_cache = cache()
|
|
local.app_modules = _cache.get_value("app_modules")
|
|
local.module_app = _cache.get_value("module_app")
|
|
|
|
if not local.app_modules:
|
|
local.module_app, local.app_modules = {}, {}
|
|
for app in get_app_list(True):
|
|
for module in get_module_list(app):
|
|
local.module_app[module] = app
|
|
local.app_modules.setdefault(app, [])
|
|
local.app_modules[app].append(module)
|
|
|
|
_cache.set_value("app_modules", local.app_modules)
|
|
_cache.set_value("module_app", local.module_app)
|
|
|
|
def get_file_items(path):
|
|
if os.path.exists(path):
|
|
with open(path, "r") as f:
|
|
return [p.strip() for p in f.read().split("\n") if p.strip() and not p.startswith("#")]
|
|
else:
|
|
return []
|
|
|
|
def get_attr(method_string):
|
|
modulename = '.'.join(method_string.split('.')[:-1])
|
|
methodname = method_string.split('.')[-1]
|
|
|
|
return getattr(get_module(modulename), methodname)
|
|
|
|
def make_property_setter(args):
|
|
args = _dict(args)
|
|
bean([{
|
|
'doctype': "Property Setter",
|
|
'doctype_or_field': args.doctype_or_field or "DocField",
|
|
'doc_type': args.doctype,
|
|
'field_name': args.fieldname,
|
|
'property': args.property,
|
|
'value': args.value,
|
|
'property_type': args.property_type or "Data",
|
|
'__islocal': 1
|
|
}]).save()
|
|
|
|
def get_application_home_page(user='Guest'):
|
|
"""get home page for user"""
|
|
hpl = conn.sql("""select home_page
|
|
from `tabDefault Home Page`
|
|
where parent='Control Panel'
|
|
and role in ('%s') order by idx asc limit 1""" % "', '".join(get_roles(user)))
|
|
if hpl:
|
|
return hpl[0][0]
|
|
else:
|
|
return conn.get_value("Control Panel", None, "home_page")
|
|
|
|
def copy_doclist(in_doclist):
|
|
new_doclist = []
|
|
parent_doc = None
|
|
for i, d in enumerate(in_doclist):
|
|
is_dict = False
|
|
if isinstance(d, dict):
|
|
is_dict = True
|
|
values = _dict(d.copy())
|
|
else:
|
|
values = _dict(d.fields.copy())
|
|
|
|
newd = new_doc(values.doctype, parent_doc=(None if i==0 else parent_doc), parentfield=values.parentfield)
|
|
newd.fields.update(values)
|
|
|
|
if i==0:
|
|
parent_doc = newd
|
|
|
|
new_doclist.append(newd.fields if is_dict else newd)
|
|
|
|
return doclist(new_doclist)
|
|
|
|
def compare(val1, condition, val2):
|
|
import webnotes.utils
|
|
return webnotes.utils.compare(val1, condition, val2)
|
|
|
|
def repsond_as_web_page(title, html):
|
|
local.message_title = title
|
|
local.message = "<h3>" + title + "</h3>" + html
|
|
local.response['type'] = 'page'
|
|
local.response['page_name'] = 'message.html'
|
|
|
|
def build_match_conditions(doctype, fields=None, as_condition=True):
|
|
import webnotes.widgets.reportview
|
|
return webnotes.widgets.reportview.build_match_conditions(doctype, fields, as_condition)
|
|
|
|
def get_list(doctype, filters=None, fields=None, docstatus=None,
|
|
group_by=None, order_by=None, limit_start=0, limit_page_length=None,
|
|
as_list=False, debug=False):
|
|
import webnotes.widgets.reportview
|
|
return webnotes.widgets.reportview.execute(doctype, filters=filters, fields=fields, docstatus=docstatus,
|
|
group_by=group_by, order_by=order_by, limit_start=limit_start, limit_page_length=limit_page_length,
|
|
as_list=as_list, debug=debug)
|
|
|
|
jenv = None
|
|
|
|
def get_jenv():
|
|
global jenv
|
|
if not jenv:
|
|
from jinja2 import Environment, ChoiceLoader, PackageLoader
|
|
from webnotes.utils import get_base_path, global_date_format
|
|
from markdown2 import markdown
|
|
from json import dumps
|
|
|
|
# webnotes will be loaded last, so app templates will get precedence
|
|
jenv = Environment(loader = ChoiceLoader([PackageLoader(app, ".") \
|
|
for app in get_app_list() + ["webnotes"]]))
|
|
|
|
jenv.filters["global_date_format"] = global_date_format
|
|
jenv.filters["markdown"] = markdown
|
|
jenv.filters["json"] = dumps
|
|
|
|
return jenv
|
|
|
|
def get_template(path):
|
|
return get_jenv().get_template(path)
|