Merge branch 'develop' into fix-file-uploader-error-handling
This commit is contained in:
commit
3eac0dc9f9
12 changed files with 450 additions and 50 deletions
|
|
@ -3,7 +3,9 @@ frappe.ui.form.on("File", {
|
|||
if (frm.doc.file_url) {
|
||||
frm.add_custom_button(__("View File"), () => {
|
||||
if (!frappe.utils.is_url(frm.doc.file_url)) {
|
||||
window.open(window.location.origin + frm.doc.file_url);
|
||||
window.open(
|
||||
window.location.origin + frm.doc.file_url + "?fid=" + frm.doc.name
|
||||
);
|
||||
} else {
|
||||
window.open(frm.doc.file_url);
|
||||
}
|
||||
|
|
@ -90,7 +92,7 @@ frappe.ui.form.on("File", {
|
|||
},
|
||||
|
||||
download: function (frm) {
|
||||
let file_url = frm.doc.file_url;
|
||||
let file_url = frm.doc.file_url + "?fid=" + frm.doc.name;
|
||||
if (frm.doc.file_name) {
|
||||
file_url = file_url.replace(/#/g, "%23");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ from pymysql.constants.ER import DUP_ENTRY
|
|||
|
||||
import frappe
|
||||
from frappe import _
|
||||
from frappe.database.schema import DBTable
|
||||
from frappe.database.schema import DbColumn, DBTable
|
||||
from frappe.utils.defaults import get_not_null_defaults
|
||||
|
||||
|
||||
|
|
@ -96,6 +96,37 @@ class MariaDBTable(DBTable):
|
|||
):
|
||||
add_index_query.append("ADD INDEX `modified`(`modified`)")
|
||||
|
||||
# logic to drop unique constraint for fields deleted from a doctype
|
||||
meta_columns = set(self.columns.keys())
|
||||
db_columns = set(self.current_columns.keys())
|
||||
|
||||
for col in db_columns:
|
||||
if (
|
||||
col not in meta_columns
|
||||
and col not in frappe.db.DEFAULT_COLUMNS
|
||||
and col not in frappe.db.OPTIONAL_COLUMNS
|
||||
):
|
||||
has_unique = frappe.db.get_column_index(self.table_name, col, unique=True)
|
||||
|
||||
if not has_unique:
|
||||
continue
|
||||
|
||||
current_col = self.current_columns.get(col)
|
||||
|
||||
deleted_col = DbColumn(
|
||||
table=self,
|
||||
fieldname=current_col.name,
|
||||
fieldtype=current_col.type,
|
||||
length=None,
|
||||
default=None,
|
||||
set_index=current_col.index,
|
||||
options=None,
|
||||
unique=False,
|
||||
precision=None,
|
||||
not_nullable=current_col.not_nullable,
|
||||
)
|
||||
self.drop_unique.append(deleted_col)
|
||||
|
||||
drop_index_query = []
|
||||
|
||||
for col in {*self.drop_index, *self.drop_unique}:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import frappe
|
||||
from frappe import _
|
||||
from frappe.database.schema import DBTable, get_definition
|
||||
from frappe.database.schema import DbColumn, DBTable, get_definition
|
||||
from frappe.utils import cint, flt
|
||||
from frappe.utils.defaults import get_not_null_defaults
|
||||
|
||||
|
|
@ -131,6 +131,50 @@ class PostgresTable(DBTable):
|
|||
index_name=col.fieldname, table_name=self.table_name, field=col.fieldname
|
||||
)
|
||||
|
||||
# logic to drop unique constraint for fields deleted from a doctype
|
||||
meta_columns = set(self.columns.keys())
|
||||
db_columns = set(self.current_columns.keys())
|
||||
|
||||
for col in db_columns:
|
||||
if (
|
||||
col not in meta_columns
|
||||
and col not in frappe.db.DEFAULT_COLUMNS
|
||||
and col not in frappe.db.OPTIONAL_COLUMNS
|
||||
):
|
||||
has_unique_index = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname IN (%s, %s)
|
||||
LIMIT 1
|
||||
""",
|
||||
(
|
||||
self.table_name,
|
||||
f"{self.table_name}_{col}_key",
|
||||
f"unique_{col}",
|
||||
),
|
||||
)
|
||||
|
||||
if not has_unique_index:
|
||||
continue
|
||||
|
||||
current_col = self.current_columns.get(col)
|
||||
|
||||
deleted_col = DbColumn(
|
||||
table=self,
|
||||
fieldname=current_col.name,
|
||||
fieldtype=current_col.type,
|
||||
length=None,
|
||||
default=None,
|
||||
set_index=current_col.index,
|
||||
options=None,
|
||||
unique=False,
|
||||
precision=None,
|
||||
not_nullable=current_col.not_nullable,
|
||||
)
|
||||
self.drop_unique.append(deleted_col)
|
||||
|
||||
drop_contraint_query = ""
|
||||
for col in self.drop_index:
|
||||
# primary key
|
||||
|
|
@ -141,8 +185,35 @@ class PostgresTable(DBTable):
|
|||
for col in self.drop_unique:
|
||||
# primary key
|
||||
if col.fieldname != "name":
|
||||
# if index key exists
|
||||
drop_contraint_query += f'DROP INDEX IF EXISTS "unique_{col.fieldname}" ;'
|
||||
# drop unique constraint first if exists which automatically drops the underlying index also
|
||||
unique_constraint_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_constraint
|
||||
WHERE conname = %s
|
||||
""",
|
||||
(f"{self.table_name}_{col.fieldname}_key",),
|
||||
)
|
||||
|
||||
if unique_constraint_exists:
|
||||
drop_contraint_query += f'ALTER TABLE "{self.table_name}" DROP CONSTRAINT IF EXISTS "{self.table_name}_{col.fieldname}_key" ;'
|
||||
|
||||
# drop the unique index backed by no constraint directly
|
||||
unique_index_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname = %s
|
||||
""",
|
||||
(
|
||||
self.table_name,
|
||||
f"unique_{col.fieldname}",
|
||||
),
|
||||
)
|
||||
|
||||
if unique_index_exists:
|
||||
drop_contraint_query += f'DROP INDEX IF EXISTS "unique_{col.fieldname}" ;'
|
||||
|
||||
change_nullability = []
|
||||
for col in self.change_nullability:
|
||||
|
|
|
|||
|
|
@ -812,7 +812,7 @@ class Engine:
|
|||
if parsed := self._parse_backtick_field_notation(field):
|
||||
table_name, field_name = parsed
|
||||
|
||||
self._check_field_permission(table_name, field_name)
|
||||
self.check_filter_field_permission(table_name, field_name)
|
||||
|
||||
# Return query builder field reference
|
||||
return frappe.qb.DocType(table_name)[field_name]
|
||||
|
|
@ -835,7 +835,7 @@ class Engine:
|
|||
parent_doctype_for_perm = (
|
||||
dynamic_field.parent_doctype if isinstance(dynamic_field, ChildTableField) else None
|
||||
)
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
# Return the pypika Field object associated with the dynamic field
|
||||
|
|
@ -879,7 +879,9 @@ class Engine:
|
|||
|
||||
# If it's not a child table, check permissions
|
||||
if not parent_fieldname:
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(
|
||||
target_doctype, target_fieldname, parent_doctype_for_perm
|
||||
)
|
||||
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
||||
|
||||
# Create a ChildTableField instance to handle join and field access
|
||||
|
|
@ -893,7 +895,7 @@ class Engine:
|
|||
|
||||
# For permission check, the parent is the main doctype
|
||||
parent_doctype_for_perm = self.doctype
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
|
||||
# Delegate join logic
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
|
|
@ -933,18 +935,32 @@ class Engine:
|
|||
parent_fieldname=df.fieldname,
|
||||
)
|
||||
parent_doctype_for_perm = self.doctype
|
||||
self._check_field_permission(
|
||||
self.check_filter_field_permission(
|
||||
df.options, target_fieldname, parent_doctype_for_perm
|
||||
)
|
||||
self.query = child_field_handler.apply_join(self.query, engine=self)
|
||||
return child_field_handler.field
|
||||
|
||||
self._check_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
self.check_filter_field_permission(target_doctype, target_fieldname, parent_doctype_for_perm)
|
||||
# Convert string field name to pypika Field object for the specified/current doctype
|
||||
return frappe.qb.DocType(target_doctype)[target_fieldname]
|
||||
|
||||
def _check_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to access the given field"""
|
||||
def check_select_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to select the given field."""
|
||||
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=False)
|
||||
|
||||
def check_filter_field_permission(self, doctype: str, fieldname: str, parent_doctype: str | None = None):
|
||||
"""Check if the user has permission to filter/order/group by the given field.
|
||||
|
||||
It allows all permlevel 0 fields for users with select permission,
|
||||
and all permitted fields for users with read permission.
|
||||
"""
|
||||
self._check_field_permission(doctype, fieldname, parent_doctype, for_filtering=True)
|
||||
|
||||
def _check_field_permission(
|
||||
self, doctype: str, fieldname: str, parent_doctype: str | None = None, for_filtering: bool = False
|
||||
):
|
||||
"""Check if the user has permission to access the given field."""
|
||||
if not self.apply_permissions:
|
||||
return
|
||||
|
||||
|
|
@ -966,7 +982,10 @@ class Engine:
|
|||
frappe.PermissionError,
|
||||
)
|
||||
|
||||
permitted_fields = self._get_cached_permitted_fields(doctype, parent_doctype, permission_type)
|
||||
permission_source = (
|
||||
self._get_filterable_fields if for_filtering else self._get_cached_permitted_fields
|
||||
)
|
||||
permitted_fields = permission_source(doctype, parent_doctype, permission_type)
|
||||
|
||||
if fieldname not in permitted_fields:
|
||||
frappe.throw(
|
||||
|
|
@ -992,6 +1011,42 @@ class Engine:
|
|||
)
|
||||
return self.permitted_fields_cache[cache_key]
|
||||
|
||||
def _get_filterable_fields(
|
||||
self, doctype: str, parenttype: str | None = None, permission_type: str | None = None
|
||||
) -> set:
|
||||
"""Get fields that can be used in filters/order by/group by.
|
||||
|
||||
For users with only select permission on parent doctypes, this returns
|
||||
all permlevel 0 fields (not just search fields which are used for selected fields).
|
||||
For users with read permission, returns standard permitted fields.
|
||||
"""
|
||||
if permission_type is None:
|
||||
permission_type = self.get_permission_type(doctype, parenttype)
|
||||
|
||||
if permission_type == "select":
|
||||
meta = frappe.get_meta(doctype)
|
||||
|
||||
# Only allow filtering by all permlevel 0 fields for parent doctypes.
|
||||
if meta.istable:
|
||||
return set()
|
||||
|
||||
# for select permission on parent doctype, allow all permlevel 0 fields in filters
|
||||
cache_key = (doctype, None, "_filterable_select")
|
||||
if cache_key not in self.permitted_fields_cache:
|
||||
if doctype in CORE_DOCTYPES:
|
||||
# core doctypes have no restrictions - return all valid columns
|
||||
self.permitted_fields_cache[cache_key] = set(meta.get_valid_columns())
|
||||
else:
|
||||
permlevel_0_fields = set(meta.default_fields) | OPTIONAL_FIELDS
|
||||
for df in meta.get_fieldnames_with_value(with_field_meta=True, with_virtual_fields=False):
|
||||
if df.permlevel == 0:
|
||||
permlevel_0_fields.add(df.fieldname)
|
||||
self.permitted_fields_cache[cache_key] = permlevel_0_fields
|
||||
return self.permitted_fields_cache[cache_key]
|
||||
else:
|
||||
# for read permission, use standard permitted fields
|
||||
return self._get_cached_permitted_fields(doctype, parenttype, permission_type)
|
||||
|
||||
def parse_string_field(self, field: str):
|
||||
"""
|
||||
Parses a field string into a pypika Field object.
|
||||
|
|
@ -1209,7 +1264,7 @@ class Engine:
|
|||
if "`" in field_name:
|
||||
if parsed := self._parse_backtick_field_notation(field_name):
|
||||
table_name, field_name = parsed
|
||||
self._check_field_permission(table_name, field_name)
|
||||
self.check_filter_field_permission(table_name, field_name)
|
||||
return frappe.qb.DocType(table_name)[field_name]
|
||||
|
||||
# If parsing failed, fall through to error handling below
|
||||
|
|
@ -1223,14 +1278,14 @@ class Engine:
|
|||
if dynamic_field:
|
||||
# Check permissions for dynamic field
|
||||
if isinstance(dynamic_field, ChildTableField):
|
||||
self._check_field_permission(
|
||||
self.check_filter_field_permission(
|
||||
dynamic_field.doctype, dynamic_field.fieldname, dynamic_field.parent_doctype
|
||||
)
|
||||
elif isinstance(dynamic_field, LinkTableField):
|
||||
# Check permission for the link field in parent doctype
|
||||
self._check_field_permission(self.doctype, dynamic_field.link_fieldname)
|
||||
self.check_filter_field_permission(self.doctype, dynamic_field.link_fieldname)
|
||||
# Check permission for the target field in linked doctype
|
||||
self._check_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
||||
self.check_filter_field_permission(dynamic_field.doctype, dynamic_field.fieldname)
|
||||
|
||||
# Apply join for the dynamic field
|
||||
self.query = dynamic_field.apply_join(self.query, engine=self)
|
||||
|
|
@ -1246,7 +1301,7 @@ class Engine:
|
|||
)
|
||||
|
||||
# Check permissions for simple field
|
||||
self._check_field_permission(self.doctype, field_name)
|
||||
self.check_filter_field_permission(self.doctype, field_name)
|
||||
|
||||
# Create Field object for simple field
|
||||
return self.table[field_name]
|
||||
|
|
@ -2307,7 +2362,7 @@ class SQLFunctionParser:
|
|||
elif "`" in arg:
|
||||
if parsed := self.engine._parse_backtick_field_notation(arg):
|
||||
table_name, field_name = parsed
|
||||
self.engine._check_field_permission(table_name, field_name)
|
||||
self.engine.check_select_field_permission(table_name, field_name)
|
||||
return Table(f"tab{table_name}")[field_name]
|
||||
else:
|
||||
frappe.throw(
|
||||
|
|
@ -2356,4 +2411,4 @@ class SQLFunctionParser:
|
|||
|
||||
def _check_function_field_permission(self, field_name: str):
|
||||
if self.engine.apply_permissions and self.engine.doctype:
|
||||
self.engine._check_field_permission(self.engine.doctype, field_name)
|
||||
self.engine.check_select_field_permission(self.engine.doctype, field_name)
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@
|
|||
},
|
||||
{
|
||||
"depends_on": "eval:doc.channel=='Slack'",
|
||||
"description": "To use Slack Channel, add a <a href=\"#List/Slack%20Webhook%20URL/List\">Slack Webhook URL</a>.",
|
||||
"description": "To use Slack Channel, add a <a href=\"#List/Slack%20Webhook%20URL/List\" rel=\"noopener noreferrer\">Slack Webhook URL</a>.",
|
||||
"fieldname": "slack_webhook_url",
|
||||
"fieldtype": "Link",
|
||||
"label": "Slack Channel",
|
||||
|
|
@ -135,7 +135,7 @@
|
|||
"fieldtype": "Select",
|
||||
"in_list_view": 1,
|
||||
"label": "Send Alert On",
|
||||
"options": "\nNew\nSave\nSubmit\nCancel\nDays After\nDays Before\nMinutes After\nMinutes Before\nValue Change\nMethod\nCustom",
|
||||
"options": "\nNew\nSave\nSubmit\nCancel\nDays After\nDays Before\nMinutes After\nMinutes Before\nValue Change\nMethod",
|
||||
"reqd": 1,
|
||||
"search_index": 1
|
||||
},
|
||||
|
|
@ -363,7 +363,7 @@
|
|||
"icon": "fa fa-envelope",
|
||||
"index_web_pages_for_search": 1,
|
||||
"links": [],
|
||||
"modified": "2025-10-21 23:14:52.345857",
|
||||
"modified": "2026-02-19 18:07:15.888314",
|
||||
"modified_by": "Administrator",
|
||||
"module": "Email",
|
||||
"name": "Notification",
|
||||
|
|
|
|||
|
|
@ -57,7 +57,6 @@ class Notification(Document):
|
|||
"Minutes Before",
|
||||
"Value Change",
|
||||
"Method",
|
||||
"Custom",
|
||||
]
|
||||
filters: DF.Code | None
|
||||
from_attach_field: DF.Literal[None]
|
||||
|
|
|
|||
|
|
@ -277,7 +277,12 @@ frappe.ui.form.ControlInput = class ControlInput extends frappe.ui.form.Control
|
|||
// set has-error if dialog primary button is clicked
|
||||
if (this.layout && this.layout.is_dialog && !this.layout.primary_action_fulfilled) return;
|
||||
|
||||
this.$wrapper.toggleClass("has-error", Boolean(this.df.reqd && is_null(value)));
|
||||
const is_invalid = this.$wrapper.hasClass("has-error-invalid");
|
||||
this.$wrapper.toggleClass("has-error-mandatory", Boolean(this.df.reqd && is_null(value)));
|
||||
this.$wrapper.toggleClass(
|
||||
"has-error",
|
||||
is_invalid || Boolean(this.df.reqd && is_null(value))
|
||||
);
|
||||
}
|
||||
set_invalid() {
|
||||
let invalid = !!this.df.invalid;
|
||||
|
|
@ -286,7 +291,9 @@ frappe.ui.form.ControlInput = class ControlInput extends frappe.ui.form.Control
|
|||
this.$input?.toggleClass("invalid", invalid);
|
||||
this.grid_row.columns[this.df.fieldname].is_invalid = invalid;
|
||||
} else {
|
||||
this.$wrapper.toggleClass("has-error", invalid);
|
||||
const is_mandatory_and_empty = this.$wrapper.hasClass("has-error-mandatory");
|
||||
this.$wrapper.toggleClass("has-error-invalid", invalid);
|
||||
this.$wrapper.toggleClass("has-error", is_mandatory_and_empty || invalid);
|
||||
}
|
||||
}
|
||||
set_required() {
|
||||
|
|
|
|||
|
|
@ -334,6 +334,7 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
|
|||
translations: frappe.utils.datatable.get_translations(),
|
||||
checkboxColumn: true,
|
||||
inlineFilters: true,
|
||||
noDataMessage: __("No matching entries in the current results"),
|
||||
cellHeight: 35,
|
||||
direction: frappe.utils.is_rtl() ? "rtl" : "ltr",
|
||||
events: {
|
||||
|
|
@ -424,6 +425,43 @@ frappe.views.ReportView = class ReportView extends frappe.views.ListView {
|
|||
},
|
||||
],
|
||||
});
|
||||
|
||||
this.setup_inline_filter_observer();
|
||||
}
|
||||
|
||||
setup_inline_filter_observer() {
|
||||
this.$datatable_wrapper.on(
|
||||
"keyup",
|
||||
".dt-filter",
|
||||
frappe.utils.debounce(() => {
|
||||
this.update_count_for_inline_filter();
|
||||
}, 350)
|
||||
);
|
||||
}
|
||||
|
||||
update_count_for_inline_filter() {
|
||||
if (!this.datatable) return;
|
||||
|
||||
const has_active_filters = this.datatable.columnmanager
|
||||
? Object.keys(this.datatable.columnmanager.getAppliedFilters()).length > 0
|
||||
: false;
|
||||
|
||||
const $count = this.get_count_element();
|
||||
|
||||
if (has_active_filters) {
|
||||
const filtered_count = this.datatable.datamanager.getFilteredRowIndices().length;
|
||||
const current_page_count = this.data.length;
|
||||
|
||||
$count.html(
|
||||
`<span>${__("{0} of {1} records match (filtered on visible rows only)", [
|
||||
filtered_count,
|
||||
current_page_count,
|
||||
])}</span>`
|
||||
);
|
||||
} else {
|
||||
// Restore the normal count
|
||||
this.render_count();
|
||||
}
|
||||
}
|
||||
|
||||
toggle_charts() {
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
--dt-border-color: var(--table-border-color);
|
||||
--dt-header-cell-bg: var(--subtle-fg);
|
||||
--dt-selection-highlight-color: var(--highlight-color);
|
||||
--dt-no-data-message-width: max-content;
|
||||
|
||||
background-color: var(--bg-color);
|
||||
margin-left: -1px;
|
||||
|
|
@ -163,6 +164,11 @@
|
|||
.dt-cell--focus .dt-cell__content {
|
||||
border-color: var(--gray-200);
|
||||
}
|
||||
|
||||
.dt-scrollable__no-data .no-data-message {
|
||||
left: 50%;
|
||||
transform: translateX(-50%);
|
||||
}
|
||||
}
|
||||
|
||||
table td.dt-cell {
|
||||
|
|
|
|||
|
|
@ -991,6 +991,52 @@ class TestQuery(IntegrationTestCase):
|
|||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, force=True)
|
||||
|
||||
def test_filter_with_select_permission_allows_permlevel_0_fields(self):
|
||||
"""Test that users with only select permission can filter by all permlevel 0 fields."""
|
||||
|
||||
test_role = "SelectFilterTestRole"
|
||||
test_user_email = "test2@example.com"
|
||||
test_note_title = "Select Filter Test Note"
|
||||
|
||||
# Cleanup previous runs
|
||||
frappe.set_user("Administrator")
|
||||
test_user = frappe.get_doc("User", test_user_email)
|
||||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
|
||||
frappe.delete_doc("Note", {"title": test_note_title}, ignore_missing=True, force=True)
|
||||
|
||||
# Setup Role with only 'select' on Note (no read)
|
||||
frappe.get_doc({"doctype": "Role", "role_name": test_role}).insert(ignore_if_duplicate=True)
|
||||
add_permission("Note", test_role, 0, ptype="select")
|
||||
update_permission_property("Note", test_role, 0, "read", 0, validate=False)
|
||||
test_user.add_roles(test_role)
|
||||
|
||||
# Create a test note with specific content
|
||||
note = frappe.get_doc(
|
||||
doctype="Note", title=test_note_title, content="Specific Content", public=1
|
||||
).insert(ignore_permissions=True)
|
||||
|
||||
# Register cleanups in reverse order (LIFO) - Administrator restore must happen first
|
||||
def cleanup():
|
||||
frappe.set_user("Administrator")
|
||||
frappe.delete_doc("Note", note.name, ignore_missing=True, force=True)
|
||||
test_user.remove_roles(test_role)
|
||||
frappe.delete_doc("Role", test_role, ignore_missing=True, force=True)
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
frappe.set_user(test_user_email)
|
||||
|
||||
# 'content' is a permlevel 0 field but NOT a search field
|
||||
result = frappe.qb.get_query(
|
||||
"Note",
|
||||
filters={"content": "Specific Content"},
|
||||
fields=["name"], # Only select 'name' which is allowed
|
||||
ignore_permissions=False,
|
||||
).run(as_dict=True)
|
||||
self.assertEqual(len(result), 1, "Should find the note when filtering by permlevel 0 field")
|
||||
self.assertEqual(result[0]["name"], note.name)
|
||||
|
||||
def test_nested_permission(self):
|
||||
"""Test permission on nested doctypes"""
|
||||
frappe.set_user("Administrator")
|
||||
|
|
@ -2308,6 +2354,148 @@ class TestQuery(IntegrationTestCase):
|
|||
self.assertEqual(engine._get_ifnull_fallback("Patch Log", "skipped"), "0")
|
||||
self.assertEqual(engine._get_ifnull_fallback("Patch Log", "patch"), "''")
|
||||
|
||||
@run_only_if(db_type_is.MARIADB)
|
||||
def test_drop_unique_constraint_for_deleted_fields_mariadb(self):
|
||||
trial_dt = new_doctype(
|
||||
"Trial Doctype",
|
||||
fields=[
|
||||
{
|
||||
"fieldname": "field_one",
|
||||
"fieldtype": "Data",
|
||||
"label": "Field One",
|
||||
},
|
||||
{
|
||||
"fieldname": "field_two",
|
||||
"fieldtype": "Data",
|
||||
"label": "Field Two",
|
||||
"unique": 1,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
trial_dt.insert(ignore_if_duplicate=True)
|
||||
|
||||
indexes = frappe.db.get_column_index("tabTrial Doctype", "field_two", unique=True)
|
||||
self.assertTrue(indexes)
|
||||
|
||||
field_to_remove = None
|
||||
|
||||
for field in trial_dt.fields:
|
||||
if field.fieldname == "field_two":
|
||||
field_to_remove = field
|
||||
break
|
||||
|
||||
trial_dt.fields.remove(field_to_remove)
|
||||
trial_dt.save()
|
||||
|
||||
indexes = frappe.db.get_column_index("tabTrial Doctype", "field_two", unique=True)
|
||||
self.assertFalse(indexes)
|
||||
|
||||
@run_only_if(db_type_is.POSTGRES)
|
||||
def test_drop_unique_constraint_and_indexes_for_deleted_fields_postgres(self):
|
||||
# test for unique index backed by constraint at field creation time
|
||||
trial_dt = new_doctype(
|
||||
"Trial Doctype",
|
||||
fields=[
|
||||
{
|
||||
"fieldname": "field_one",
|
||||
"fieldtype": "Data",
|
||||
"label": "Field One",
|
||||
},
|
||||
{
|
||||
"fieldname": "field_two",
|
||||
"fieldtype": "Data",
|
||||
"label": "Field Two",
|
||||
"unique": 1,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
trial_dt.insert(ignore_if_duplicate=True)
|
||||
|
||||
index_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname = %s
|
||||
""",
|
||||
(
|
||||
f"tab{trial_dt.name}",
|
||||
f"tab{trial_dt.name}_field_two_key",
|
||||
),
|
||||
)
|
||||
self.assertTrue(index_exists)
|
||||
|
||||
field_to_remove = None
|
||||
|
||||
for field in trial_dt.fields:
|
||||
if field.fieldname == "field_two":
|
||||
field_to_remove = field
|
||||
break
|
||||
|
||||
trial_dt.fields.remove(field_to_remove)
|
||||
trial_dt.save()
|
||||
|
||||
index_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname = %s
|
||||
""",
|
||||
(
|
||||
f"tab{trial_dt.name}",
|
||||
f"tab{trial_dt.name}_field_two_key",
|
||||
),
|
||||
)
|
||||
self.assertFalse(index_exists)
|
||||
|
||||
# test for unique index backed by no constraint created at field alteration post creation
|
||||
for field in trial_dt.fields:
|
||||
if field.fieldname == "field_one":
|
||||
field.unique = 1
|
||||
|
||||
trial_dt.save()
|
||||
|
||||
index_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname = %s
|
||||
""",
|
||||
(
|
||||
f"tab{trial_dt.name}",
|
||||
"unique_field_one",
|
||||
),
|
||||
)
|
||||
self.assertTrue(index_exists)
|
||||
|
||||
field_to_remove = None
|
||||
|
||||
for field in trial_dt.fields:
|
||||
if field.fieldname == "field_one":
|
||||
field_to_remove = field
|
||||
break
|
||||
|
||||
trial_dt.fields.remove(field_to_remove)
|
||||
trial_dt.save()
|
||||
|
||||
index_exists = frappe.db.sql(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM pg_indexes
|
||||
WHERE tablename = %s
|
||||
AND indexname = %s
|
||||
""",
|
||||
(
|
||||
f"tab{trial_dt.name}",
|
||||
"unique_field_one",
|
||||
),
|
||||
)
|
||||
self.assertFalse(index_exists)
|
||||
|
||||
|
||||
# This function is used as a permission query condition hook
|
||||
def test_permission_hook_condition(user):
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import os
|
||||
from typing import Any
|
||||
|
||||
import frappe
|
||||
from frappe import _
|
||||
|
|
@ -11,7 +12,7 @@ UI_TEST_USER = "frappe@example.com"
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_if_not_exists(doc):
|
||||
def create_if_not_exists(doc: Any):
|
||||
"""Create records if they dont exist.
|
||||
Will check for uniqueness by checking if a record exists with these field value pairs
|
||||
|
||||
|
|
@ -148,7 +149,7 @@ def create_contact_phone_nos_records():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_doctype(name, fields):
|
||||
def create_doctype(name: str | int, fields: str | list | dict):
|
||||
fields = frappe.parse_json(fields)
|
||||
if frappe.db.exists("DocType", name):
|
||||
return
|
||||
|
|
@ -166,7 +167,7 @@ def create_doctype(name, fields):
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_child_doctype(name, fields):
|
||||
def create_child_doctype(name: str | int, fields: str | list | dict):
|
||||
fields = frappe.parse_json(fields)
|
||||
if frappe.db.exists("DocType", name):
|
||||
return
|
||||
|
|
@ -328,7 +329,7 @@ def update_webform_to_multistep():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def update_child_table(name):
|
||||
def update_child_table(name: str | int):
|
||||
doc = frappe.get_doc("DocType", name)
|
||||
if len(doc.fields) == 1:
|
||||
doc.append(
|
||||
|
|
@ -346,7 +347,7 @@ def update_child_table(name):
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def insert_doctype_with_child_table_record(name):
|
||||
def insert_doctype_with_child_table_record(name: str | int):
|
||||
if frappe.get_all(name, {"title": "Test Grid Search"}):
|
||||
return
|
||||
|
||||
|
|
@ -425,7 +426,7 @@ def insert_translations():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_test_user(username=None):
|
||||
def create_test_user(username: str | None = None):
|
||||
name = username or UI_TEST_USER
|
||||
|
||||
if frappe.db.exists("User", name):
|
||||
|
|
@ -506,7 +507,7 @@ def setup_inbox():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def setup_default_view(view, force_reroute=None):
|
||||
def setup_default_view(view: Any, force_reroute: int | bool | None = None):
|
||||
frappe.delete_doc_if_exists("Property Setter", "Event-main-default_view")
|
||||
frappe.delete_doc_if_exists("Property Setter", "Event-main-force_re_route_to_default_view")
|
||||
|
||||
|
|
@ -579,12 +580,12 @@ def create_kanban():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_todo(description):
|
||||
def create_todo(description: str):
|
||||
return frappe.get_doc({"doctype": "ToDo", "description": description}).insert()
|
||||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def create_todo_with_attachment_limit(description):
|
||||
def create_todo_with_attachment_limit(description: str):
|
||||
from frappe.custom.doctype.property_setter.property_setter import make_property_setter
|
||||
|
||||
make_property_setter("ToDo", None, "max_attachments", 12, "int", for_doctype=True)
|
||||
|
|
@ -622,7 +623,7 @@ def create_admin_kanban():
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def add_remove_role(action, user, role):
|
||||
def add_remove_role(action: str, user: str, role: str):
|
||||
user_doc = frappe.get_doc("User", user)
|
||||
if action == "remove":
|
||||
user_doc.remove_roles(role)
|
||||
|
|
@ -632,13 +633,13 @@ def add_remove_role(action, user, role):
|
|||
|
||||
@whitelist_for_tests()
|
||||
def publish_realtime(
|
||||
event=None,
|
||||
message=None,
|
||||
room=None,
|
||||
user=None,
|
||||
doctype=None,
|
||||
docname=None,
|
||||
task_id=None,
|
||||
event: str | None = None,
|
||||
message: str | dict | None = None,
|
||||
room: str | None = None,
|
||||
user: str | None = None,
|
||||
doctype: str | None = None,
|
||||
docname: str | None = None,
|
||||
task_id: str | None = None,
|
||||
):
|
||||
frappe.publish_realtime(
|
||||
event=event,
|
||||
|
|
@ -652,7 +653,9 @@ def publish_realtime(
|
|||
|
||||
|
||||
@whitelist_for_tests()
|
||||
def publish_progress(duration=3, title=None, doctype=None, docname=None):
|
||||
def publish_progress(
|
||||
duration: int = 3, title: str | None = None, doctype: str | None = None, docname: str | None = None
|
||||
):
|
||||
# This should consider session user and only show it to current user.
|
||||
frappe.enqueue(slow_task, duration=duration, title=title, doctype=doctype, docname=docname)
|
||||
|
||||
|
|
|
|||
|
|
@ -295,15 +295,15 @@ def download_private_file(path: str) -> Response:
|
|||
raise Forbidden(_("You don't have permission to access this file"))
|
||||
|
||||
make_access_log(doctype="File", document=file.name, file_type=os.path.splitext(path)[-1][1:])
|
||||
return send_private_file(path.split("/private", 1)[1])
|
||||
return send_private_file(path.split("/private", 1)[1], filename=file.file_name)
|
||||
|
||||
|
||||
FORCE_DOWNLOAD_EXTENSIONS = (".svg", ".html", ".htm", ".xml")
|
||||
|
||||
|
||||
def send_private_file(path: str) -> Response:
|
||||
def send_private_file(path: str, filename: str | None = None) -> Response:
|
||||
path = os.path.join(frappe.local.conf.get("private_path", "private"), path.strip("/"))
|
||||
filename = os.path.basename(path)
|
||||
filename = filename or os.path.basename(path)
|
||||
|
||||
extension = os.path.splitext(path)[1]
|
||||
as_attachment = extension.lower() in FORCE_DOWNLOAD_EXTENSIONS
|
||||
|
|
@ -329,7 +329,7 @@ def send_private_file(path: str) -> Response:
|
|||
environ=frappe.local.request.environ,
|
||||
conditional=True,
|
||||
as_attachment=as_attachment,
|
||||
download_name=filename if as_attachment else None,
|
||||
download_name=filename,
|
||||
)
|
||||
|
||||
return response
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue