Merge pull request #16804 from saxenabhishek/aks-test-query_builder
test: increase coverage for query builder
This commit is contained in:
commit
211fb8e97d
3 changed files with 100 additions and 10 deletions
|
|
@ -45,7 +45,7 @@ class MATCH(DistinctOptionFunction):
|
|||
|
||||
if self._Against:
|
||||
return f"{s} AGAINST ({frappe.db.escape(f'+{self._Against}*')} IN BOOLEAN MODE)"
|
||||
return s
|
||||
raise Exception("Chain the `Against()` method with match to complete the query")
|
||||
|
||||
@builder
|
||||
def Against(self, text: str):
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
from pypika.functions import *
|
||||
from pypika.terms import Arithmetic, ArithmeticExpression, CustomFunction, Function
|
||||
|
||||
import frappe
|
||||
from frappe.database.query import Query
|
||||
from frappe.query_builder.custom import GROUP_CONCAT, MATCH, STRING_AGG, TO_TSVECTOR
|
||||
from frappe.query_builder.utils import ImportMapper, db_type_is
|
||||
|
||||
from .utils import Column
|
||||
from .utils import PseudoColumn
|
||||
|
||||
|
||||
class Concat_ws(Function):
|
||||
|
|
@ -45,7 +46,7 @@ DateFormat = ImportMapper(
|
|||
|
||||
class Cast_(Function):
|
||||
def __init__(self, value, as_type, alias=None):
|
||||
if db_type_is.MARIADB and (
|
||||
if frappe.db.db_type == "mariadb" and (
|
||||
(hasattr(as_type, "get_sql") and as_type.get_sql().lower() == "varchar")
|
||||
or str(as_type).lower() == "varchar"
|
||||
):
|
||||
|
|
@ -72,7 +73,10 @@ class Cast_(Function):
|
|||
|
||||
def _aggregate(function, dt, fieldname, filters, **kwargs):
|
||||
return (
|
||||
Query().build_conditions(dt, filters).select(function(Column(fieldname))).run(**kwargs)[0][0]
|
||||
Query()
|
||||
.build_conditions(dt, filters)
|
||||
.select(function(PseudoColumn(fieldname)))
|
||||
.run(**kwargs)[0][0]
|
||||
or 0
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ from typing import Callable
|
|||
|
||||
import frappe
|
||||
from frappe.query_builder import Case
|
||||
from frappe.query_builder.builder import Function
|
||||
from frappe.query_builder.custom import ConstantColumn
|
||||
from frappe.query_builder.functions import Cast_, Coalesce, CombineDatetime, GroupConcat, Match
|
||||
from frappe.query_builder.utils import db_type_is
|
||||
|
|
@ -18,7 +19,10 @@ class TestCustomFunctionsMariaDB(unittest.TestCase):
|
|||
self.assertEqual("GROUP_CONCAT('Notes')", GroupConcat("Notes").get_sql())
|
||||
|
||||
def test_match(self):
|
||||
query = Match("Notes").Against("text")
|
||||
query = Match("Notes")
|
||||
with self.assertRaises(Exception):
|
||||
query.get_sql()
|
||||
query = query.Against("text")
|
||||
self.assertEqual(" MATCH('Notes') AGAINST ('+text*' IN BOOLEAN MODE)", query.get_sql())
|
||||
|
||||
def test_constant_column(self):
|
||||
|
|
@ -71,8 +75,12 @@ class TestCustomFunctionsMariaDB(unittest.TestCase):
|
|||
|
||||
def test_cast(self):
|
||||
note = frappe.qb.DocType("Note")
|
||||
self.assertEqual("CONCAT(`tabnote`.`name`, '')", Cast_(note.name, "varchar"))
|
||||
self.assertEqual("CAST(`tabnote`.`name` AS INTEGER)", Cast_(note.name, "integer"))
|
||||
self.assertEqual("CONCAT(name,'')", Cast_(note.name, "varchar").get_sql())
|
||||
self.assertEqual("CAST(name AS INTEGER)", Cast_(note.name, "integer").get_sql())
|
||||
self.assertEqual(
|
||||
frappe.qb.from_("red").from_(note).select("other", Cast_(note.name, "varchar")).get_sql(),
|
||||
"SELECT `tabred`.`other`,CONCAT(`tabNote`.`name`,'') FROM `tabred`,`tabNote`",
|
||||
)
|
||||
|
||||
|
||||
@run_only_if(db_type_is.POSTGRES)
|
||||
|
|
@ -81,6 +89,8 @@ class TestCustomFunctionsPostgres(unittest.TestCase):
|
|||
self.assertEqual("STRING_AGG('Notes',',')", GroupConcat("Notes").get_sql())
|
||||
|
||||
def test_match(self):
|
||||
query = Match("Notes")
|
||||
self.assertEqual("TO_TSVECTOR('Notes')", query.get_sql())
|
||||
query = Match("Notes").Against("text")
|
||||
self.assertEqual("TO_TSVECTOR('Notes') @@ PLAINTO_TSQUERY('text')", query.get_sql())
|
||||
|
||||
|
|
@ -132,8 +142,12 @@ class TestCustomFunctionsPostgres(unittest.TestCase):
|
|||
|
||||
def test_cast(self):
|
||||
note = frappe.qb.DocType("Note")
|
||||
self.assertEqual("CAST(`tabnote`.`name` AS VARCHAR)", Cast_(note.name, "varchar"))
|
||||
self.assertEqual("CAST(`tabnote`.`name` AS INTEGER)", Cast_(note.name, "integer"))
|
||||
self.assertEqual("CAST(name AS VARCHAR)", Cast_(note.name, "varchar").get_sql())
|
||||
self.assertEqual("CAST(name AS INTEGER)", Cast_(note.name, "integer").get_sql())
|
||||
self.assertEqual(
|
||||
frappe.qb.from_("red").from_(note).select("other", Cast_(note.name, "varchar")).get_sql(),
|
||||
'SELECT "tabred"."other",CAST("tabNote"."name" AS VARCHAR) FROM "tabred","tabNote"',
|
||||
)
|
||||
|
||||
|
||||
class TestBuilderBase(object):
|
||||
|
|
@ -149,6 +163,25 @@ class TestBuilderBase(object):
|
|||
self.assertIsInstance(query.run, Callable)
|
||||
self.assertIsInstance(data, list)
|
||||
|
||||
def test_agg_funcs(self):
|
||||
frappe.db.truncate("Communication")
|
||||
sample_data = {
|
||||
"doctype": "Communication",
|
||||
"communication_type": "Communication",
|
||||
"content": "testing",
|
||||
"rating": 1,
|
||||
}
|
||||
frappe.get_doc(sample_data).insert()
|
||||
sample_data["rating"] = 3
|
||||
frappe.get_doc(sample_data).insert()
|
||||
sample_data["rating"] = 4
|
||||
frappe.get_doc(sample_data).insert()
|
||||
self.assertEqual(frappe.qb.max("Communication", "rating"), 4)
|
||||
self.assertEqual(frappe.qb.min("Communication", "rating"), 1)
|
||||
self.assertAlmostEqual(frappe.qb.avg("Communication", "rating"), 2.666, places=2)
|
||||
self.assertEqual(frappe.qb.sum("Communication", "rating"), 8.0)
|
||||
frappe.db.rollback()
|
||||
|
||||
|
||||
class TestParameterization(unittest.TestCase):
|
||||
def test_where_conditions(self):
|
||||
|
|
@ -163,7 +196,7 @@ class TestParameterization(unittest.TestCase):
|
|||
self.assertIn("param1", params)
|
||||
self.assertEqual(params["param1"], "Administrator' --")
|
||||
|
||||
def test_set_cnoditions(self):
|
||||
def test_set_conditions(self):
|
||||
DocType = frappe.qb.DocType("DocType")
|
||||
query = frappe.qb.update(DocType).set(DocType.value, "some_value")
|
||||
|
||||
|
|
@ -230,6 +263,19 @@ class TestParameterization(unittest.TestCase):
|
|||
self.assertEqual(params["param4"], "true_value")
|
||||
self.assertEqual(params["param5"], "Overdue")
|
||||
|
||||
def test_named_parameter_wrapper(self):
|
||||
from frappe.query_builder.terms import NamedParameterWrapper
|
||||
|
||||
test_npw = NamedParameterWrapper()
|
||||
self.assertTrue(hasattr(test_npw, "parameters"))
|
||||
self.assertEqual(test_npw.get_sql("test_string_one"), "%(param1)s")
|
||||
self.assertEqual(test_npw.get_sql("test_string_two"), "%(param2)s")
|
||||
params = test_npw.get_parameters()
|
||||
for key in params.keys():
|
||||
# checks for param# format
|
||||
self.assertRegex(key, r"param\d")
|
||||
self.assertEqual(params["param1"], "test_string_one")
|
||||
|
||||
|
||||
@run_only_if(db_type_is.MARIADB)
|
||||
class TestBuilderMaria(unittest.TestCase, TestBuilderBase):
|
||||
|
|
@ -237,6 +283,12 @@ class TestBuilderMaria(unittest.TestCase, TestBuilderBase):
|
|||
self.assertEqual("SELECT * FROM `tabNotes`", frappe.qb.from_("Notes").select("*").get_sql())
|
||||
self.assertEqual("SELECT * FROM `__Auth`", frappe.qb.from_("__Auth").select("*").get_sql())
|
||||
|
||||
def test_get_qb_type(self):
|
||||
from frappe.query_builder import get_query_builder
|
||||
|
||||
qb = get_query_builder(frappe.db.db_type)
|
||||
self.assertEqual("SELECT * FROM `tabDocType`", qb().from_("DocType").select("*").get_sql())
|
||||
|
||||
|
||||
@run_only_if(db_type_is.POSTGRES)
|
||||
class TestBuilderPostgres(unittest.TestCase, TestBuilderBase):
|
||||
|
|
@ -253,3 +305,37 @@ class TestBuilderPostgres(unittest.TestCase, TestBuilderBase):
|
|||
|
||||
def test_replace_fields_post(self):
|
||||
self.assertEqual("relname", frappe.qb.Field("table_name").get_sql())
|
||||
|
||||
def test_get_qb_type(self):
|
||||
from frappe.query_builder import get_query_builder
|
||||
|
||||
qb = get_query_builder(frappe.db.db_type)
|
||||
self.assertEqual('SELECT * FROM "tabDocType"', qb().from_("DocType").select("*").get_sql())
|
||||
|
||||
|
||||
class TestMisc(unittest.TestCase):
|
||||
def test_custom_func(self):
|
||||
rand_func = frappe.qb.functions("rand", "45")
|
||||
self.assertIsInstance(rand_func, Function)
|
||||
self.assertEqual(rand_func.get_sql(), "rand('45')")
|
||||
|
||||
def test_function_with_schema(self):
|
||||
from frappe.query_builder import ParameterizedFunction
|
||||
|
||||
x = ParameterizedFunction("rand", "45")
|
||||
x.schema = frappe.qb.DocType("DocType")
|
||||
self.assertEqual("tabDocType.rand('45')", x.get_sql())
|
||||
|
||||
def test_util_table(self):
|
||||
from frappe.query_builder.utils import Table
|
||||
|
||||
DocType = Table("DocType")
|
||||
self.assertEqual(DocType.get_sql(), "DocType")
|
||||
|
||||
def test_error_on_query_class(self):
|
||||
import frappe.query_builder.utils
|
||||
|
||||
frappe.query_builder.utils.get_type_hints = lambda x: {"return": None}
|
||||
|
||||
with self.assertRaises(frappe.query_builder.utils.BuilderIdentificationFailed):
|
||||
frappe.query_builder.utils.patch_query_execute()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue