fix!: disallow setting link values not matching filters

This commit is contained in:
Sagar Vora 2025-12-03 10:35:39 +05:30
parent 7a5c870059
commit 40c7d27e4f
5 changed files with 215 additions and 157 deletions

View file

@ -79,7 +79,7 @@ context("Control Link", () => {
it("should unset invalid value", () => {
get_dialog_with_link().as("dialog");
cy.intercept("/api/method/frappe.client.validate_link*").as("validate_link");
cy.intercept("/api/method/frappe.client.validate_link_and_fetch*").as("validate_link");
cy.get(".frappe-control[data-fieldname=link] input").focus().as("input");
// Wait for dropdown to appear (request might be cached)
cy.get("@input").parent().findByRole("listbox").should("be.visible");
@ -92,7 +92,7 @@ context("Control Link", () => {
it("should be possible set empty value explicitly", () => {
get_dialog_with_link().as("dialog");
cy.intercept("/api/method/frappe.client.validate_link*").as("validate_link");
cy.intercept("/api/method/frappe.client.validate_link_and_fetch*").as("validate_link");
cy.get(".frappe-control[data-fieldname=link] input").focus().as("input");
// Wait for dropdown to appear (request might be cached)
@ -179,7 +179,7 @@ context("Control Link", () => {
it("should update dependant fields (via fetch_from)", () => {
cy.get("@todos").then((todos) => {
cy.visit(`/desk/todo/${todos[0]}`);
cy.intercept("/api/method/frappe.client.validate_link*").as("validate_link");
cy.intercept("/api/method/frappe.client.validate_link_and_fetch*").as("validate_link");
cy.fill_field("assigned_by", cy.config("testUser"), "Link");
cy.call("frappe.client.get_value", {
@ -203,7 +203,7 @@ context("Control Link", () => {
""
);
cy.window().its("cur_frm.doc.assigned_by").should("eq", null);
cy.window().its("cur_frm.doc.assigned_by").should("eq", undefined);
// set valid value again
cy.get("@input").clear().focus();

View file

@ -9,6 +9,7 @@ import frappe.model
import frappe.utils
from frappe import _
from frappe.desk.reportview import validate_args
from frappe.desk.search import make_filter_tuple, search_widget
from frappe.model.utils import is_virtual_doctype
from frappe.utils import attach_expanded_links, get_safe_filters
from frappe.utils.caching import http_cache
@ -406,52 +407,76 @@ def is_document_amended(doctype: str, docname: str):
return False
@frappe.whitelist()
def validate_link(doctype: str, docname: str, fields=None):
if not isinstance(doctype, str):
frappe.throw(_("DocType must be a string"))
if not isinstance(docname, str):
frappe.throw(_("Document Name must be a string"))
parent_doctype = None
if doctype != "DocType":
if frappe.get_meta(doctype).istable: # needed for links to child rows
parent_doctype = frappe.db.get_value(doctype, docname, "parenttype")
if not (
frappe.has_permission(doctype, "select", parent_doctype=parent_doctype)
or frappe.has_permission(doctype, "read", parent_doctype=parent_doctype)
):
frappe.throw(
_("You do not have Read or Select Permissions for {}").format(frappe.bold(doctype)),
frappe.PermissionError,
)
values = frappe._dict()
@frappe.whitelist(methods=["GET", "POST"])
def validate_link_and_fetch(
doctype: str,
docname: str,
fields_to_fetch: list[str] | str | None = None,
# search_widget parameters
query: str | None = None,
filters: dict | list | str | None = None,
**search_args,
):
if not docname:
frappe.throw(_("Document Name must not be empty"))
if is_virtual_doctype(doctype):
try:
frappe.get_doc(doctype, docname)
values.name = docname
doc = frappe.get_doc(doctype, docname)
doc.check_permission("select" if frappe.only_has_select_perm(doctype) else "read")
return {"name": doc.name}
except frappe.DoesNotExistError:
frappe.clear_last_message()
frappe.msgprint(
_("Document {0} {1} does not exist").format(frappe.bold(doctype), frappe.bold(docname)),
return {}
fields_to_fetch = frappe.parse_json(fields_to_fetch)
# only cache is no fields to fetch and no filters specified by user
can_cache = not (fields_to_fetch or filters)
# Use search_widget to validate - ensures filters/custom queries are respected
# in addition to standard permission checks
search_args["txt"] = docname
search_result = frappe.call(
search_widget,
doctype=doctype,
query=query,
filters=filters,
**search_args,
for_link_validation=True,
)
if not search_result:
return {} # does not exist or filtered out
# get value in the right case and type (str | int)
# for matching with search result
columns_to_fetch = ["name"]
if frappe.is_table(doctype):
columns_to_fetch.append("parenttype") # for child table permission check
values = frappe.db.get_value(doctype, docname, columns_to_fetch, as_dict=True)
name_to_compare = values.name
# this will be used to fetch fields later
parent_doctype = values.pop("parenttype", None)
if not name_to_compare:
return {} # does not exist
# for custom queries that don't respect filters
if not any(item[0] == name_to_compare for item in search_result):
return {} # no permission or filtered out
if not fields_to_fetch:
if can_cache:
frappe.local.response_headers.set(
"Cache-Control", "private,max-age=1800,stale-while-revalidate=7200"
)
return values
values.name = frappe.db.get_value(doctype, docname, cache=True)
fields = frappe.parse_json(fields)
if not values.name:
return values
if not fields:
frappe.local.response_headers.set("Cache-Control", "private,max-age=1800,stale-while-revalidate=7200")
return values
try:
values.update(get_value(doctype, fields, docname, parent=parent_doctype))
values.update(get_value(doctype, fields_to_fetch, docname, parent=parent_doctype))
except frappe.PermissionError:
frappe.clear_last_message()
frappe.msgprint(

View file

@ -76,6 +76,7 @@ def search_widget(
ignore_user_permissions: bool = False,
*,
link_fieldname: str | None = None,
for_link_validation: bool = False,
):
if ignore_user_permissions:
if reference_doctype and link_fieldname:
@ -103,6 +104,34 @@ def search_widget(
if not query and doctype in standard_queries:
query = standard_queries[doctype][-1]
if filters is None:
filters = {}
are_filters_dict = isinstance(filters, dict)
include_disabled = False
if not query and are_filters_dict:
if "include_disabled" in filters:
if filters["include_disabled"] == 1:
include_disabled = True
filters.pop("include_disabled")
filters = [make_filter_tuple(doctype, key, value) for key, value in filters.items()]
are_filters_dict = False
if for_link_validation:
if are_filters_dict:
# we add filter if possible, otherwise rely on txt
if "name" not in filters:
filters["name"] = txt
else:
filters.append([doctype, "name", "=", txt])
as_dict = False
# for custom queries that don't respect filters but respect limit (rare)
# or for when we have to rely on txt
# we want to match "A" with "A" only and not "A1", "BA" etc.
page_length = 100_000
if query: # Query = custom search query i.e. python function
try:
is_whitelisted(frappe.get_attr(query))
@ -132,17 +161,6 @@ def search_widget(
return []
meta = frappe.get_meta(doctype)
include_disabled = False
if filters and "include_disabled" in filters:
if filters["include_disabled"] == 1:
include_disabled = True
filters.pop("include_disabled")
if isinstance(filters, dict):
filters = [make_filter_tuple(doctype, key, value) for key, value in filters.items()]
elif filters is None:
filters = []
or_filters = []
# build from doctype
@ -189,7 +207,7 @@ def search_widget(
# `idx` is number of times a document is referred, check link_count.py
order_by = f"idx desc, {order_by_based_on_meta}"
if not meta.translated_doctype:
if not for_link_validation and not meta.translated_doctype:
_txt = frappe.db.escape((txt or "").replace("%", "").replace("@", ""))
# locate returns 0 if string is not found, convert 0 to null and then sort null to end in order by
_relevance_expr = {"DIV": [1, {"NULLIF": [{"LOCATE": [_txt, "name"]}, 0]}]}
@ -214,29 +232,30 @@ def search_widget(
strict=False,
)
if meta.translated_doctype:
# Filtering the values array so that query is included in very element
values = (
result
for result in values
if any(
re.search(f"{re.escape(txt)}.*", _(cstr(value)) or "", re.IGNORECASE)
for value in (result.values() if as_dict else result)
if not for_link_validation:
if meta.translated_doctype:
# Filtering the values array so that query is included in very element
values = (
result
for result in values
if any(
re.search(f"{re.escape(txt)}.*", _(cstr(value)) or "", re.IGNORECASE)
for value in (result.values() if as_dict else result)
)
)
)
# Sorting the values array so that relevant results always come first
# This will first bring elements on top in which query is a prefix of element
# Then it will bring the rest of the elements and sort them in lexicographical order
values = sorted(values, key=lambda x: relevance_sorter(x, txt, as_dict))
# Sorting the values array so that relevant results always come first
# This will first bring elements on top in which query is a prefix of element
# Then it will bring the rest of the elements and sort them in lexicographical order
values = sorted(values, key=lambda x: relevance_sorter(x, txt, as_dict))
# remove _relevance from results
if not meta.translated_doctype:
if as_dict:
for r in values:
r.pop("_relevance", None)
else:
values = [r[:-1] for r in values]
# remove _relevance from results
if not meta.translated_doctype:
if as_dict:
for r in values:
r.pop("_relevance", None)
else:
values = [r[:-1] for r in values]
return values

View file

@ -27,13 +27,8 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
this.$input.on("focus", function () {
if (!me.$input.val()) {
me.$input.val("");
// Create a fake input event
const e = $.Event("input");
e.target = me.$input[0];
// Pass it to on_input directly, bypassing debounce, so the dropdown opens immediately
me.on_input(e);
// trigger dropdown immediately
me.on_input();
}
me.show_link_and_clear_buttons();
@ -154,7 +149,7 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
return null;
}
get_label_value() {
return this.$input ? this.$input.val() : "";
return this.$input?.val() || "";
}
set_input_value(value) {
this.$input && this.$input.val(value);
@ -272,7 +267,8 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
this.custom_awesomplete_filter && this.custom_awesomplete_filter(this.awesomplete);
this.$input.on("input", frappe.utils.debounce(this.on_input.bind(this), 500));
this._debounced_input_handler = frappe.utils.debounce(this.on_input.bind(this), 500);
this.$input.on("input", this._debounced_input_handler);
this.$input.on("blur", function () {
if (me.selected) {
@ -406,22 +402,13 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
return estimated_size > max_get_size;
}
on_input(e) {
var doctype = this.get_options();
get_search_args(txt) {
const doctype = this.get_options();
if (!doctype) return;
if (!this.$input.cache[doctype]) {
this.$input.cache[doctype] = {};
}
var term = e.target.value;
if (this.$input.cache[doctype][term] != null) {
// immediately show from cache
this.awesomplete.list = this.$input.cache[doctype][term];
}
var args = {
txt: term,
doctype: doctype,
const args = {
txt,
doctype,
ignore_user_permissions: this.df.ignore_user_permissions,
reference_doctype: this.get_reference_doctype() || "",
page_length: cint(frappe.boot.sysdefaults?.link_field_results_limit) || 10,
@ -429,6 +416,24 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
};
this.set_custom_query(args);
return args;
}
on_input(e) {
const term = e ? e.target.value : this.$input.val();
const args = this.get_search_args(term);
if (!args) return;
const doctype = args.doctype;
const cache = this.$input.cache;
if (!cache[doctype]) {
cache[doctype] = {};
}
if (cache[doctype][term] != null) {
// immediately show from cache
this.awesomplete.list = cache[doctype][term];
}
const use_get = !this.should_use_post_for_search(term, args.filters);
frappe.call({
@ -496,8 +501,8 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
});
}
}
this.$input.cache[doctype][term] = r.message;
this.awesomplete.list = this.$input.cache[doctype][term];
cache[doctype][term] = r.message;
this.awesomplete.list = cache[doctype][term];
this.toggle_href(doctype);
r.message.forEach((item) => {
frappe.utils.add_link_title(doctype, item.value, item.label);
@ -850,38 +855,31 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
return this.validate_link_and_fetch(value);
}
validate_link_and_fetch(value) {
const options = this.get_options();
if (!options) {
return;
}
const args = this.get_search_args(value);
if (!args.doctype) return;
const columns_to_fetch = Object.values(this.fetch_map);
const nothing_to_fetch = !columns_to_fetch.length;
// if default and no fetch, no need to validate
if (nothing_to_fetch && this.df.__default_value === value) {
return value;
}
if (
nothing_to_fetch &&
value &&
this.awesomplete?._list?.find((item) => item.value === value && !item.action)
) {
// if value is in the suggestion list, must be correct
if (!columns_to_fetch.length && this.df.__default_value === value) {
return value;
}
const update_dependant_fields = (response) => {
let field_value = "";
for (const [target_field, source_field] of Object.entries(this.fetch_map)) {
if (value) {
field_value = response[source_field];
}
if (!columns_to_fetch.length) return;
if (this.layout?.set_value) {
this.layout.set_value(target_field, field_value);
} else if (this.frm) {
const layout_set_value = this.layout?.set_value;
if (!layout_set_value && (!this.frm || !this.docname)) {
return;
}
const has_value = Boolean(response?.name);
for (const [target_field, source_field] of Object.entries(this.fetch_map)) {
const field_value = has_value ? response[source_field] : "";
if (layout_set_value) {
layout_set_value(target_field, field_value);
} else {
frappe.model.set_value(
this.df.parent,
this.docname,
@ -894,32 +892,35 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
};
// to avoid unnecessary request
if (value) {
return frappe
.xcall(
"frappe.client.validate_link",
{
doctype: options,
docname: value,
fields: columns_to_fetch,
},
"GET",
{ cache: !columns_to_fetch.length }
)
.then((response) => {
if (this.frm && !this.docname) {
return response.name;
}
if (!columns_to_fetch.length) {
return response.name;
}
update_dependant_fields(response);
return response.name;
});
} else {
update_dependant_fields({});
if (!value) {
update_dependant_fields();
return value;
}
// if there is a search_link call scheduled, cancel it
// validation will do it
this._debounced_input_handler?.cancel();
// filters may be too large to be sent as GET
const can_cache = !columns_to_fetch.length && !args.filters;
return frappe
.xcall(
"frappe.client.validate_link_and_fetch",
{
...args,
docname: value,
fields_to_fetch: columns_to_fetch,
},
can_cache ? "GET" : "POST",
{ cache: can_cache }
)
.then((response) => {
if (!response) return;
update_dependant_fields(response);
return response.name;
});
}
fetch_map_for_quick_entry() {

View file

@ -155,30 +155,43 @@ class TestClient(IntegrationTestCase):
self.assertEqual(get("ToDo", filters={}), get("ToDo", filters="{}"))
todo.delete()
def test_client_validatate_link(self):
from frappe.client import validate_link
def test_client_validate_link_and_fetch(self):
from frappe.client import validate_link_and_fetch
# Use Role doctype (no custom query like User has)
# Basic test
self.assertTrue(validate_link("User", "Guest"))
self.assertTrue(validate_link_and_fetch("Role", "System Manager"))
# fixes capitalization
if frappe.db.db_type == "mariadb":
self.assertEqual(validate_link("User", "GueSt"), {"name": "Guest"})
self.assertEqual(validate_link_and_fetch("Role", "system manager"), {"name": "System Manager"})
# Fetch
self.assertEqual(validate_link("User", "Guest", fields=["enabled"]), {"name": "Guest", "enabled": 1})
result = validate_link_and_fetch("Role", "System Manager", fields_to_fetch=["desk_access"])
self.assertEqual(result.get("name"), "System Manager")
self.assertIn("desk_access", result)
# Non-existent document returns empty
result = validate_link_and_fetch("Role", "Non Existent Role")
self.assertEqual(result, {})
# Filters - Role exists but filter excludes it
result = validate_link_and_fetch("Role", "System Manager", filters={"desk_access": 0})
self.assertEqual(result, {})
# Filters - Role exists and filter matches
result = validate_link_and_fetch("Role", "System Manager", filters={"desk_access": 1})
self.assertEqual(result.get("name"), "System Manager")
# Permissions
with self.set_user("Guest"), self.assertRaises(frappe.PermissionError):
self.assertEqual(
validate_link("User", "Guest", fields=["enabled"]), {"name": "Guest", "enabled": 1}
)
validate_link_and_fetch("Role", "System Manager")
def test_validate_link_child_table(self):
def test_validate_link_and_fetch_for_child_table(self):
"""
Test validate_link works for child table doctypes with field fetch.
Test validate_link_and_fetch works for child table doctypes with field fetch.
"""
from frappe.client import validate_link
from frappe.client import validate_link_and_fetch
self.addCleanup(frappe.db.rollback)
@ -188,7 +201,7 @@ class TestClient(IntegrationTestCase):
child_row = user.block_modules[-1]
result = validate_link("Block Module", child_row.name, fields=["module"])
result = validate_link_and_fetch("Block Module", child_row.name, fields_to_fetch=["module"])
self.assertEqual(result.get("name"), child_row.name)
self.assertEqual(result.get("module"), "Setup")