refactor!: Link field search (#22745)

* refactor!: Drop handling for SQL queries

This hasn't been supported in really long time, no need to check that
use cases.

It will still fail but with no special error message.

* fix: Catch all import related errors

* fix!: Use last query from hooks

* refactor!: Return search results like any other function

Search results are returned in `results` key which is incosistent from
most other functions

* refactor: simplify search_link
This commit is contained in:
Ankush Menat 2023-10-16 15:41:02 +05:30 committed by GitHub
parent 89aab5d748
commit c476c5e6d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 187 additions and 217 deletions

View file

@ -1,20 +1,24 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import functools
import json
import re
from typing import TypedDict
from typing_extensions import NotRequired # not required in 3.11+
import frappe
# Backward compatbility
from frappe import _, is_whitelisted, validate_and_sanitize_search_inputs
from frappe.database.schema import SPECIAL_CHAR_PATTERN
from frappe.model.db_query import get_order_by
from frappe.permissions import has_permission
from frappe.utils import cint, cstr, unique
from frappe.utils.data import make_filter_tuple
def sanitize_searchfield(searchfield):
def sanitize_searchfield(searchfield: str):
if not searchfield:
return
@ -22,19 +26,25 @@ def sanitize_searchfield(searchfield):
frappe.throw(_("Invalid Search Field {0}").format(searchfield), frappe.DataError)
class LinkSearchResults(TypedDict):
value: str
description: str
label: NotRequired[str]
# this is called by the Link Field
@frappe.whitelist()
def search_link(
doctype,
txt,
query=None,
filters=None,
page_length=10,
searchfield=None,
reference_doctype=None,
ignore_user_permissions=False,
):
search_widget(
doctype: str,
txt: str,
query: str | None = None,
filters: str | dict | list | None = None,
page_length: int = 10,
searchfield: str | None = None,
reference_doctype: str | None = None,
ignore_user_permissions: bool = False,
) -> list[LinkSearchResults]:
results = search_widget(
doctype,
txt.strip(),
query,
@ -44,25 +54,23 @@ def search_link(
reference_doctype=reference_doctype,
ignore_user_permissions=ignore_user_permissions,
)
frappe.response["results"] = build_for_autosuggest(frappe.response["values"], doctype=doctype)
del frappe.response["values"]
return build_for_autosuggest(results, doctype=doctype)
# this is called by the search box
@frappe.whitelist()
def search_widget(
doctype,
txt,
query=None,
searchfield=None,
start=0,
page_length=10,
filters=None,
doctype: str,
txt: str,
query: str | None = None,
searchfield: str = None,
start: int = 0,
page_length: int = 10,
filters: str | None | dict | list = None,
filter_fields=None,
as_dict=False,
reference_doctype=None,
ignore_user_permissions=False,
as_dict: bool = False,
reference_doctype: str | None = None,
ignore_user_permissions: bool = False,
):
start = cint(start)
@ -78,11 +86,13 @@ def search_widget(
standard_queries = frappe.get_hooks().standard_queries or {}
if query and query.split(maxsplit=1)[0].lower() != "select":
# by method
if not query and doctype in standard_queries:
query = standard_queries[doctype][-1]
if query: # Query = custom search query i.e. python function
try:
is_whitelisted(frappe.get_attr(query))
frappe.response["values"] = frappe.call(
return frappe.call(
query,
doctype,
txt,
@ -93,9 +103,9 @@ def search_widget(
as_dict=as_dict,
reference_doctype=reference_doctype,
)
except frappe.exceptions.PermissionError as e:
except (frappe.PermissionError, frappe.AppNotInstalledError, ImportError):
if frappe.local.conf.developer_mode:
raise e
raise
else:
frappe.respond_as_web_page(
title="Invalid Method",
@ -103,156 +113,123 @@ def search_widget(
indicator_color="red",
http_status_code=404,
)
return
except Exception as e:
raise e
elif not query and doctype in standard_queries:
# from standard queries
search_widget(
doctype=doctype,
txt=txt,
query=standard_queries[doctype][0],
searchfield=searchfield,
start=start,
page_length=page_length,
filters=filters,
filter_fields=filter_fields,
as_dict=as_dict,
reference_doctype=reference_doctype,
ignore_user_permissions=ignore_user_permissions,
return []
meta = frappe.get_meta(doctype)
if isinstance(filters, dict):
filters_items = filters.items()
filters = []
for key, value in filters_items:
filters.append(make_filter_tuple(doctype, key, value))
if filters is None:
filters = []
or_filters = []
# build from doctype
if txt:
field_types = {
"Data",
"Text",
"Small Text",
"Long Text",
"Link",
"Select",
"Read Only",
"Text Editor",
}
search_fields = ["name"]
if meta.title_field:
search_fields.append(meta.title_field)
if meta.search_fields:
search_fields.extend(meta.get_search_fields())
for f in search_fields:
fmeta = meta.get_field(f.strip())
if not meta.translated_doctype and (f == "name" or (fmeta and fmeta.fieldtype in field_types)):
or_filters.append([doctype, f.strip(), "like", f"%{txt}%"])
if meta.get("fields", {"fieldname": "enabled", "fieldtype": "Check"}):
filters.append([doctype, "enabled", "=", 1])
if meta.get("fields", {"fieldname": "disabled", "fieldtype": "Check"}):
filters.append([doctype, "disabled", "!=", 1])
# format a list of fields combining search fields and filter fields
fields = get_std_fields_list(meta, searchfield or "name")
if filter_fields:
fields = list(set(fields + json.loads(filter_fields)))
formatted_fields = [f"`tab{meta.name}`.`{f.strip()}`" for f in fields]
# Insert title field query after name
if meta.show_title_field_in_link:
formatted_fields.insert(1, f"`tab{meta.name}`.{meta.title_field} as `label`")
order_by_based_on_meta = get_order_by(doctype, meta)
# `idx` is number of times a document is referred, check link_count.py
order_by = f"`tab{doctype}`.idx desc, {order_by_based_on_meta}"
if not meta.translated_doctype:
_txt = frappe.db.escape((txt or "").replace("%", "").replace("@", ""))
# locate returns 0 if string is not found, convert 0 to null and then sort null to end in order by
_relevance = f"(1 / nullif(locate({_txt}, `tab{doctype}`.`name`), 0))"
formatted_fields.append(f"""{_relevance} as `_relevance`""")
# Since we are sorting by alias postgres needs to know number of column we are sorting
if frappe.db.db_type == "mariadb":
order_by = f"ifnull(_relevance, -9999) desc, {order_by}"
elif frappe.db.db_type == "postgres":
# Since we are sorting by alias postgres needs to know number of column we are sorting
order_by = f"{len(formatted_fields)} desc nulls last, {order_by}"
ignore_permissions = doctype == "DocType" or (
cint(ignore_user_permissions)
and has_permission(
doctype,
ptype="select" if frappe.only_has_select_perm(doctype) else "read",
parent_doctype=reference_doctype,
)
else:
meta = frappe.get_meta(doctype)
)
if query:
frappe.throw(_("This query style is discontinued"))
# custom query
# frappe.response["values"] = frappe.db.sql(scrub_custom_query(query, searchfield, txt))
values = frappe.get_list(
doctype,
filters=filters,
fields=formatted_fields,
or_filters=or_filters,
limit_start=start,
limit_page_length=None if meta.translated_doctype else page_length,
order_by=order_by,
ignore_permissions=ignore_permissions,
reference_doctype=reference_doctype,
as_list=not as_dict,
strict=False,
)
if meta.translated_doctype:
# Filtering the values array so that query is included in very element
values = (
result
for result in values
if any(
re.search(f"{re.escape(txt)}.*", _(cstr(value)) or "", re.IGNORECASE)
for value in (result.values() if as_dict else result)
)
)
# Sorting the values array so that relevant results always come first
# This will first bring elements on top in which query is a prefix of element
# Then it will bring the rest of the elements and sort them in lexicographical order
values = sorted(values, key=lambda x: relevance_sorter(x, txt, as_dict))
# remove _relevance from results
if not meta.translated_doctype:
if as_dict:
for r in values:
r.pop("_relevance", None)
else:
if isinstance(filters, dict):
filters_items = filters.items()
filters = []
for f in filters_items:
if isinstance(f[1], (list, tuple)):
filters.append([doctype, f[0], f[1][0], f[1][1]])
else:
filters.append([doctype, f[0], "=", f[1]])
values = [r[:-1] for r in values]
if filters is None:
filters = []
or_filters = []
# build from doctype
if txt:
field_types = [
"Data",
"Text",
"Small Text",
"Long Text",
"Link",
"Select",
"Read Only",
"Text Editor",
]
search_fields = ["name"]
if meta.title_field:
search_fields.append(meta.title_field)
if meta.search_fields:
search_fields.extend(meta.get_search_fields())
for f in search_fields:
fmeta = meta.get_field(f.strip())
if not meta.translated_doctype and (
f == "name" or (fmeta and fmeta.fieldtype in field_types)
):
or_filters.append([doctype, f.strip(), "like", f"%{txt}%"])
if meta.get("fields", {"fieldname": "enabled", "fieldtype": "Check"}):
filters.append([doctype, "enabled", "=", 1])
if meta.get("fields", {"fieldname": "disabled", "fieldtype": "Check"}):
filters.append([doctype, "disabled", "!=", 1])
# format a list of fields combining search fields and filter fields
fields = get_std_fields_list(meta, searchfield or "name")
if filter_fields:
fields = list(set(fields + json.loads(filter_fields)))
formatted_fields = [f"`tab{meta.name}`.`{f.strip()}`" for f in fields]
# Insert title field query after name
if meta.show_title_field_in_link:
formatted_fields.insert(1, f"`tab{meta.name}`.{meta.title_field} as `label`")
# In order_by, `idx` gets second priority, because it stores link count
from frappe.model.db_query import get_order_by
order_by_based_on_meta = get_order_by(doctype, meta)
# 2 is the index of _relevance column
order_by = f"`tab{doctype}`.idx desc, {order_by_based_on_meta}"
if not meta.translated_doctype:
_txt = frappe.db.escape((txt or "").replace("%", "").replace("@", ""))
_relevance = f"(1 / nullif(locate({_txt}, `tab{doctype}`.`name`), 0))"
formatted_fields.append(f"""{_relevance} as `_relevance`""")
# Since we are sorting by alias postgres needs to know number of column we are sorting
if frappe.db.db_type == "mariadb":
order_by = f"ifnull(_relevance, -9999) desc, {order_by}"
elif frappe.db.db_type == "postgres":
# Since we are sorting by alias postgres needs to know number of column we are sorting
order_by = f"{len(formatted_fields)} desc nulls last, {order_by}"
ignore_permissions = (
True
if doctype == "DocType"
else (
cint(ignore_user_permissions)
and has_permission(
doctype,
ptype="select" if frappe.only_has_select_perm(doctype) else "read",
parent_doctype=reference_doctype,
)
)
)
values = frappe.get_list(
doctype,
filters=filters,
fields=formatted_fields,
or_filters=or_filters,
limit_start=start,
limit_page_length=None if meta.translated_doctype else page_length,
order_by=order_by,
ignore_permissions=ignore_permissions,
reference_doctype=reference_doctype,
as_list=not as_dict,
strict=False,
)
if meta.translated_doctype:
# Filtering the values array so that query is included in very element
values = (
result
for result in values
if any(
re.search(f"{re.escape(txt)}.*", _(cstr(value)) or "", re.IGNORECASE)
for value in (result.values() if as_dict else result)
)
)
# Sorting the values array so that relevant results always come first
# This will first bring elements on top in which query is a prefix of element
# Then it will bring the rest of the elements and sort them in lexicographical order
values = sorted(values, key=lambda x: relevance_sorter(x, txt, as_dict))
# remove _relevance from results
if not meta.translated_doctype:
if as_dict:
for r in values:
r.pop("_relevance")
else:
values = [r[:-1] for r in values]
frappe.response["values"] = values
return values
def get_std_fields_list(meta, key):
@ -273,7 +250,7 @@ def get_std_fields_list(meta, key):
return sflist
def build_for_autosuggest(res: list[tuple], doctype: str) -> list[dict]:
def build_for_autosuggest(res: list[tuple], doctype: str) -> list[LinkSearchResults]:
def to_string(parts):
return ", ".join(
unique(_(cstr(part)) if meta.translated_doctype else cstr(part) for part in parts if part)

View file

@ -124,7 +124,7 @@ frappe.db = {
filters,
},
callback(r) {
resolve(r.results);
resolve(r.message);
},
});
});

View file

@ -265,7 +265,7 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
if (!window.Cypress && !me.$input.is(":focus")) {
return;
}
r.results = me.merge_duplicates(r.results);
r.message = me.merge_duplicates(r.message);
// show filter description in awesomplete
let filter_string = me.df.filter_description
@ -274,7 +274,7 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
? me.get_filter_description(args.filters)
: null;
if (filter_string) {
r.results.push({
r.message.push({
html: `<span class="text-muted" style="line-height: 1.5">${filter_string}</span>`,
value: "",
action: () => {},
@ -284,7 +284,7 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
if (!me.df.only_select) {
if (frappe.model.can_create(doctype)) {
// new item
r.results.push({
r.message.push({
html:
"<span class='link-option'>" +
"<i class='fa fa-plus' style='margin-right: 5px;'></i> " +
@ -302,13 +302,13 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
frappe.ui.form.ControlLink.link_options(me);
if (custom__link_options) {
r.results = r.results.concat(custom__link_options);
r.message = r.message.concat(custom__link_options);
}
// advanced search
if (locals && locals["DocType"]) {
// not applicable in web forms
r.results.push({
r.message.push({
html:
"<span class='link-option'>" +
"<i class='fa fa-search' style='margin-right: 5px;'></i> " +
@ -320,7 +320,7 @@ frappe.ui.form.ControlLink = class ControlLink extends frappe.ui.form.ControlDat
});
}
}
me.$input.cache[doctype][term] = r.results;
me.$input.cache[doctype][term] = r.message;
me.awesomplete.list = me.$input.cache[doctype][term];
me.toggle_href(doctype);
},

View file

@ -86,14 +86,14 @@ frappe.ui.form.LinkSelector = class LinkSelector {
frappe.link_search(
this.doctype,
args,
function (r) {
function (results) {
var parent = me.dialog.fields_dict.results.$wrapper;
if (args.start === 0) {
parent.empty();
}
if (r.values.length) {
for (const v of r.values) {
if (results.length) {
for (const v of results) {
var row = $(
repl(
'<div class="row link-select-row">\
@ -149,7 +149,7 @@ frappe.ui.form.LinkSelector = class LinkSelector {
}
var more_btn = me.dialog.fields_dict.more.$wrapper;
if (r.values.length < me.page_length) {
if (results.length < me.page_length) {
more_btn.hide();
} else {
more_btn.show();
@ -246,7 +246,7 @@ frappe.link_search = function (doctype, args, callback, btn) {
type: "GET",
args: args,
callback: function (r) {
callback && callback(r);
callback && callback(r.message);
},
btn: btn,
});

View file

@ -576,22 +576,22 @@ frappe.ui.form.MultiSelectDialog = class MultiSelectDialog {
no_spinner: true,
args: args,
});
const more = res.values.length && res.values.length > this.page_length ? 1 : 0;
const more = res.message.length && res.message.length > this.page_length ? 1 : 0;
return [res, more];
return [res.message, more];
}
async get_results() {
const args = this.get_args_for_search();
const [res, more] = await this.perform_search(args);
let [results, more] = await this.perform_search(args);
if (more) {
res.values = res.values.splice(0, this.page_length);
results = results.splice(0, this.page_length);
}
this.results = [];
if (res.values.length) {
res.values.forEach((result) => {
if (results.length) {
results.forEach((result) => {
result.checked = 0;
this.results.push(result);
});
@ -602,11 +602,11 @@ frappe.ui.form.MultiSelectDialog = class MultiSelectDialog {
async get_filtered_parents_for_child_search() {
const parent_search_args = this.get_args_for_search();
parent_search_args.filter_fields = ["name"];
const [response, _] = await this.perform_search(parent_search_args);
const [results, _] = await this.perform_search(parent_search_args);
let parent_names = [];
if (response.values.length) {
parent_names = response.values.map((v) => v.name);
if (results.length) {
parent_names = results.map((v) => v.name);
}
return parent_names;
}

View file

@ -19,10 +19,10 @@ class TestSearch(FrappeTestCase):
self.addCleanup(teardown_test_link_field_order, self)
def test_search_field_sanitizer(self):
# pass
search_link("DocType", "User", query=None, filters=None, page_length=20, searchfield="name")
result = frappe.response["results"][0]
self.assertTrue("User" in result["value"])
results = search_link(
"DocType", "User", query=None, filters=None, page_length=20, searchfield="name"
)
self.assertTrue("User" in results[0]["value"])
# raise exception on injection
for searchfield in (
@ -67,7 +67,7 @@ class TestSearch(FrappeTestCase):
def test_link_field_order(self):
# Making a request to the search_link with the tree doctype
search_link(
results = search_link(
doctype=self.tree_doctype_name,
txt="all",
query=None,
@ -75,20 +75,18 @@ class TestSearch(FrappeTestCase):
page_length=20,
searchfield=None,
)
result = frappe.response["results"]
# Check whether the result is sorted or not
self.assertEqual(self.parent_doctype_name, result[0]["value"])
self.assertEqual(self.parent_doctype_name, results[0]["value"])
# Check whether searching for parent also list out children
self.assertEqual(len(result), len(self.child_doctypes_names) + 1)
self.assertEqual(len(results), len(self.child_doctypes_names) + 1)
# Search for the word "pay", part of the word "pays" (country) in french.
def test_link_search_in_foreign_language(self):
try:
frappe.local.lang = "fr"
search_widget(doctype="DocType", txt="pay", page_length=20)
output = frappe.response["values"]
output = search_widget(doctype="DocType", txt="pay", page_length=20)
result = [["found" for x in y if x == "Country"] for y in output]
self.assertTrue(["found"] in result)
@ -155,12 +153,12 @@ class TestSearch(FrappeTestCase):
self.assertListEqual(frappe.call("frappe.tests.test_search.get_data", *args, **kwargs), [])
# should not fail if query has @ symbol in it
search_link("User", "user@random", searchfield="name")
self.assertListEqual(frappe.response["results"], [])
results = search_link("User", "user@random", searchfield="name")
self.assertListEqual(results, [])
def test_reference_doctype(self):
"""search query methods should get reference_doctype if they want"""
results = test_search(
results = search_link(
doctype="User",
txt="",
filters=None,
@ -171,7 +169,7 @@ class TestSearch(FrappeTestCase):
self.assertListEqual(results, [])
def test_search_relevance(self):
search = partial(test_search, doctype="Language", filters=None, page_length=10)
search = partial(search_link, doctype="Language", filters=None, page_length=10)
for row in search(txt="e"):
self.assertTrue(row["value"].startswith("e"))
@ -183,11 +181,6 @@ class TestSearch(FrappeTestCase):
self.assertEqual("es", search(txt="es")[0]["value"])
def test_search(*args, **kwargs):
search_link(*args, **kwargs)
return frappe.response["results"]
@frappe.validate_and_sanitize_search_inputs
def get_data(doctype, txt, searchfield, start, page_len, filters):
return [doctype, txt, searchfield, start, page_len, filters]