For eg. when a System User is using Outlook and replies to an email from their own client, it reaches the Email Account with the threading info lost and the (sender + subject match) doesn't work because the sender in the first communication was someone different to whom the system user is replying to via the common email account in Frappe. This fix bypasses the sender match when the sender is a system user and subject is atleast 10 chars long (for additional safety)
290 lines
8.1 KiB
Python
290 lines
8.1 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# MIT License. See license.txt
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
import frappe, json
|
|
from frappe import _dict
|
|
import frappe.share
|
|
|
|
class User:
|
|
"""
|
|
A user permission object can be accessed as `frappe.get_user()`
|
|
"""
|
|
def __init__(self, name=''):
|
|
self.defaults = None
|
|
self.name = name or frappe.session.get('user')
|
|
self.roles = []
|
|
|
|
self.all_read = []
|
|
self.can_create = []
|
|
self.can_read = []
|
|
self.can_write = []
|
|
self.can_cancel = []
|
|
self.can_delete = []
|
|
self.can_search = []
|
|
self.can_get_report = []
|
|
self.can_import = []
|
|
self.can_export = []
|
|
self.can_print = []
|
|
self.can_email = []
|
|
self.can_set_user_permissions = []
|
|
self.allow_modules = []
|
|
self.in_create = []
|
|
self.setup_user()
|
|
|
|
def setup_user(self):
|
|
def get_user_doc():
|
|
user = None
|
|
try:
|
|
user = frappe.get_doc("User", self.name).as_dict()
|
|
except frappe.DoesNotExistError:
|
|
pass
|
|
except Exception, e:
|
|
# install boo-boo
|
|
if e.args[0] != 1146: raise
|
|
|
|
return user
|
|
|
|
if not frappe.flags.in_install_db and not frappe.flags.in_test:
|
|
user_doc = frappe.cache().hget("user_doc", self.name, get_user_doc)
|
|
if user_doc:
|
|
self.doc = frappe.get_doc(user_doc)
|
|
|
|
def get_roles(self):
|
|
"""get list of roles"""
|
|
if not self.roles:
|
|
self.roles = get_roles(self.name)
|
|
return self.roles
|
|
|
|
def get_block_modules(self):
|
|
"""Returns list of blocked modules"""
|
|
return [d.module for d in self.doc.block_modules] if self.doc.block_modules else []
|
|
|
|
def build_doctype_map(self):
|
|
"""build map of special doctype properties"""
|
|
|
|
self.doctype_map = {}
|
|
for r in frappe.db.sql("""select name, in_create, issingle, istable,
|
|
read_only, module from tabDocType""", as_dict=1):
|
|
self.doctype_map[r['name']] = r
|
|
|
|
def build_perm_map(self):
|
|
"""build map of permissions at level 0"""
|
|
self.perm_map = {}
|
|
roles = self.get_roles()
|
|
for r in frappe.db.sql("""select * from tabDocPerm where docstatus=0
|
|
and ifnull(permlevel,0)=0
|
|
and role in ({roles})""".format(roles=", ".join(["%s"]*len(roles))), tuple(roles), as_dict=1):
|
|
dt = r['parent']
|
|
|
|
if not dt in self.perm_map:
|
|
self.perm_map[dt] = {}
|
|
|
|
for k in frappe.permissions.rights:
|
|
if not self.perm_map[dt].get(k):
|
|
self.perm_map[dt][k] = r.get(k)
|
|
|
|
def build_permissions(self):
|
|
"""build lists of what the user can read / write / create
|
|
quirks:
|
|
read_only => Not in Search
|
|
in_create => Not in create
|
|
"""
|
|
self.build_doctype_map()
|
|
self.build_perm_map()
|
|
user_shared = frappe.share.get_shared_doctypes()
|
|
|
|
for dt in self.doctype_map:
|
|
dtp = self.doctype_map[dt]
|
|
p = self.perm_map.get(dt, {})
|
|
|
|
if not p.get("read") and (dt in user_shared):
|
|
p["read"] = 1
|
|
|
|
if not dtp.get('istable'):
|
|
if p.get('create') and not dtp.get('issingle'):
|
|
if dtp.get('in_create'):
|
|
self.in_create.append(dt)
|
|
else:
|
|
self.can_create.append(dt)
|
|
elif p.get('write'):
|
|
self.can_write.append(dt)
|
|
elif p.get('read'):
|
|
if dtp.get('read_only'):
|
|
self.all_read.append(dt)
|
|
else:
|
|
self.can_read.append(dt)
|
|
|
|
if p.get('cancel'):
|
|
self.can_cancel.append(dt)
|
|
|
|
if p.get('delete'):
|
|
self.can_delete.append(dt)
|
|
|
|
if (p.get('read') or p.get('write') or p.get('create')):
|
|
if p.get('report'):
|
|
self.can_get_report.append(dt)
|
|
for key in ("import", "export", "print", "email", "set_user_permissions"):
|
|
if p.get(key):
|
|
getattr(self, "can_" + key).append(dt)
|
|
|
|
if not dtp.get('istable'):
|
|
if not dtp.get('issingle') and not dtp.get('read_only'):
|
|
self.can_search.append(dt)
|
|
if not dtp.get('module') in self.allow_modules:
|
|
self.allow_modules.append(dtp.get('module'))
|
|
|
|
self.can_write += self.can_create
|
|
self.can_write += self.in_create
|
|
self.can_read += self.can_write
|
|
|
|
self.shared = frappe.db.sql_list("""select distinct share_doctype from `tabDocShare`
|
|
where `user`=%s and `read`=1""", self.name)
|
|
self.can_read = list(set(self.can_read + self.shared))
|
|
|
|
self.all_read += self.can_read
|
|
|
|
if "System Manager" in self.roles:
|
|
self.can_import = frappe.db.sql_list("""select name from `tabDocType`
|
|
where allow_import = 1""")
|
|
|
|
def get_defaults(self):
|
|
import frappe.defaults
|
|
self.defaults = frappe.defaults.get_defaults(self.name)
|
|
return self.defaults
|
|
|
|
# update recent documents
|
|
def update_recent(self, dt, dn):
|
|
rdl = frappe.cache().hget("user_recent", self.name) or []
|
|
new_rd = [dt, dn]
|
|
|
|
# clear if exists
|
|
for i in range(len(rdl)):
|
|
rd = rdl[i]
|
|
if rd==new_rd:
|
|
del rdl[i]
|
|
break
|
|
|
|
if len(rdl) > 19:
|
|
rdl = rdl[:19]
|
|
|
|
rdl = [new_rd] + rdl
|
|
|
|
frappe.cache().hset("user_recent", self.name, rdl)
|
|
|
|
def _get(self, key):
|
|
if not self.can_read:
|
|
self.build_permissions()
|
|
return getattr(self, key)
|
|
|
|
def get_can_read(self):
|
|
"""return list of doctypes that the user can read"""
|
|
if not self.can_read:
|
|
self.build_permissions()
|
|
return self.can_read
|
|
|
|
def load_user(self):
|
|
d = frappe.db.sql("""select email, first_name, last_name,
|
|
email_signature, user_type, language, background_image, background_style
|
|
from tabUser where name = %s""", (self.name,), as_dict=1)[0]
|
|
|
|
if not self.can_read:
|
|
self.build_permissions()
|
|
|
|
d.name = self.name
|
|
d.recent = json.dumps(frappe.cache().hget("user_recent", self.name) or [])
|
|
|
|
d.roles = self.get_roles()
|
|
d.defaults = self.get_defaults()
|
|
d.block_modules = self.get_block_modules()
|
|
|
|
for key in ("can_create", "can_write", "can_read", "can_cancel", "can_delete",
|
|
"can_get_report", "allow_modules", "all_read", "can_search",
|
|
"in_create", "can_export", "can_import", "can_print", "can_email",
|
|
"can_set_user_permissions"):
|
|
|
|
d[key] = list(set(getattr(self, key)))
|
|
|
|
return d
|
|
|
|
def get_user_fullname(user):
|
|
fullname = frappe.db.sql("SELECT CONCAT_WS(' ', first_name, last_name) FROM `tabUser` WHERE name=%s", (user,))
|
|
return fullname and fullname[0][0] or ''
|
|
|
|
def get_fullname_and_avatar(user):
|
|
first_name, last_name, avatar = frappe.db.get_value("User",
|
|
user, ["first_name", "last_name", "user_image"])
|
|
return _dict({
|
|
"fullname": " ".join(filter(None, [first_name, last_name])),
|
|
"avatar": avatar
|
|
})
|
|
|
|
def get_system_managers(only_name=False):
|
|
"""returns all system manager's user details"""
|
|
import email.utils
|
|
from frappe.core.doctype.user.user import STANDARD_USERS
|
|
system_managers = frappe.db.sql("""select distinct name,
|
|
concat_ws(" ", if(first_name="", null, first_name), if(last_name="", null, last_name))
|
|
as fullname from tabUser p
|
|
where docstatus < 2 and enabled = 1
|
|
and name not in ({})
|
|
and exists (select * from tabUserRole ur
|
|
where ur.parent = p.name and ur.role="System Manager")""".format(", ".join(["%s"]*len(STANDARD_USERS))),
|
|
STANDARD_USERS, as_dict=True)
|
|
|
|
if only_name:
|
|
return [p.name for p in system_managers]
|
|
else:
|
|
return [email.utils.formataddr((p.fullname, p.name)) for p in system_managers]
|
|
|
|
def add_role(user, role):
|
|
frappe.get_doc("User", user).add_roles(role)
|
|
|
|
def add_system_manager(email, first_name=None, last_name=None):
|
|
# add user
|
|
user = frappe.new_doc("User")
|
|
user.update({
|
|
"name": email,
|
|
"email": email,
|
|
"enabled": 1,
|
|
"first_name": first_name or email,
|
|
"last_name": last_name,
|
|
"user_type": "System User"
|
|
})
|
|
user.insert()
|
|
|
|
# add roles
|
|
roles = frappe.db.sql_list("""select name from `tabRole`
|
|
where name not in ("Administrator", "Guest", "All")""")
|
|
user.add_roles(*roles)
|
|
|
|
def get_roles(user=None, with_standard=True):
|
|
"""get roles of current user"""
|
|
if not user:
|
|
user = frappe.session.user
|
|
|
|
if user=='Guest':
|
|
return ['Guest']
|
|
|
|
def get():
|
|
return [r[0] for r in frappe.db.sql("""select role from tabUserRole
|
|
where parent=%s and role not in ('All', 'Guest')""", (user,))] + ['All', 'Guest']
|
|
|
|
roles = frappe.cache().hget("roles", user, get)
|
|
|
|
# filter standard if required
|
|
if not with_standard:
|
|
roles = filter(lambda x: x not in ['All', 'Guest', 'Administrator'], roles)
|
|
|
|
return roles
|
|
|
|
def get_enabled_system_users():
|
|
return frappe.db.sql("""select * from tabUser where
|
|
user_type='System User' and enabled=1 and name not in ('Administrator', 'Guest')""", as_dict=1)
|
|
|
|
def is_website_user():
|
|
return frappe.get_user().doc.user_type == "Website User"
|
|
|
|
def is_system_user(username):
|
|
return frappe.db.get_value("User", {"name": username, "enabled": 1, "user_type": "System User"})
|