Merge branch 'develop' into load-address-and-contact-display

This commit is contained in:
Raffael Meyer 2023-02-09 12:23:20 +01:00 committed by GitHub
commit 38ea101435
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 411 additions and 271 deletions

View file

@ -570,7 +570,7 @@ def get_user():
def get_roles(username=None) -> list[str]:
"""Returns roles of current user."""
if not local.session:
if not local.session or not local.session.user:
return ["Guest"]
import frappe.permissions

View file

@ -3,16 +3,15 @@
"""
bootstrap client session
"""
import frappe
import frappe.defaults
import frappe.desk.desk_page
from frappe.core.doctype.navbar_settings.navbar_settings import get_app_logo, get_navbar_settings
from frappe.database.utils import Query
from frappe.desk.doctype.route_history.route_history import frequently_visited_links
from frappe.desk.form.load import get_meta_bundle
from frappe.email.inbox import get_email_accounts
from frappe.model.base_document import get_controller
from frappe.model.db_query import DatabaseQuery
from frappe.query_builder import DocType
from frappe.query_builder.functions import Count
from frappe.query_builder.terms import ParameterizedValueWrapper, SubQuery
@ -170,7 +169,6 @@ def get_user_pages_or_reports(parent, cache=False):
parentTable = DocType(parent)
# get pages or reports set on custom role
# must end in a WHERE clause for `_run_with_permission_query`
pages_with_custom_roles = (
frappe.qb.from_(customRole)
.from_(hasRole)
@ -184,8 +182,7 @@ def get_user_pages_or_reports(parent, cache=False):
& (customRole[parent.lower()].isnotnull())
& (hasRole.role.isin(roles))
)
)
pages_with_custom_roles = _run_with_permission_query(pages_with_custom_roles, parent)
).run(as_dict=True)
for p in pages_with_custom_roles:
has_role[p.name] = {"modified": p.modified, "title": p.title, "ref_doctype": p.ref_doctype}
@ -196,7 +193,6 @@ def get_user_pages_or_reports(parent, cache=False):
.where(customRole[parent.lower()].isnotnull())
)
# must end in a WHERE clause for `_run_with_permission_query`
pages_with_standard_roles = (
frappe.qb.from_(hasRole)
.from_(parentTable)
@ -212,7 +208,7 @@ def get_user_pages_or_reports(parent, cache=False):
if parent == "Report":
pages_with_standard_roles = pages_with_standard_roles.where(report.disabled == 0)
pages_with_standard_roles = _run_with_permission_query(pages_with_standard_roles, parent)
pages_with_standard_roles = pages_with_standard_roles.run(as_dict=True)
for p in pages_with_standard_roles:
if p.name not in has_role:
@ -226,13 +222,12 @@ def get_user_pages_or_reports(parent, cache=False):
# pages with no role are allowed
if parent == "Page":
# must end in a WHERE clause for `_run_with_permission_query`
pages_with_no_roles = (
frappe.qb.from_(parentTable)
.select(parentTable.name, parentTable.modified, *columns)
.where(no_of_roles == 0)
)
pages_with_no_roles = _run_with_permission_query(pages_with_no_roles, parent)
).run(as_dict=True)
for p in pages_with_no_roles:
if p.name not in has_role:
@ -253,17 +248,6 @@ def get_user_pages_or_reports(parent, cache=False):
return has_role
def _run_with_permission_query(query: "Query", doctype: str) -> list[dict]:
"""
Adds Permission Query (Server Script) conditions and runs/executes modified query
Note: Works only if 'WHERE' is the last clause in the query
"""
permission_query = DatabaseQuery(doctype, frappe.session.user).get_permission_query_conditions()
if permission_query and frappe.session.user != "Administrator":
return frappe.db.sql(f"{query} AND {permission_query}", as_dict=True)
return query.run(as_dict=True)
def load_translations(bootinfo):
bootinfo["lang"] = frappe.lang
bootinfo["__messages"] = get_messages_for_boot()

View file

@ -592,6 +592,8 @@ def disable_user(context, email):
@pass_context
def migrate(context, skip_failing=False, skip_search_index=False):
"Run patches, sync schema and rebuild files/translations"
from traceback_with_variables import activate_by_import
from frappe.migrate import SiteMigration
for site in context.sites:

View file

@ -8,7 +8,13 @@ from frappe.utils import cstr
class AccessLog(Document):
pass
@staticmethod
def clear_old_logs(days=30):
from frappe.query_builder import Interval
from frappe.query_builder.functions import Now
table = frappe.qb.DocType("Access Log")
frappe.db.delete(table, filters=(table.modified < (Now() - Interval(days=days))))
@frappe.whitelist()

View file

@ -487,28 +487,32 @@ def parse_email(communication, email_strings):
"""
Parse email to add timeline links.
When automatic email linking is enabled, an email from email_strings can contain
a doctype and docname ie in the format `admin+doctype+docname@example.com`,
a doctype and docname ie in the format `admin+doctype+docname@example.com` or `admin+doctype=docname@example.com`,
the email is parsed and doctype and docname is extracted and timeline link is added.
"""
if not frappe.get_all("Email Account", filters={"enable_automatic_linking": 1}):
if not frappe.db.get_value("Email Account", filters={"enable_automatic_linking": 1}):
return
delimiter = "+"
for email_string in email_strings:
if email_string:
for email in email_string.split(","):
if delimiter in email:
email = email.split("@", 1)[0]
email_local_parts = email.split(delimiter)
if not len(email_local_parts) == 3:
continue
email_username = email.split("@", 1)[0]
email_local_parts = email_username.split("+")
docname = doctype = None
if len(email_local_parts) == 3:
doctype = unquote(email_local_parts[1])
docname = unquote(email_local_parts[2])
if doctype and docname and frappe.db.exists(doctype, docname):
communication.add_link(doctype, docname)
elif len(email_local_parts) == 2:
document_parts = email_local_parts[1].split("=", 1)
if len(document_parts) != 2:
continue
doctype = unquote(document_parts[0])
docname = unquote(document_parts[1])
if doctype and docname and frappe.db.get_value(doctype, docname, ignore=True):
communication.add_link(doctype, docname)
def get_email_without_link(email):

View file

@ -219,17 +219,17 @@ class TestCommunication(FrappeTestCase):
self.assertIn(comm_note_2.name, data)
def test_link_in_email(self):
frappe.delete_doc_if_exists("Note", "test document link in email")
create_email_account()
note = frappe.get_doc(
{
"doctype": "Note",
"title": "test document link in email",
"content": "test document link in email",
}
).insert(ignore_permissions=True)
notes = {}
for i in range(2):
frappe.delete_doc_if_exists("Note", f"test document link in email {i}")
notes[i] = frappe.get_doc(
{
"doctype": "Note",
"title": f"test document link in email {i}",
}
).insert(ignore_permissions=True)
comm = frappe.get_doc(
{
@ -237,14 +237,15 @@ class TestCommunication(FrappeTestCase):
"communication_medium": "Email",
"subject": "Document Link in Email",
"sender": "comm_sender@example.com",
"recipients": f'comm_recipient+{quote("Note")}+{quote(note.name)}@example.com',
"recipients": f'comm_recipient+{quote("Note")}+{quote(notes[0].name)}@example.com,comm_recipient+{quote("Note")}={quote(notes[1].name)}@example.com',
}
).insert(ignore_permissions=True)
doc_links = [
(timeline_link.link_doctype, timeline_link.link_name) for timeline_link in comm.timeline_links
]
self.assertIn(("Note", note.name), doc_links)
self.assertIn(("Note", notes[0].name), doc_links)
self.assertIn(("Note", notes[1].name), doc_links)
def test_parse_emails(self):
emails = get_emails(

View file

@ -526,13 +526,14 @@ class TestDocType(FrappeTestCase):
self.assertRaises(InvalidFieldNameError, validate_links_table_fieldnames, doc)
def test_create_virtual_doctype(self):
"""Test virtual DOcTYpe."""
"""Test virtual DocType."""
virtual_doc = new_doctype("Test Virtual Doctype")
virtual_doc.is_virtual = 1
virtual_doc.insert()
virtual_doc.save()
virtual_doc.insert(ignore_if_duplicate=True)
virtual_doc.reload()
doc = frappe.get_doc("DocType", "Test Virtual Doctype")
self.assertDictEqual(doc.as_dict(), virtual_doc.as_dict())
self.assertEqual(doc.is_virtual, 1)
self.assertFalse(frappe.db.table_exists("Test Virtual Doctype"))

View file

@ -20,6 +20,7 @@ import frappe.defaults
import frappe.model.meta
from frappe import _
from frappe.database.utils import (
DefaultOrderBy,
EmptyQueryValues,
FallBackDateTimeStr,
LazyMogrify,
@ -422,7 +423,7 @@ class Database:
ignore=None,
as_dict=False,
debug=False,
order_by="KEEP_DEFAULT_ORDERING",
order_by=DefaultOrderBy,
cache=False,
for_update=False,
*,
@ -492,7 +493,7 @@ class Database:
ignore=None,
as_dict=False,
debug=False,
order_by="KEEP_DEFAULT_ORDERING",
order_by=DefaultOrderBy,
update=None,
cache=False,
for_update=False,
@ -551,7 +552,7 @@ class Database:
if (filters is not None) and (filters != doctype or doctype == "DocType"):
try:
if order_by:
order_by = "modified" if order_by == "KEEP_DEFAULT_ORDERING" else order_by
order_by = "modified" if order_by == DefaultOrderBy else order_by
out = self._get_values_from_table(
fields=fields,
filters=filters,

View file

@ -116,6 +116,7 @@ CREATE TABLE `tabDocPerm` (
-- Table structure for table `tabDocType Action`
--
DROP TABLE IF EXISTS `tabDocType Action`;
CREATE TABLE `tabDocType Action` (
`name` varchar(140) COLLATE utf8mb4_unicode_ci NOT NULL,
`creation` datetime(6) DEFAULT NULL,
@ -137,9 +138,10 @@ CREATE TABLE `tabDocType Action` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC;
--
-- Table structure for table `tabDocType Action`
-- Table structure for table `tabDocType Link`
--
DROP TABLE IF EXISTS `tabDocType Link`;
CREATE TABLE `tabDocType Link` (
`name` varchar(140) COLLATE utf8mb4_unicode_ci NOT NULL,
`creation` datetime(6) DEFAULT NULL,

View file

@ -10,7 +10,7 @@ from pypika.queries import QueryBuilder, Table
import frappe
from frappe import _
from frappe.database.operator_map import OPERATOR_MAP
from frappe.database.utils import get_doctype_name
from frappe.database.utils import DefaultOrderBy, get_doctype_name
from frappe.query_builder import Criterion, Field, Order, functions
from frappe.query_builder.functions import Function, SqlFunctions
from frappe.query_builder.utils import PseudoColumnMapper
@ -314,7 +314,7 @@ class Engine:
return _fields
def apply_order_by(self, order_by: str | None):
if not order_by or order_by == "KEEP_DEFAULT_ORDERING":
if not order_by or order_by == DefaultOrderBy:
return
for declaration in order_by.split(","):
if _order_by := declaration.strip():

View file

@ -17,7 +17,7 @@ QueryValues = tuple | list | dict | NoneType
EmptyQueryValues = object()
FallBackDateTimeStr = "0001-01-01 00:00:00.000000"
DefaultOrderBy = "KEEP_DEFAULT_ORDERING"
NestedSetHierarchy = (
"ancestors of",
"descendants of",

View file

@ -403,7 +403,7 @@ def get_document_email(doctype, name):
return None
email = email.split("@")
return f"{email[0]}+{quote(doctype)}+{quote(cstr(name))}@{email[1]}"
return f"{email[0]}+{quote(doctype)}={quote(cstr(name))}@{email[1]}"
def get_automatic_email_link():

View file

@ -9,16 +9,14 @@ def validate_route_conflict(doctype, name):
Raises exception if name clashes with routes from other documents for /app routing
"""
if frappe.flags.in_migrate:
return
all_names = []
for _doctype in ["Page", "Workspace", "DocType"]:
try:
all_names.extend(
[
slug(d) for d in frappe.get_all(_doctype, pluck="name") if (doctype != _doctype and d != name)
]
)
except frappe.db.TableMissingError:
pass
all_names.extend(
[slug(d) for d in frappe.get_all(_doctype, pluck="name") if (doctype != _doctype and d != name)]
)
if slug(name) in all_names:
frappe.msgprint(frappe._("Name already taken, please set a new name"))

View file

@ -39,7 +39,7 @@ class EmailQueue(Document):
def set_recipients(self, recipients):
self.set("recipients", [])
for r in recipients:
self.append("recipients", {"recipient": r, "status": "Not Sent"})
self.append("recipients", {"recipient": r.strip(), "status": "Not Sent"})
def on_trash(self):
self.prevent_email_queue_delete()
@ -711,7 +711,7 @@ class QueueBuilder:
"attachments": json.dumps(self.get_attachments()),
"message_id": get_string_between("<", mail.msg_root["Message-Id"], ">"),
"message": mail_to_string,
"sender": self.sender,
"sender": mail.sender,
"reference_doctype": self.reference_doctype,
"reference_name": self.reference_name,
"add_unsubscribe_link": self._add_unsubscribe_link,

View file

@ -99,7 +99,7 @@ def get_unsubcribed_url(
@frappe.whitelist(allow_guest=True)
def unsubscribe(doctype, name, email):
# unsubsribe from comments and communications
if not verify_request():
if not frappe.flags.in_test and not verify_request():
return
try:

View file

@ -43,7 +43,7 @@ def get_controller(doctype):
:param doctype: DocType name as string.
"""
if frappe.local.dev_server:
if frappe.local.dev_server or frappe.flags.in_migrate:
return import_controller(doctype)
site_controllers = frappe.controllers.setdefault(frappe.local.site, {})
@ -59,11 +59,11 @@ def import_controller(doctype):
module_name = "Core"
if doctype not in DOCTYPES_FOR_DOCTYPE:
meta = frappe.get_meta(doctype)
if meta.custom:
return NestedSet if meta.get("is_tree") else Document
module_name = meta.module
doctype_info = frappe.db.get_value("DocType", doctype, fieldname="*")
if doctype_info:
if doctype_info.custom:
return NestedSet if doctype_info.is_tree else Document
module_name = doctype_info.module
module_path = None
class_overrides = frappe.get_hooks("override_doctype_class")

View file

@ -13,9 +13,10 @@ import frappe.permissions
import frappe.share
from frappe import _
from frappe.core.doctype.server_script.server_script_utils import get_server_script_map
from frappe.database.utils import FallBackDateTimeStr, NestedSetHierarchy
from frappe.database.utils import DefaultOrderBy, FallBackDateTimeStr, NestedSetHierarchy
from frappe.model import get_permitted_fields, optional_fields
from frappe.model.meta import get_table_columns
from frappe.model.utils import is_virtual_doctype
from frappe.model.utils.user_settings import get_user_settings, update_user_settings
from frappe.query_builder.utils import Column
from frappe.utils import (
@ -80,7 +81,7 @@ class DatabaseQuery:
or_filters=None,
docstatus=None,
group_by=None,
order_by="KEEP_DEFAULT_ORDERING",
order_by=DefaultOrderBy,
limit_start=False,
limit_page_length=None,
as_list=False,
@ -171,6 +172,21 @@ class DatabaseQuery:
if user_settings:
self.user_settings = json.loads(user_settings)
if is_virtual_doctype(self.doctype):
from frappe.model.base_document import get_controller
controller = get_controller(self.doctype)
self.parse_args()
kwargs = {
"as_list": as_list,
"with_comment_count": with_comment_count,
"save_user_settings": save_user_settings,
"save_user_settings_fields": save_user_settings_fields,
"pluck": pluck,
"parent_doctype": parent_doctype,
} | self.__dict__
return controller.get_list(kwargs)
self.columns = self.get_table_columns()
# no table & ignore_ddl, return

View file

@ -232,7 +232,11 @@ def set_naming_from_document_naming_rule(doc):
"""
Evaluate rules based on "Document Naming Series" doctype
"""
if doc.doctype in log_types:
from frappe.model.base_document import DOCTYPES_FOR_DOCTYPE
IGNORED_DOCTYPES = {*log_types, *DOCTYPES_FOR_DOCTYPE, "DefaultValue", "Patch Log"}
if doc.doctype in IGNORED_DOCTYPES:
return
document_naming_rules = frappe.cache_manager.get_doctype_map(

View file

@ -397,7 +397,11 @@ def rename_doctype(doctype: str, old: str, new: str) -> None:
def update_child_docs(old: str, new: str, meta: "Meta") -> None:
# update "parent"
for df in meta.get_table_fields():
frappe.qb.update(df.options).set("parent", new).where(Field("parent") == old).run()
(
frappe.qb.update(df.options)
.set("parent", new)
.where((Field("parent") == old) & (Field("parenttype") == meta.name))
).run()
def update_link_field_values(link_fields: list[dict], old: str, new: str, doctype: str) -> None:

View file

@ -129,5 +129,7 @@ def get_fetch_values(doctype, fieldname, value):
@site_cache()
def is_virtual_doctype(doctype):
return frappe.db.get_value("DocType", doctype, "is_virtual")
def is_virtual_doctype(doctype: str):
if frappe.db.has_column("DocType", "is_virtual"):
return frappe.db.get_value("DocType", doctype, "is_virtual")
return False

View file

@ -5,22 +5,27 @@ import frappe
def execute():
if frappe.db.table_exists("List View Setting"):
if not frappe.db.table_exists("List View Settings"):
frappe.reload_doc("desk", "doctype", "List View Settings")
if not frappe.db.table_exists("List View Setting"):
return
if not frappe.db.exists("DocType", "List View Setting"):
return
existing_list_view_settings = frappe.get_all("List View Settings", as_list=True)
for list_view_setting in frappe.get_all(
"List View Setting",
fields=["disable_count", "disable_sidebar_stats", "disable_auto_refresh", "name"],
):
name = list_view_setting.pop("name")
if name not in [x[0] for x in existing_list_view_settings]:
list_view_setting["doctype"] = "List View Settings"
list_view_settings = frappe.get_doc(list_view_setting)
# setting name here is necessary because autoname is set as prompt
list_view_settings.name = name
list_view_settings.insert()
frappe.reload_doc("desk", "doctype", "List View Settings")
frappe.delete_doc("DocType", "List View Setting", force=True)
frappe.db.commit()
existing_list_view_settings = frappe.get_all(
"List View Settings", as_list=True, order_by="modified"
)
for list_view_setting in frappe.get_all(
"List View Setting",
fields=["disable_count", "disable_sidebar_stats", "disable_auto_refresh", "name"],
order_by="modified",
):
name = list_view_setting.pop("name")
if name not in [x[0] for x in existing_list_view_settings]:
list_view_setting["doctype"] = "List View Settings"
list_view_settings = frappe.get_doc(list_view_setting)
# setting name here is necessary because autoname is set as prompt
list_view_settings.name = name
list_view_settings.insert()
frappe.delete_doc("DocType", "List View Setting", force=True)

View file

@ -413,7 +413,7 @@ def get_roles(user=None, with_standard=True):
if not user:
user = frappe.session.user
if user == "Guest":
if user == "Guest" or not user:
return ["Guest"]
def get():

View file

@ -1320,11 +1320,17 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
}
setup_realtime_updates() {
this.pending_document_refreshes = [];
if (this.list_view_settings && this.list_view_settings.disable_auto_refresh) {
return;
}
frappe.socketio.doctype_subscribe(this.doctype);
frappe.realtime.on("list_update", (data) => {
if (data?.doctype !== this.doctype) {
return;
}
if (!frappe.get_doc(data?.doctype, data?.name)?.__unsaved) {
frappe.model.remove_from_locals(data.doctype, data.name);
}
@ -1333,28 +1339,41 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
return;
}
const { doctype, name } = data;
if (doctype !== this.doctype) return;
this.pending_document_refreshes.push(data);
frappe.utils.debounce(this.process_document_refreshes.bind(this), 1000)();
});
}
// filters to get only the doc with this name
const call_args = this.get_call_args();
call_args.args.filters.push([this.doctype, "name", "=", name]);
call_args.args.start = 0;
process_document_refreshes() {
if (!this.pending_document_refreshes.length) return;
frappe.call(call_args).then(({ message }) => {
if (!message) return;
const data = frappe.utils.dict(message.keys, message.values);
if (!(data && data.length)) {
// this doc was changed and should not be visible
// in the listview according to filters applied
// let's remove it manually
this.data = this.data.filter((d) => d.name !== name);
this.render_list();
return;
}
const names = this.pending_document_refreshes.map((d) => d.name);
this.pending_document_refreshes = this.pending_document_refreshes.filter(
(d) => names.indexOf(d.name) === -1
);
const datum = data[0];
const index = this.data.findIndex((d) => d.name === datum.name);
if (!names.length) return;
// filters to get only the doc with this name
const call_args = this.get_call_args();
call_args.args.filters.push([this.doctype, "name", "in", names]);
call_args.args.start = 0;
frappe.call(call_args).then(({ message }) => {
if (!message) return;
const data = frappe.utils.dict(message.keys, message.values);
if (!(data && data.length)) {
// this doc was changed and should not be visible
// in the listview according to filters applied
// let's remove it manually
this.data = this.data.filter((d) => names.indexOf(d.name) === -1);
this.render_list();
return;
}
data.forEach((datum) => {
const index = this.data.findIndex((doc) => doc.name === datum.name);
if (index === -1) {
// append new data
@ -1363,31 +1382,31 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
// update this data in place
this.data[index] = datum;
}
this.data.sort((a, b) => {
const a_value = a[this.sort_by] || "";
const b_value = b[this.sort_by] || "";
let return_value = 0;
if (a_value > b_value) {
return_value = 1;
}
if (b_value > a_value) {
return_value = -1;
}
if (this.sort_order === "desc") {
return_value = -return_value;
}
return return_value;
});
this.toggle_result_area();
this.render_list();
if (this.$checks && this.$checks.length) {
this.set_rows_as_checked();
}
});
this.data.sort((a, b) => {
const a_value = a[this.sort_by] || "";
const b_value = b[this.sort_by] || "";
let return_value = 0;
if (a_value > b_value) {
return_value = 1;
}
if (b_value > a_value) {
return_value = -1;
}
if (this.sort_order === "desc") {
return_value = -return_value;
}
return return_value;
});
if (this.$checks && this.$checks.length) {
this.set_rows_as_checked();
}
this.toggle_result_area();
this.render_list();
});
}

View file

@ -1,5 +1,5 @@
import frappe
from frappe.boot import get_unseen_notes, get_user_pages_or_reports
from frappe.boot import get_unseen_notes
from frappe.desk.doctype.note.note import mark_as_seen
from frappe.tests.utils import FrappeTestCase
@ -26,47 +26,3 @@ class TestBootData(FrappeTestCase):
mark_as_seen(note.name)
unseen_notes = [d.title for d in get_unseen_notes()]
self.assertListEqual(unseen_notes, [])
def test_get_user_pages_or_reports_with_permission_query(self):
# Create a ToDo custom report with admin user
frappe.set_user("Administrator")
frappe.get_doc(
{
"doctype": "Report",
"ref_doctype": "ToDo",
"report_name": "Test Admin Report",
"report_type": "Report Builder",
"is_standard": "No",
}
).insert()
# Add permission query such that each user can only see their own custom reports
frappe.get_doc(
dict(
doctype="Server Script",
name="test_report_permission_query",
script_type="Permission Query",
reference_doctype="Report",
script="""conditions = f"(`tabReport`.is_standard = 'Yes' or `tabReport`.owner = '{frappe.session.user}')"
""",
)
).insert()
# Create a ToDo custom report with test user
frappe.set_user("test@example.com")
frappe.get_doc(
{
"doctype": "Report",
"ref_doctype": "ToDo",
"report_name": "Test User Report",
"report_type": "Report Builder",
"is_standard": "No",
}
).insert(ignore_permissions=True)
get_user_pages_or_reports("Report")
allowed_reports = frappe.cache().get_value("has_role:Report", user=frappe.session.user)
# Test user must not see admin user's report
self.assertNotIn("Test Admin Report", allowed_reports)
self.assertIn("Test User Report", allowed_reports)

View file

@ -2,10 +2,13 @@
# License: MIT. See LICENSE
import datetime
from contextlib import contextmanager
from unittest.mock import MagicMock, patch
import frappe
from frappe.core.doctype.doctype.test_doctype import new_doctype
from frappe.core.page.permission_manager.permission_manager import add, reset, update
from frappe.custom.doctype.property_setter.property_setter import make_property_setter
from frappe.database.utils import DefaultOrderBy
from frappe.desk.reportview import get_filters_cond
from frappe.handler import execute_cmd
from frappe.model.db_query import DatabaseQuery
@ -43,7 +46,7 @@ def setup_patched_blog_post():
yield
class TestReportview(FrappeTestCase):
class TestDBQuery(FrappeTestCase):
def setUp(self):
frappe.set_user("Administrator")
@ -848,68 +851,6 @@ class TestReportview(FrappeTestCase):
fields=["blog_category.description"],
)
def test_reportview_get_permlevel_system_users(self):
with setup_patched_blog_post(), setup_test_user(set_user=True):
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
# even if * is passed, fields which are not accessible should be filtered out
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["title"])
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["*"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertNotIn("published", response["keys"])
def test_reportview_get_admin(self):
# Admin should be able to see access all fields
with setup_patched_blog_post():
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["published", "title", "test_field"])
def test_reportview_get_aggregation(self):
# test aggregation based on child table field
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "DocType",
"fields": """["`tabDocField`.`label` as field_label","`tabDocField`.`name` as field_name"]""",
"filters": "[]",
"order_by": "_aggregate_column desc",
"start": 0,
"page_length": 20,
"view": "Report",
"with_comment_count": 0,
"group_by": "field_label, field_name",
"aggregate_on_field": "columns",
"aggregate_on_doctype": "DocField",
"aggregate_function": "sum",
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["field_label", "field_name", "_aggregate_column"])
def test_cast_name(self):
from frappe.core.doctype.doctype.test_doctype import new_doctype
@ -1007,6 +948,33 @@ class TestReportview(FrappeTestCase):
self.assertTrue(dashboard_settings)
def test_virtual_doctype(self):
"""Test that virtual doctypes can be queried using get_all"""
virtual_doctype = new_doctype("Virtual DocType")
virtual_doctype.is_virtual = 1
virtual_doctype.insert(ignore_if_duplicate=True)
class VirtualDocType:
@staticmethod
def get_list(args):
...
with patch("frappe.controllers", new={frappe.local.site: {"Virtual DocType": VirtualDocType}}):
VirtualDocType.get_list = MagicMock()
frappe.get_all("Virtual DocType", filters={"name": "test"}, fields=["name"], limit=1)
call_args = VirtualDocType.get_list.call_args[0][0]
VirtualDocType.get_list.assert_called_once()
self.assertIsInstance(call_args, dict)
self.assertEqual(call_args["doctype"], "Virtual DocType")
self.assertEqual(call_args["filters"], [["Virtual DocType", "name", "=", "test"]])
self.assertEqual(call_args["fields"], ["name"])
self.assertEqual(call_args["limit_page_length"], 1)
self.assertEqual(call_args["limit_start"], 0)
self.assertEqual(call_args["order_by"], DefaultOrderBy)
def test_coalesce_with_in_ops(self):
self.assertNotIn("ifnull", frappe.get_all("User", {"name": ("in", ["a", "b"])}, run=0))
self.assertIn("ifnull", frappe.get_all("User", {"name": ("in", ["a", None])}, run=0))
@ -1017,6 +985,129 @@ class TestReportview(FrappeTestCase):
self.assertIn("ifnull", frappe.get_all("User", {"name": ("not in", [""])}, run=0))
class TestReportView(FrappeTestCase):
def test_reportview_get(self):
user = frappe.get_doc("User", "test@example.com")
add_child_table_to_blog_post()
user_roles = frappe.get_roles()
user.remove_roles(*user_roles)
user.add_roles("Blogger")
make_property_setter("Blog Post", "published", "permlevel", 1, "Int")
reset("Blog Post")
add("Blog Post", "Website Manager", 1)
update("Blog Post", "Website Manager", 1, "write", 1)
frappe.set_user(user.name)
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
# even if * is passed, fields which are not accessible should be filtered out
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["title"])
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["*"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertNotIn("published", response["keys"])
frappe.set_user("Administrator")
user.add_roles("Website Manager")
frappe.set_user(user.name)
frappe.set_user("Administrator")
# Admin should be able to see access all fields
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["published", "title", "test_field"])
# reset user roles
user.remove_roles("Blogger", "Website Manager")
user.add_roles(*user_roles)
def test_reportview_get_aggregation(self):
# test aggregation based on child table field
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "DocType",
"fields": """["`tabDocField`.`label` as field_label","`tabDocField`.`name` as field_name"]""",
"filters": "[]",
"order_by": "_aggregate_column desc",
"start": 0,
"page_length": 20,
"view": "Report",
"with_comment_count": 0,
"group_by": "field_label, field_name",
"aggregate_on_field": "columns",
"aggregate_on_doctype": "DocField",
"aggregate_function": "sum",
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["field_label", "field_name", "_aggregate_column"])
def test_reportview_get_permlevel_system_users(self):
with setup_patched_blog_post(), setup_test_user(set_user=True):
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
# even if * is passed, fields which are not accessible should be filtered out
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["title"])
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["*"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertNotIn("published", response["keys"])
def test_reportview_get_admin(self):
# Admin should be able to see access all fields
with setup_patched_blog_post():
frappe.local.request = frappe._dict()
frappe.local.request.method = "POST"
frappe.local.form_dict = frappe._dict(
{
"doctype": "Blog Post",
"fields": ["published", "title", "`tabTest Child`.`test_field`"],
}
)
response = execute_cmd("frappe.desk.reportview.get")
self.assertListEqual(response["keys"], ["published", "title", "test_field"])
def add_child_table_to_blog_post():
child_table = frappe.get_doc(
{
@ -1040,7 +1131,7 @@ def create_event(subject="_Test Event", starts_on=None):
from frappe.utils import get_datetime
event = frappe.get_doc(
return frappe.get_doc(
{
"doctype": "Event",
"subject": subject,
@ -1049,8 +1140,6 @@ def create_event(subject="_Test Event", starts_on=None):
}
).insert(ignore_permissions=True)
return event
def create_nested_doctype():
if frappe.db.exists("DocType", "Nested DocType"):

View file

@ -3,9 +3,11 @@
import email
import re
from unittest.mock import patch
import frappe
from frappe.email.doctype.email_account.test_email_account import TestEmailAccount
from frappe.email.doctype.email_queue.email_queue import QueueBuilder
from frappe.tests.utils import FrappeTestCase
test_dependencies = ["Email Account"]
@ -228,12 +230,40 @@ class TestEmail(FrappeTestCase):
self.assertTrue("test1@example.com" in queue_recipients)
self.assertEqual(len(queue_recipients), 2)
def test_sender(self):
def _patched_assertion(email_account, assertion):
with patch.object(QueueBuilder, "get_outgoing_email_account", return_value=email_account):
frappe.sendmail(
recipients=["test1@example.com"],
sender="admin@example.com",
subject="Test Email Queue",
message="This mail is queued!",
now=True,
)
email_queue_sender = frappe.db.get_value("Email Queue", {"status": "Sent"}, "sender")
self.assertEqual(email_queue_sender, assertion)
email_account = frappe.get_doc("Email Account", "_Test Email Account 1")
email_account.default_outgoing = 1
email_account.always_use_account_name_as_sender_name = 0
email_account.always_use_account_email_id_as_sender = 0
_patched_assertion(email_account, "admin@example.com")
email_account.always_use_account_name_as_sender_name = 1
_patched_assertion(email_account, "_Test Email Account 1 <admin@example.com>")
email_account.always_use_account_name_as_sender_name = 0
email_account.always_use_account_email_id_as_sender = 1
_patched_assertion(email_account, '"admin@example.com" <test@example.com>')
email_account.always_use_account_name_as_sender_name = 1
_patched_assertion(email_account, "_Test Email Account 1 <test@example.com>")
def test_unsubscribe(self):
from frappe.email.doctype.email_queue.email_queue import QueueBuilder
from frappe.email.queue import unsubscribe
unsubscribe(doctype="User", name="Administrator", email="test@example.com")
self.assertTrue(
frappe.db.get_value(
"Email Unsubscribe",
@ -241,10 +271,6 @@ class TestEmail(FrappeTestCase):
)
)
before = frappe.db.sql("""select count(name) from `tabEmail Queue` where status='Not Sent'""")[
0
][0]
builder = QueueBuilder(
recipients=["test@example.com", "test1@example.com"],
sender="admin@example.com",
@ -254,13 +280,11 @@ class TestEmail(FrappeTestCase):
message="This is mail is queued!",
unsubscribe_message="Unsubscribe",
)
builder.process()
# this is sent async (?)
email_queue = frappe.db.sql(
"""select name from `tabEmail Queue` where status='Not Sent'""", as_dict=1
)
self.assertEqual(len(email_queue), before + 1)
# don't send right now
builder.process()
email_queue = frappe.db.get_value("Email Queue", {"status": "Not Sent"})
queue_recipients = [
r.recipient
for r in frappe.db.sql(
@ -272,6 +296,8 @@ class TestEmail(FrappeTestCase):
self.assertFalse("test@example.com" in queue_recipients)
self.assertTrue("test1@example.com" in queue_recipients)
self.assertEqual(len(queue_recipients), 1)
frappe.get_doc("Email Queue", email_queue).send()
self.assertTrue("Unsubscribe" in frappe.safe_decode(frappe.flags.sent_mail))
def test_image_parsing(self):
@ -322,10 +348,3 @@ class TestVerifiedRequests(FrappeTestCase):
set_request(method="GET", path="?" + signed_url)
self.assertTrue(verify_request())
frappe.local.request = None
if __name__ == "__main__":
import unittest
frappe.connect()
unittest.main()

View file

@ -8,6 +8,7 @@ from random import choice, sample
from unittest.mock import patch
import frappe
from frappe.core.doctype.doctype.test_doctype import new_doctype
from frappe.exceptions import DoesNotExistError, ValidationError
from frappe.model.base_document import get_controller
from frappe.model.rename_doc import (
@ -271,3 +272,29 @@ class TestRenameDoc(FrappeTestCase):
self.assertEqual(doc.name, new_name)
self.available_documents.append(new_name)
self.available_documents.remove(name)
def test_parenttype(self):
child = new_doctype(istable=1).insert()
table_field = {
"label": "Test Table",
"fieldname": "test_table",
"fieldtype": "Table",
"options": child.name,
}
parent_a = new_doctype(fields=[table_field], allow_rename=1, autoname="Prompt").insert()
parent_b = new_doctype(fields=[table_field], allow_rename=1, autoname="Prompt").insert()
parent_a_instance = frappe.get_doc(
doctype=parent_a.name, test_table=[{"some_fieldname": "x"}], name="XYZ"
).insert()
parent_b_instance = frappe.get_doc(
doctype=parent_b.name, test_table=[{"some_fieldname": "x"}], name="XYZ"
).insert()
parent_b_instance.rename("ABC")
parent_a_instance.reload()
self.assertEqual(len(parent_a_instance.test_table), 1)
self.assertEqual(len(parent_b_instance.test_table), 1)