If you're reading 1000s of rows from MySQL, the default behaviour is to read all of them in memory at once. One of the use case for reading large rows is reporting where a lot of data is read and then processed in Python. The read row is hoever not used again but still consumes memory until entire function exits. SSCursor (Server Side Cursor) allows fetching one row at a time. Note: This is slower than fetching everything at once AND has risk of connection loss. So, don't use this as a crutch. If possible rewrite code so processing is done in SQL.
1014 lines
30 KiB
Python
1014 lines
30 KiB
Python
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: MIT. See LICENSE
|
|
|
|
import datetime
|
|
import inspect
|
|
from math import ceil
|
|
from random import choice
|
|
from unittest.mock import patch
|
|
|
|
import frappe
|
|
from frappe.core.utils import find
|
|
from frappe.custom.doctype.custom_field.custom_field import create_custom_field
|
|
from frappe.database import savepoint
|
|
from frappe.database.database import get_query_execution_timeout
|
|
from frappe.database.utils import FallBackDateTimeStr
|
|
from frappe.query_builder import Field
|
|
from frappe.query_builder.functions import Concat_ws
|
|
from frappe.tests.test_query_builder import db_type_is, run_only_if
|
|
from frappe.tests.utils import FrappeTestCase
|
|
from frappe.utils import add_days, cint, now, random_string, set_request
|
|
from frappe.utils.testutils import clear_custom_fields
|
|
|
|
|
|
class TestDB(FrappeTestCase):
|
|
def test_datetime_format(self):
|
|
now_str = now()
|
|
self.assertEqual(frappe.db.format_datetime(None), FallBackDateTimeStr)
|
|
self.assertEqual(frappe.db.format_datetime(now_str), now_str)
|
|
|
|
@run_only_if(db_type_is.MARIADB)
|
|
def test_get_column_type(self):
|
|
desc_data = frappe.db.sql("desc `tabUser`", as_dict=1)
|
|
user_name_type = find(desc_data, lambda x: x["Field"] == "name")["Type"]
|
|
self.assertEqual(frappe.db.get_column_type("User", "name"), user_name_type)
|
|
|
|
def test_get_database_size(self):
|
|
self.assertIsInstance(frappe.db.get_database_size(), (float, int))
|
|
|
|
def test_db_statement_execution_timeout(self):
|
|
frappe.db.set_execution_timeout(2)
|
|
# Setting 0 means no timeout.
|
|
self.addCleanup(frappe.db.set_execution_timeout, 0)
|
|
|
|
try:
|
|
savepoint = "statement_timeout"
|
|
frappe.db.savepoint(savepoint)
|
|
frappe.db.multisql(
|
|
{
|
|
"mariadb": "select sleep(10)",
|
|
"postgres": "select pg_sleep(10)",
|
|
}
|
|
)
|
|
except Exception as e:
|
|
self.assertTrue(frappe.db.is_statement_timeout(e), f"exepcted {e} to be timeout error")
|
|
frappe.db.rollback(save_point=savepoint)
|
|
else:
|
|
frappe.db.rollback(save_point=savepoint)
|
|
self.fail("Long running queries not timing out")
|
|
|
|
def test_skip_locking(self):
|
|
first_conn = frappe.local.db
|
|
name = frappe.db.get_value("User", "Administrator", "name", for_update=True, skip_locked=True)
|
|
self.assertEqual(name, "Administrator")
|
|
|
|
frappe.connect() # Create a 2nd connection
|
|
second_conn = frappe.local.db
|
|
self.assertIsNot(first_conn, second_conn)
|
|
name = frappe.db.get_value("User", "Administrator", "name", for_update=True, skip_locked=True)
|
|
self.assertFalse(name)
|
|
|
|
@patch.dict(frappe.conf, {"http_timeout": 20, "enable_db_statement_timeout": 1})
|
|
def test_db_timeout_computation(self):
|
|
set_request(method="GET", path="/")
|
|
self.assertEqual(get_query_execution_timeout(), 30)
|
|
frappe.local.request = None
|
|
self.assertEqual(get_query_execution_timeout(), 0)
|
|
|
|
def test_get_value(self):
|
|
self.assertEqual(frappe.db.get_value("User", {"name": ["=", "Administrator"]}), "Administrator")
|
|
self.assertEqual(frappe.db.get_value("User", {"name": ["like", "Admin%"]}), "Administrator")
|
|
self.assertNotEqual(frappe.db.get_value("User", {"name": ["!=", "Guest"]}), "Guest")
|
|
self.assertEqual(frappe.db.get_value("User", {"name": ["<", "Adn"]}), "Administrator")
|
|
self.assertEqual(frappe.db.get_value("User", {"name": ["<=", "Administrator"]}), "Administrator")
|
|
self.assertEqual(
|
|
frappe.db.get_value("User", {}, ["Max(name)"], order_by=None),
|
|
frappe.db.sql("SELECT Max(name) FROM tabUser")[0][0],
|
|
)
|
|
self.assertEqual(
|
|
frappe.db.get_value("User", {}, "Min(name)", order_by=None),
|
|
frappe.db.sql("SELECT Min(name) FROM tabUser")[0][0],
|
|
)
|
|
self.assertIn(
|
|
"for update",
|
|
frappe.db.get_value(
|
|
"User", Field("name") == "Administrator", for_update=True, run=False
|
|
).lower(),
|
|
)
|
|
user_doctype = frappe.qb.DocType("User")
|
|
self.assertEqual(
|
|
frappe.qb.from_(user_doctype).select(user_doctype.name, user_doctype.email).run(),
|
|
frappe.db.get_values(
|
|
user_doctype,
|
|
filters={},
|
|
fieldname=[user_doctype.name, user_doctype.email],
|
|
order_by=None,
|
|
),
|
|
)
|
|
self.assertEqual(
|
|
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name > 's' ORDER BY MODIFIED DESC""")[0][0],
|
|
frappe.db.get_value("User", {"name": [">", "s"]}),
|
|
)
|
|
|
|
self.assertEqual(
|
|
frappe.db.sql("""SELECT name FROM `tabUser` WHERE name >= 't' ORDER BY MODIFIED DESC""")[0][0],
|
|
frappe.db.get_value("User", {"name": [">=", "t"]}),
|
|
)
|
|
self.assertEqual(
|
|
frappe.db.get_values(
|
|
"User",
|
|
filters={"name": "Administrator"},
|
|
distinct=True,
|
|
fieldname="email",
|
|
),
|
|
frappe.qb.from_(user_doctype)
|
|
.where(user_doctype.name == "Administrator")
|
|
.select("email")
|
|
.distinct()
|
|
.run(),
|
|
)
|
|
|
|
self.assertIn(
|
|
"concat_ws",
|
|
frappe.db.get_value(
|
|
"User",
|
|
filters={"name": "Administrator"},
|
|
fieldname=Concat_ws(" ", "LastName"),
|
|
run=False,
|
|
).lower(),
|
|
)
|
|
self.assertEqual(
|
|
frappe.db.sql("select email from tabUser where name='Administrator' order by modified DESC"),
|
|
frappe.db.get_values("User", filters=[["name", "=", "Administrator"]], fieldname="email"),
|
|
)
|
|
|
|
# test multiple orderby's
|
|
delimiter = '"' if frappe.db.db_type == "postgres" else "`"
|
|
self.assertIn(
|
|
"ORDER BY {deli}creation{deli} DESC,{deli}modified{deli} ASC,{deli}name{deli} DESC".format(
|
|
deli=delimiter
|
|
),
|
|
frappe.db.get_value("DocType", "DocField", order_by="creation desc, modified asc, name", run=0),
|
|
)
|
|
|
|
def test_escape(self):
|
|
frappe.db.escape("香港濟生堂製藥有限公司 - IT".encode())
|
|
|
|
def test_get_single_value(self):
|
|
# setup
|
|
values_dict = {
|
|
"Float": 1.5,
|
|
"Int": 1,
|
|
"Percent": 55.5,
|
|
"Currency": 12.5,
|
|
"Data": "Test",
|
|
"Date": datetime.datetime.now().date(),
|
|
"Datetime": datetime.datetime.now(),
|
|
"Time": datetime.timedelta(hours=9, minutes=45, seconds=10),
|
|
}
|
|
test_inputs = [
|
|
{"fieldtype": fieldtype, "value": value} for fieldtype, value in values_dict.items()
|
|
]
|
|
for fieldtype in values_dict:
|
|
create_custom_field(
|
|
"Print Settings",
|
|
{
|
|
"fieldname": f"test_{fieldtype.lower()}",
|
|
"label": f"Test {fieldtype}",
|
|
"fieldtype": fieldtype,
|
|
},
|
|
)
|
|
|
|
# test
|
|
for inp in test_inputs:
|
|
fieldname = f"test_{inp['fieldtype'].lower()}"
|
|
frappe.db.set_single_value("Print Settings", fieldname, inp["value"])
|
|
self.assertEqual(frappe.db.get_single_value("Print Settings", fieldname), inp["value"])
|
|
|
|
# teardown
|
|
clear_custom_fields("Print Settings")
|
|
|
|
def test_log_touched_tables(self):
|
|
frappe.flags.in_migrate = True
|
|
frappe.flags.touched_tables = set()
|
|
frappe.db.set_single_value("System Settings", "backup_limit", 5)
|
|
self.assertIn("tabSingles", frappe.flags.touched_tables)
|
|
|
|
frappe.flags.touched_tables = set()
|
|
todo = frappe.get_doc({"doctype": "ToDo", "description": "Random Description"})
|
|
todo.save()
|
|
self.assertIn("tabToDo", frappe.flags.touched_tables)
|
|
|
|
frappe.flags.touched_tables = set()
|
|
todo.description = "Another Description"
|
|
todo.save()
|
|
self.assertIn("tabToDo", frappe.flags.touched_tables)
|
|
|
|
if frappe.db.db_type != "postgres":
|
|
frappe.flags.touched_tables = set()
|
|
frappe.db.sql("UPDATE tabToDo SET description = 'Updated Description'")
|
|
self.assertNotIn("tabToDo SET", frappe.flags.touched_tables)
|
|
self.assertIn("tabToDo", frappe.flags.touched_tables)
|
|
|
|
frappe.flags.touched_tables = set()
|
|
todo.delete()
|
|
self.assertIn("tabToDo", frappe.flags.touched_tables)
|
|
|
|
frappe.flags.touched_tables = set()
|
|
cf = create_custom_field("ToDo", {"label": "ToDo Custom Field"})
|
|
self.assertIn("tabToDo", frappe.flags.touched_tables)
|
|
self.assertIn("tabCustom Field", frappe.flags.touched_tables)
|
|
if cf:
|
|
cf.delete()
|
|
frappe.db.commit()
|
|
frappe.flags.in_migrate = False
|
|
frappe.flags.touched_tables.clear()
|
|
|
|
def test_db_keywords_as_fields(self):
|
|
"""Tests if DB keywords work as docfield names. If they're wrapped with grave accents."""
|
|
# Using random.choices, picked out a list of 40 keywords for testing
|
|
all_keywords = {
|
|
"mariadb": [
|
|
"CHARACTER",
|
|
"DELAYED",
|
|
"LINES",
|
|
"EXISTS",
|
|
"YEAR_MONTH",
|
|
"LOCALTIME",
|
|
"BOTH",
|
|
"MEDIUMINT",
|
|
"LEFT",
|
|
"BINARY",
|
|
"DEFAULT",
|
|
"KILL",
|
|
"WRITE",
|
|
"SQL_SMALL_RESULT",
|
|
"CURRENT_TIME",
|
|
"CROSS",
|
|
"INHERITS",
|
|
"SELECT",
|
|
"TABLE",
|
|
"ALTER",
|
|
"CURRENT_TIMESTAMP",
|
|
"XOR",
|
|
"CASE",
|
|
"ALL",
|
|
"WHERE",
|
|
"INT",
|
|
"TO",
|
|
"SOME",
|
|
"DAY_MINUTE",
|
|
"ERRORS",
|
|
"OPTIMIZE",
|
|
"REPLACE",
|
|
"HIGH_PRIORITY",
|
|
"VARBINARY",
|
|
"HELP",
|
|
"IS",
|
|
"CHAR",
|
|
"DESCRIBE",
|
|
"KEY",
|
|
],
|
|
"postgres": [
|
|
"WORK",
|
|
"LANCOMPILER",
|
|
"REAL",
|
|
"HAVING",
|
|
"REPEATABLE",
|
|
"DATA",
|
|
"USING",
|
|
"BIT",
|
|
"DEALLOCATE",
|
|
"SERIALIZABLE",
|
|
"CURSOR",
|
|
"INHERITS",
|
|
"ARRAY",
|
|
"TRUE",
|
|
"IGNORE",
|
|
"PARAMETER_MODE",
|
|
"ROW",
|
|
"CHECKPOINT",
|
|
"SHOW",
|
|
"BY",
|
|
"SIZE",
|
|
"SCALE",
|
|
"UNENCRYPTED",
|
|
"WITH",
|
|
"AND",
|
|
"CONVERT",
|
|
"FIRST",
|
|
"SCOPE",
|
|
"WRITE",
|
|
"INTERVAL",
|
|
"CHARACTER_SET_SCHEMA",
|
|
"ADD",
|
|
"SCROLL",
|
|
"NULL",
|
|
"WHEN",
|
|
"TRANSACTION_ACTIVE",
|
|
"INT",
|
|
"FORTRAN",
|
|
"STABLE",
|
|
],
|
|
}
|
|
created_docs = []
|
|
|
|
# edit by rushabh: added [:1]
|
|
# don't run every keyword! - if one works, they all do
|
|
fields = all_keywords[frappe.conf.db_type][:1]
|
|
test_doctype = "ToDo"
|
|
|
|
def add_custom_field(field):
|
|
create_custom_field(
|
|
test_doctype,
|
|
{
|
|
"fieldname": field.lower(),
|
|
"label": field.title(),
|
|
"fieldtype": "Data",
|
|
},
|
|
)
|
|
|
|
# Create custom fields for test_doctype
|
|
for field in fields:
|
|
add_custom_field(field)
|
|
|
|
# Create documents under that doctype and query them via ORM
|
|
for _ in range(10):
|
|
docfields = {key.lower(): random_string(10) for key in fields}
|
|
doc = frappe.get_doc({"doctype": test_doctype, "description": random_string(20), **docfields})
|
|
doc.insert()
|
|
created_docs.append(doc.name)
|
|
|
|
random_field = choice(fields).lower()
|
|
random_doc = choice(created_docs)
|
|
random_value = random_string(20)
|
|
|
|
# Testing read
|
|
self.assertEqual(
|
|
list(frappe.get_all("ToDo", fields=[random_field], limit=1)[0])[0], random_field
|
|
)
|
|
self.assertEqual(
|
|
list(frappe.get_all("ToDo", fields=[f"`{random_field}` as total"], limit=1)[0])[0], "total"
|
|
)
|
|
|
|
# Testing read for distinct and sql functions
|
|
self.assertEqual(
|
|
list(
|
|
frappe.get_all(
|
|
"ToDo",
|
|
fields=[f"`{random_field}` as total"],
|
|
distinct=True,
|
|
limit=1,
|
|
)[0]
|
|
)[0],
|
|
"total",
|
|
)
|
|
self.assertEqual(
|
|
list(
|
|
frappe.get_all(
|
|
"ToDo",
|
|
fields=[f"`{random_field}`"],
|
|
distinct=True,
|
|
limit=1,
|
|
)[0]
|
|
)[0],
|
|
random_field,
|
|
)
|
|
self.assertEqual(
|
|
list(frappe.get_all("ToDo", fields=[f"count(`{random_field}`)"], limit=1)[0])[0],
|
|
"count" if frappe.conf.db_type == "postgres" else f"count(`{random_field}`)",
|
|
)
|
|
|
|
# Testing update
|
|
frappe.db.set_value(test_doctype, random_doc, random_field, random_value)
|
|
self.assertEqual(frappe.db.get_value(test_doctype, random_doc, random_field), random_value)
|
|
|
|
# Cleanup - delete records and remove custom fields
|
|
for doc in created_docs:
|
|
frappe.delete_doc(test_doctype, doc)
|
|
clear_custom_fields(test_doctype)
|
|
|
|
def test_savepoints(self):
|
|
frappe.db.rollback()
|
|
save_point = "todonope"
|
|
|
|
created_docs = []
|
|
failed_docs = []
|
|
|
|
for _ in range(5):
|
|
frappe.db.savepoint(save_point)
|
|
doc_gone = frappe.get_doc(doctype="ToDo", description="nope").save()
|
|
failed_docs.append(doc_gone.name)
|
|
frappe.db.rollback(save_point=save_point)
|
|
doc_kept = frappe.get_doc(doctype="ToDo", description="nope").save()
|
|
created_docs.append(doc_kept.name)
|
|
frappe.db.commit()
|
|
|
|
for d in failed_docs:
|
|
self.assertFalse(frappe.db.exists("ToDo", d))
|
|
for d in created_docs:
|
|
self.assertTrue(frappe.db.exists("ToDo", d))
|
|
|
|
def test_savepoints_wrapper(self):
|
|
frappe.db.rollback()
|
|
|
|
class SpecificExc(Exception):
|
|
pass
|
|
|
|
created_docs = []
|
|
failed_docs = []
|
|
|
|
for _ in range(5):
|
|
with savepoint(catch=SpecificExc):
|
|
doc_kept = frappe.get_doc(doctype="ToDo", description="nope").save()
|
|
created_docs.append(doc_kept.name)
|
|
|
|
with savepoint(catch=SpecificExc):
|
|
doc_gone = frappe.get_doc(doctype="ToDo", description="nope").save()
|
|
failed_docs.append(doc_gone.name)
|
|
raise SpecificExc
|
|
|
|
frappe.db.commit()
|
|
|
|
for d in failed_docs:
|
|
self.assertFalse(frappe.db.exists("ToDo", d))
|
|
for d in created_docs:
|
|
self.assertTrue(frappe.db.exists("ToDo", d))
|
|
|
|
def test_transaction_writes_error(self):
|
|
from frappe.database.database import Database
|
|
|
|
frappe.db.rollback()
|
|
|
|
frappe.db.MAX_WRITES_PER_TRANSACTION = 1
|
|
note = frappe.get_last_doc("ToDo")
|
|
note.description = "changed"
|
|
with self.assertRaises(frappe.TooManyWritesError) as tmw:
|
|
note.save()
|
|
|
|
frappe.db.MAX_WRITES_PER_TRANSACTION = Database.MAX_WRITES_PER_TRANSACTION
|
|
|
|
def test_transaction_write_counting(self):
|
|
note = frappe.get_doc(doctype="Note", title="transaction counting").insert()
|
|
|
|
writes = frappe.db.transaction_writes
|
|
frappe.db.set_value("Note", note.name, "content", "abc")
|
|
self.assertEqual(1, frappe.db.transaction_writes - writes)
|
|
writes = frappe.db.transaction_writes
|
|
|
|
frappe.db.sql(
|
|
"""
|
|
update `tabNote`
|
|
set content = 'abc'
|
|
where name = %s
|
|
""",
|
|
note.name,
|
|
)
|
|
self.assertEqual(1, frappe.db.transaction_writes - writes)
|
|
|
|
def test_pk_collision_ignoring(self):
|
|
# note has `name` generated from title
|
|
for _ in range(3):
|
|
frappe.get_doc(doctype="Note", title="duplicate name").insert(ignore_if_duplicate=True)
|
|
|
|
with savepoint():
|
|
self.assertRaises(
|
|
frappe.DuplicateEntryError, frappe.get_doc(doctype="Note", title="duplicate name").insert
|
|
)
|
|
# recover transaction to continue other tests
|
|
raise Exception
|
|
|
|
def test_read_only_errors(self):
|
|
frappe.db.rollback()
|
|
frappe.db.begin(read_only=True)
|
|
self.addCleanup(frappe.db.rollback)
|
|
|
|
with self.assertRaises(frappe.InReadOnlyMode):
|
|
frappe.db.set_value("User", "Administrator", "full_name", "Haxor")
|
|
|
|
def test_exists(self):
|
|
dt, dn = "User", "Administrator"
|
|
self.assertEqual(frappe.db.exists(dt, dn, cache=True), dn)
|
|
self.assertEqual(frappe.db.exists(dt, dn), dn)
|
|
self.assertEqual(frappe.db.exists(dt, {"name": ("=", dn)}), dn)
|
|
|
|
filters = {"doctype": dt, "name": ("like", "Admin%")}
|
|
self.assertEqual(frappe.db.exists(filters), dn)
|
|
self.assertEqual(filters["doctype"], dt) # make sure that doctype was not removed from filters
|
|
|
|
self.assertEqual(frappe.db.exists(dt, [["name", "=", dn]]), dn)
|
|
|
|
def test_bulk_insert(self):
|
|
current_count = frappe.db.count("ToDo")
|
|
test_body = f"test_bulk_insert - {random_string(10)}"
|
|
chunk_size = 10
|
|
|
|
for number_of_values in (1, 2, 5, 27):
|
|
current_transaction_writes = frappe.db.transaction_writes
|
|
|
|
frappe.db.bulk_insert(
|
|
"ToDo",
|
|
["name", "description"],
|
|
[[f"ToDo Test Bulk Insert {i}", test_body] for i in range(number_of_values)],
|
|
ignore_duplicates=True,
|
|
chunk_size=chunk_size,
|
|
)
|
|
|
|
# check that all records were inserted
|
|
self.assertEqual(number_of_values, frappe.db.count("ToDo") - current_count)
|
|
|
|
# check if inserts were done in chunks
|
|
expected_number_of_writes = ceil(number_of_values / chunk_size)
|
|
self.assertEqual(
|
|
expected_number_of_writes, frappe.db.transaction_writes - current_transaction_writes
|
|
)
|
|
|
|
frappe.db.delete("ToDo", {"description": test_body})
|
|
|
|
def test_count(self):
|
|
frappe.db.delete("Note")
|
|
|
|
frappe.get_doc(doctype="Note", title="note1", content="something").insert()
|
|
frappe.get_doc(doctype="Note", title="note2", content="someting else").insert()
|
|
|
|
# Count with no filtes
|
|
self.assertEqual((frappe.db.count("Note")), 2)
|
|
|
|
# simple filters
|
|
self.assertEqual((frappe.db.count("Note", [["title", "=", "note1"]])), 1)
|
|
|
|
frappe.get_doc(doctype="Note", title="note3", content="something other").insert()
|
|
|
|
# List of list filters with tables
|
|
self.assertEqual(
|
|
(
|
|
frappe.db.count(
|
|
"Note",
|
|
[["Note", "title", "like", "note%"], ["Note", "content", "like", "some%"]],
|
|
)
|
|
),
|
|
3,
|
|
)
|
|
|
|
frappe.db.rollback()
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
|
|
def test_modify_query(self):
|
|
from frappe.database.postgres.database import modify_query
|
|
|
|
query = "select * from `tabtree b` where lft > 13 and rgt <= 16 and name =1.0 and parent = 4134qrsdc and isgroup = 1.00045"
|
|
self.assertEqual(
|
|
"select * from \"tabtree b\" where lft > '13' and rgt <= '16' and name = '1' and parent = 4134qrsdc and isgroup = 1.00045",
|
|
modify_query(query),
|
|
)
|
|
|
|
query = (
|
|
'select locate(".io", "frappe.io"), locate("3", cast(3 as varchar)), locate("3", 3::varchar)'
|
|
)
|
|
self.assertEqual(
|
|
'select strpos( "frappe.io", ".io"), strpos( cast(3 as varchar), "3"), strpos( 3::varchar, "3")',
|
|
modify_query(query),
|
|
)
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
|
|
def test_modify_values(self):
|
|
from frappe.database.postgres.database import modify_values
|
|
|
|
self.assertEqual(
|
|
{"a": "23", "b": 23.0, "c": 23.0345, "d": "wow", "e": ("1", "2", "3", "abc")},
|
|
modify_values({"a": 23, "b": 23.0, "c": 23.0345, "d": "wow", "e": [1, 2, 3, "abc"]}),
|
|
)
|
|
self.assertEqual(
|
|
["23", 23.0, 23.00004345, "wow", ("1", "2", "3", "abc")],
|
|
modify_values((23, 23.0, 23.00004345, "wow", [1, 2, 3, "abc"])),
|
|
)
|
|
|
|
def test_callbacks(self):
|
|
|
|
order_of_execution = []
|
|
|
|
def f(val):
|
|
nonlocal order_of_execution
|
|
order_of_execution.append(val)
|
|
|
|
frappe.db.before_commit.add(lambda: f(0))
|
|
frappe.db.before_commit.add(lambda: f(1))
|
|
|
|
frappe.db.after_commit.add(lambda: f(2))
|
|
frappe.db.after_commit.add(lambda: f(3))
|
|
|
|
frappe.db.before_rollback.add(lambda: f("IGNORED"))
|
|
frappe.db.before_rollback.add(lambda: f("IGNORED"))
|
|
|
|
frappe.db.commit()
|
|
|
|
frappe.db.after_commit.add(lambda: f("IGNORED"))
|
|
frappe.db.after_commit.add(lambda: f("IGNORED"))
|
|
|
|
frappe.db.before_rollback.add(lambda: f(4))
|
|
frappe.db.before_rollback.add(lambda: f(5))
|
|
frappe.db.after_rollback.add(lambda: f(6))
|
|
frappe.db.after_rollback.add(lambda: f(7))
|
|
frappe.db.after_rollback(lambda: f(8))
|
|
|
|
frappe.db.rollback()
|
|
|
|
self.assertEqual(order_of_execution, list(range(0, 9)))
|
|
|
|
|
|
@run_only_if(db_type_is.MARIADB)
|
|
class TestDDLCommandsMaria(FrappeTestCase):
|
|
test_table_name = "TestNotes"
|
|
|
|
def setUp(self) -> None:
|
|
frappe.db.sql_ddl(
|
|
f"""
|
|
CREATE TABLE IF NOT EXISTS `tab{self.test_table_name}` (`id` INT NULL, content TEXT, PRIMARY KEY (`id`));
|
|
"""
|
|
)
|
|
|
|
def tearDown(self) -> None:
|
|
frappe.db.sql(f"DROP TABLE tab{self.test_table_name};")
|
|
self.test_table_name = "TestNotes"
|
|
|
|
def test_rename(self) -> None:
|
|
new_table_name = f"{self.test_table_name}_new"
|
|
frappe.db.rename_table(self.test_table_name, new_table_name)
|
|
check_exists = frappe.db.sql(
|
|
f"""
|
|
SELECT * FROM INFORMATION_SCHEMA.TABLES
|
|
WHERE TABLE_NAME = N'tab{new_table_name}';
|
|
"""
|
|
)
|
|
self.assertGreater(len(check_exists), 0)
|
|
self.assertIn(f"tab{new_table_name}", check_exists[0])
|
|
|
|
# * so this table is deleted after the rename
|
|
self.test_table_name = new_table_name
|
|
|
|
def test_describe(self) -> None:
|
|
self.assertSequenceEqual(
|
|
[
|
|
("id", "int(11)", "NO", "PRI", None, ""),
|
|
("content", "text", "YES", "", None, ""),
|
|
],
|
|
frappe.db.describe(self.test_table_name),
|
|
)
|
|
|
|
def test_change_type(self) -> None:
|
|
def get_table_description():
|
|
return frappe.db.sql(f"DESC `tab{self.test_table_name}`")
|
|
|
|
# try changing from int to varchar
|
|
frappe.db.change_column_type("TestNotes", "id", "varchar(255)")
|
|
self.assertIn("varchar(255)", get_table_description()[0])
|
|
|
|
# try changing from varchar to bigint
|
|
frappe.db.change_column_type("TestNotes", "id", "bigint")
|
|
self.assertIn("bigint(20)", get_table_description()[0])
|
|
|
|
def test_add_index(self) -> None:
|
|
index_name = "test_index"
|
|
frappe.db.add_index(self.test_table_name, ["id", "content(50)"], index_name)
|
|
indexs_in_table = frappe.db.sql(
|
|
f"""
|
|
SHOW INDEX FROM tab{self.test_table_name}
|
|
WHERE Key_name = '{index_name}';
|
|
"""
|
|
)
|
|
self.assertEqual(len(indexs_in_table), 2)
|
|
|
|
|
|
class TestDBSetValue(FrappeTestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
cls.todo1 = frappe.get_doc(doctype="ToDo", description="test_set_value 1").insert()
|
|
cls.todo2 = frappe.get_doc(doctype="ToDo", description="test_set_value 2").insert()
|
|
|
|
def test_update_single_doctype_field(self):
|
|
value = frappe.db.get_single_value("System Settings", "deny_multiple_sessions")
|
|
changed_value = not value
|
|
|
|
frappe.db.set_single_value("System Settings", "deny_multiple_sessions", changed_value)
|
|
current_value = frappe.db.get_single_value("System Settings", "deny_multiple_sessions")
|
|
self.assertEqual(current_value, changed_value)
|
|
|
|
changed_value = not current_value
|
|
frappe.db.set_single_value("System Settings", "deny_multiple_sessions", changed_value)
|
|
current_value = frappe.db.get_single_value("System Settings", "deny_multiple_sessions")
|
|
self.assertEqual(current_value, changed_value)
|
|
|
|
changed_value = not current_value
|
|
frappe.db.set_single_value("System Settings", "deny_multiple_sessions", changed_value)
|
|
current_value = frappe.db.get_single_value("System Settings", "deny_multiple_sessions")
|
|
self.assertEqual(current_value, changed_value)
|
|
|
|
def test_none_no_set_value(self):
|
|
frappe.db.set_value("User", None, "middle_name", "test")
|
|
with self.assertQueryCount(0):
|
|
frappe.db.set_value("User", None, "middle_name", "test")
|
|
frappe.db.set_value("User", "User", "middle_name", "test")
|
|
|
|
def test_update_single_row_single_column(self):
|
|
frappe.db.set_value("ToDo", self.todo1.name, "description", "test_set_value change 1")
|
|
updated_value = frappe.db.get_value("ToDo", self.todo1.name, "description")
|
|
self.assertEqual(updated_value, "test_set_value change 1")
|
|
|
|
@patch("frappe.db.set_single_value")
|
|
def test_set_single_value_with_set_value(self, single_set):
|
|
frappe.db.set_value("Contact Us Settings", None, "country", "India")
|
|
single_set.assert_called_once()
|
|
|
|
def test_update_single_row_multiple_columns(self):
|
|
description, status = "Upated by test_update_single_row_multiple_columns", "Closed"
|
|
|
|
frappe.db.set_value(
|
|
"ToDo",
|
|
self.todo1.name,
|
|
{
|
|
"description": description,
|
|
"status": status,
|
|
},
|
|
update_modified=False,
|
|
)
|
|
|
|
updated_desciption, updated_status = frappe.db.get_value(
|
|
"ToDo", filters={"name": self.todo1.name}, fieldname=["description", "status"]
|
|
)
|
|
|
|
self.assertEqual(description, updated_desciption)
|
|
self.assertEqual(status, updated_status)
|
|
|
|
def test_update_multiple_rows_single_column(self):
|
|
frappe.db.set_value(
|
|
"ToDo", {"description": ("like", "%test_set_value%")}, "description", "change 2"
|
|
)
|
|
|
|
self.assertEqual(frappe.db.get_value("ToDo", self.todo1.name, "description"), "change 2")
|
|
self.assertEqual(frappe.db.get_value("ToDo", self.todo2.name, "description"), "change 2")
|
|
|
|
def test_update_multiple_rows_multiple_columns(self):
|
|
todos_to_update = frappe.get_all(
|
|
"ToDo",
|
|
filters={"description": ("like", "%test_set_value%"), "status": ("!=", "Closed")},
|
|
pluck="name",
|
|
)
|
|
|
|
frappe.db.set_value(
|
|
"ToDo",
|
|
{"description": ("like", "%test_set_value%"), "status": ("!=", "Closed")},
|
|
{"status": "Closed", "priority": "High"},
|
|
)
|
|
|
|
test_result = frappe.get_all(
|
|
"ToDo", filters={"name": ("in", todos_to_update)}, fields=["status", "priority"]
|
|
)
|
|
|
|
self.assertTrue(all(x for x in test_result if x["status"] == "Closed"))
|
|
self.assertTrue(all(x for x in test_result if x["priority"] == "High"))
|
|
|
|
def test_update_modified_options(self):
|
|
self.todo2.reload()
|
|
|
|
todo = self.todo2
|
|
updated_description = f"{todo.description} - by `test_update_modified_options`"
|
|
custom_modified = datetime.datetime.fromisoformat(add_days(now(), 10))
|
|
custom_modified_by = "user_that_doesnt_exist@example.com"
|
|
|
|
frappe.db.set_value("ToDo", todo.name, "description", updated_description, update_modified=False)
|
|
self.assertEqual(updated_description, frappe.db.get_value("ToDo", todo.name, "description"))
|
|
self.assertEqual(todo.modified, frappe.db.get_value("ToDo", todo.name, "modified"))
|
|
|
|
frappe.db.set_value(
|
|
"ToDo",
|
|
todo.name,
|
|
"description",
|
|
"test_set_value change 1",
|
|
modified=custom_modified,
|
|
modified_by=custom_modified_by,
|
|
)
|
|
self.assertTupleEqual(
|
|
(custom_modified, custom_modified_by),
|
|
frappe.db.get_value("ToDo", todo.name, ["modified", "modified_by"]),
|
|
)
|
|
|
|
def test_set_value(self):
|
|
self.todo1.reload()
|
|
|
|
frappe.db.set_value(
|
|
self.todo1.doctype,
|
|
self.todo1.name,
|
|
"description",
|
|
f"{self.todo1.description}-edit by `test_for_update`",
|
|
)
|
|
query = str(frappe.db.last_query)
|
|
|
|
if frappe.conf.db_type == "postgres":
|
|
from frappe.database.postgres.database import modify_query
|
|
|
|
self.assertTrue(modify_query("UPDATE `tabToDo` SET") in query)
|
|
if frappe.conf.db_type == "mariadb":
|
|
self.assertTrue("UPDATE `tabToDo` SET" in query)
|
|
|
|
def test_cleared_cache(self):
|
|
self.todo2.reload()
|
|
frappe.get_cached_doc(self.todo2.doctype, self.todo2.name) # init cache
|
|
|
|
description = f"{self.todo2.description}-edit by `test_cleared_cache`"
|
|
|
|
frappe.db.set_value(self.todo2.doctype, self.todo2.name, "description", description)
|
|
cached_doc = frappe.get_cached_doc(self.todo2.doctype, self.todo2.name)
|
|
self.assertEqual(cached_doc.description, description)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
frappe.db.rollback()
|
|
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
|
|
class TestDDLCommandsPost(FrappeTestCase):
|
|
test_table_name = "TestNotes"
|
|
|
|
def setUp(self) -> None:
|
|
frappe.db.sql(
|
|
f"""
|
|
CREATE TABLE "tab{self.test_table_name}" ("id" INT NULL, content text, PRIMARY KEY ("id"))
|
|
"""
|
|
)
|
|
|
|
def tearDown(self) -> None:
|
|
frappe.db.sql(f'DROP TABLE "tab{self.test_table_name}"')
|
|
self.test_table_name = "TestNotes"
|
|
|
|
def test_rename(self) -> None:
|
|
new_table_name = f"{self.test_table_name}_new"
|
|
frappe.db.rename_table(self.test_table_name, new_table_name)
|
|
check_exists = frappe.db.sql(
|
|
f"""
|
|
SELECT EXISTS (
|
|
SELECT FROM information_schema.tables
|
|
WHERE table_name = 'tab{new_table_name}'
|
|
);
|
|
"""
|
|
)
|
|
self.assertTrue(check_exists[0][0])
|
|
|
|
# * so this table is deleted after the rename
|
|
self.test_table_name = new_table_name
|
|
|
|
def test_describe(self) -> None:
|
|
self.assertSequenceEqual([("id",), ("content",)], frappe.db.describe(self.test_table_name))
|
|
|
|
def test_change_type(self) -> None:
|
|
from psycopg2.errors import DatatypeMismatch
|
|
|
|
def get_table_description():
|
|
return frappe.db.sql(
|
|
f"""
|
|
SELECT
|
|
table_name,
|
|
column_name,
|
|
data_type
|
|
FROM
|
|
information_schema.columns
|
|
WHERE
|
|
table_name = 'tab{self.test_table_name}'"""
|
|
)
|
|
|
|
# try changing from int to varchar
|
|
frappe.db.change_column_type(self.test_table_name, "id", "varchar(255)")
|
|
self.assertIn("character varying", get_table_description()[0])
|
|
|
|
# try changing from varchar to int
|
|
try:
|
|
frappe.db.change_column_type(self.test_table_name, "id", "bigint")
|
|
except DatatypeMismatch:
|
|
frappe.db.rollback()
|
|
|
|
# try changing from varchar to int (using cast)
|
|
frappe.db.change_column_type(self.test_table_name, "id", "bigint", use_cast=True)
|
|
self.assertIn("bigint", get_table_description()[0])
|
|
|
|
def test_add_index(self) -> None:
|
|
index_name = "test_index"
|
|
frappe.db.add_index(self.test_table_name, ["id", "content(50)"], index_name)
|
|
indexs_in_table = frappe.db.sql(
|
|
f"""
|
|
SELECT indexname
|
|
FROM pg_indexes
|
|
WHERE tablename = 'tab{self.test_table_name}'
|
|
AND indexname = '{index_name}' ;
|
|
""",
|
|
)
|
|
self.assertEqual(len(indexs_in_table), 1)
|
|
|
|
def test_sequence_table_creation(self):
|
|
from frappe.core.doctype.doctype.test_doctype import new_doctype
|
|
|
|
dt = new_doctype("autoinc_dt_seq_test", autoname="autoincrement").insert(ignore_permissions=True)
|
|
|
|
if frappe.db.db_type == "postgres":
|
|
self.assertTrue(
|
|
frappe.db.sql(
|
|
"""select sequence_name FROM information_schema.sequences
|
|
where sequence_name ilike 'autoinc_dt_seq_test%'"""
|
|
)[0][0]
|
|
)
|
|
else:
|
|
self.assertTrue(
|
|
frappe.db.sql(
|
|
"""select data_type FROM information_schema.tables
|
|
where table_type = 'SEQUENCE' and table_name like 'autoinc_dt_seq_test%'"""
|
|
)[0][0]
|
|
)
|
|
|
|
dt.delete(ignore_permissions=True)
|
|
|
|
def test_is(self):
|
|
user = frappe.qb.DocType("User")
|
|
self.assertIn(
|
|
"is not null", frappe.db.get_values(user, filters={user.name: ("is", "set")}, run=False).lower()
|
|
)
|
|
self.assertIn(
|
|
"is null", frappe.db.get_values(user, filters={user.name: ("is", "not set")}, run=False).lower()
|
|
)
|
|
|
|
|
|
@run_only_if(db_type_is.POSTGRES)
|
|
class TestTransactionManagement(FrappeTestCase):
|
|
def test_create_proper_transactions(self):
|
|
def _get_transaction_id():
|
|
return frappe.db.sql("select txid_current()", pluck=True)
|
|
|
|
self.assertEqual(_get_transaction_id(), _get_transaction_id())
|
|
|
|
frappe.db.rollback()
|
|
self.assertEqual(_get_transaction_id(), _get_transaction_id())
|
|
|
|
frappe.db.commit()
|
|
self.assertEqual(_get_transaction_id(), _get_transaction_id())
|
|
|
|
|
|
# Treat same DB as replica for tests, a separate connection will be opened
|
|
class TestReplicaConnections(FrappeTestCase):
|
|
def test_switching_to_replica(self):
|
|
with patch.dict(frappe.local.conf, {"read_from_replica": 1, "replica_host": "127.0.0.1"}):
|
|
|
|
def db_id():
|
|
return id(frappe.local.db)
|
|
|
|
write_connection = db_id()
|
|
read_only_connection = None
|
|
|
|
@frappe.read_only()
|
|
def outer():
|
|
nonlocal read_only_connection
|
|
read_only_connection = db_id()
|
|
|
|
# A new connection should be opened
|
|
self.assertNotEqual(read_only_connection, write_connection)
|
|
inner()
|
|
# calling nested read only function shouldn't change connection
|
|
self.assertEqual(read_only_connection, db_id())
|
|
|
|
@frappe.read_only()
|
|
def inner():
|
|
# calling nested read only function shouldn't change connection
|
|
self.assertEqual(read_only_connection, db_id())
|
|
|
|
outer()
|
|
self.assertEqual(write_connection, db_id())
|
|
|
|
|
|
class TestSqlIterator(FrappeTestCase):
|
|
def test_db_sql_iterator(self):
|
|
test_queries = [
|
|
"select * from `tabCountry` order by name",
|
|
"select code from `tabCountry` order by name",
|
|
"select code from `tabCountry` order by name limit 5",
|
|
]
|
|
|
|
for query in test_queries:
|
|
self.assertEqual(
|
|
frappe.db.sql(query, as_dict=True),
|
|
list(frappe.db.sql(query, as_dict=True, as_iterator=True)),
|
|
msg=f"{query=} results not same as iterator",
|
|
)
|
|
|
|
self.assertEqual(
|
|
frappe.db.sql(query, pluck=True),
|
|
list(frappe.db.sql(query, pluck=True, as_iterator=True)),
|
|
msg=f"{query=} results not same as iterator",
|
|
)
|
|
|
|
self.assertEqual(
|
|
frappe.db.sql(query, as_list=True),
|
|
list(frappe.db.sql(query, as_list=True, as_iterator=True)),
|
|
msg=f"{query=} results not same as iterator",
|
|
)
|
|
|
|
@run_only_if(db_type_is.MARIADB)
|
|
def test_unbuffered_cursor(self):
|
|
with frappe.db.unbuffered_cursor():
|
|
self.test_db_sql_iterator()
|