seitime-frappe/frappe/database/database.py
Ankush Menat 0164a87ac9
Merge pull request #17350 from Aradhya-Tripathi/get-all-mod
feat: Adding support to Query engine
2022-07-13 18:36:04 +05:30

1312 lines
36 KiB
Python

# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
# Database Module
# --------------------
import datetime
import random
import re
import string
from contextlib import contextmanager
from time import time
from pypika.terms import Criterion, NullValue
import frappe
import frappe.defaults
import frappe.model.meta
from frappe import _
from frappe.exceptions import DoesNotExistError
from frappe.model.utils.link_count import flush_local_link_count
from frappe.query_builder.functions import Count
from frappe.query_builder.utils import DocType
from frappe.utils import cast as cast_fieldtype
from frappe.utils import get_datetime, get_table_name, getdate, now, sbool
IFNULL_PATTERN = re.compile(r"ifnull\(", flags=re.IGNORECASE)
INDEX_PATTERN = re.compile(r"\s*\([^)]+\)\s*")
SINGLE_WORD_PATTERN = re.compile(r'([`"]?)(tab([A-Z]\w+))\1')
MULTI_WORD_PATTERN = re.compile(r'([`"])(tab([A-Z]\w+)( [A-Z]\w+)+)\1')
def is_query_type(query: str, query_type: str | tuple[str]) -> bool:
return query.lstrip().split(maxsplit=1)[0].lower().startswith(query_type)
class Database:
    """
    Open a database connection with the given parameters, if use_default is True, use the
    login details from `conf.py`. This is called by the request handler and is accessible using
    the `db` global variable. the `sql` method is also global to run queries
    """

    # NOTE(review): presumably the default length of varchar columns - confirm against schema code
    VARCHAR_LEN = 140
    # NOTE(review): presumably the maximum length of a column name - confirm
    MAX_COLUMN_LENGTH = 64

    OPTIONAL_COLUMNS = ["_user_tags", "_comments", "_assign", "_liked_by"]
    DEFAULT_SHORTCUTS = ["_Login", "__user", "_Full Name", "Today", "__today", "now", "Now"]
    STANDARD_VARCHAR_COLUMNS = ("name", "owner", "modified_by")
    DEFAULT_COLUMNS = ["name", "creation", "modified", "modified_by", "owner", "docstatus", "idx"]
    CHILD_TABLE_COLUMNS = ("parent", "parenttype", "parentfield")

    # `check_transaction_status` commits or raises once `transaction_writes` exceeds this
    MAX_WRITES_PER_TRANSACTION = 200_000

    class InvalidColumnName(frappe.ValidationError):
        # raised by `get_single_value` when the requested field is not in the doctype meta
        pass
def __init__(self, host=None, user=None, password=None, ac_name=None, use_default=0, port=None):
    """Store connection settings; no connection is opened here.

    Explicit arguments win over values from ``frappe.conf``. For the login
    user, ``use_default`` beats ``ac_name``, which beats ``user``.
    """
    self.setup_type_map()
    conf = frappe.conf

    # resolve the login user with the same precedence as successive overrides
    if use_default:
        login = conf.db_name
    elif ac_name:
        login = ac_name
    else:
        login = user or conf.db_name

    self.host = host or conf.db_host or "127.0.0.1"
    self.port = port or conf.db_port or ""
    self.user = login
    self.password = password or conf.db_password
    self.db_name = conf.db_name

    self._conn = None
    self.value_cache = {}

    # write accounting used by `check_transaction_status`
    self.transaction_writes = 0
    self.auto_commit_on_many_writes = 0
def setup_type_map(self):
    """No-op in this class (presumably overridden by database-specific subclasses)."""
def connect(self):
    """Open the connection and cursor, and reset the rollback observers."""
    self.cur_db_name = self.user
    self._conn = connection = self.get_connection()
    self._cursor = connection.cursor()
    frappe.local.rollback_observers = []
def use(self, db_name):
    """Switch the open connection to ``db_name`` (`USE` db_name)."""
    connection = self._conn
    connection.select_db(db_name)
def get_connection(self):
    """No-op here, returns None (presumably overridden to return a live connection)."""
def get_database_size(self):
    """No-op here, returns None (presumably overridden by database-specific subclasses)."""
def sql(
    self,
    query,
    values=(),
    as_dict=0,
    as_list=0,
    formatted=0,
    debug=0,
    ignore_ddl=0,
    as_utf8=0,
    auto_commit=0,
    update=None,
    explain=False,
    run=True,
    pluck=False,
):
    """Execute a SQL query and fetch all rows.

    :param query: SQL query.
    :param values: List / dict of values to be escaped and substituted in the query.
    :param as_dict: Return as a dictionary.
    :param as_list: Always return as a list.
    :param formatted: Format values like date etc.
    :param debug: Print query and `EXPLAIN` in debug log.
    :param ignore_ddl: Catch exception if table, column missing.
    :param as_utf8: Encode values as UTF 8.
    :param auto_commit: Commit before and after executing the query.
    :param update: Update this dict to all rows (if returned `as_dict`).
    :param explain: With `debug`, also print `EXPLAIN` output for select queries.
    :param run: Returns query without executing it if False.
    :param pluck: Return a flat list holding the first column of every row;
            takes precedence over `as_dict` / `as_list`.

    Returns an empty tuple when the statement produced no result set.

    Examples:

            # return customer names as dicts
            frappe.db.sql("select name from tabCustomer", as_dict=True)

            # return names beginning with a
            frappe.db.sql("select name from tabCustomer where name like %s", "a%")

            # values as dict
            frappe.db.sql("select name from tabCustomer where name like %(name)s and owner=%(owner)s",
                    {"name": "a%", "owner":"test@example.com"})
    """
    # a `debug` attribute on the instance forces debug output for every query
    debug = debug or getattr(self, "debug", False)
    query = str(query)
    if not run:
        return query

    # remove whitespace / indentation from start and end of query
    query = query.strip()

    # replaces ifnull in query with coalesce
    query = IFNULL_PATTERN.sub("coalesce(", query)

    # connect lazily on first use
    if not self._conn:
        self.connect()

    # in transaction validations
    self.check_transaction_status(query)
    self.clear_db_table_cache(query)

    # autocommit
    if auto_commit:
        self.commit()

    # execute
    try:
        if debug:
            time_start = time()

        self.log_query(query, values, debug, explain)

        if values != ():
            # MySQL-python==1.2.5 hack!
            # a bare scalar is wrapped so the driver always receives a sequence or mapping
            if not isinstance(values, (dict, tuple, list)):
                values = (values,)

            self._cursor.execute(query, values)

            if frappe.flags.in_migrate:
                self.log_touched_tables(query, values)

        else:
            self._cursor.execute(query)

            if frappe.flags.in_migrate:
                self.log_touched_tables(query)

        if debug:
            time_end = time()
            frappe.errprint(("Execution time: {} sec").format(round(time_end - time_start, 2)))

    except Exception as e:
        if self.is_syntax_error(e):
            # only for mariadb
            frappe.errprint("Syntax error in query:")
            frappe.errprint(query)
            # no raise here: falls through to the ignore_ddl check below, which re-raises

        elif self.is_deadlocked(e):
            raise frappe.QueryDeadlockError(e)

        elif self.is_timedout(e):
            raise frappe.QueryTimeoutError(e)

        elif frappe.conf.db_type == "postgres":
            # TODO: added temporarily
            import traceback

            traceback.print_stack()
            print(e)
            raise

        # with ignore_ddl, missing table / column errors are swallowed; anything else propagates
        if ignore_ddl and (
            self.is_missing_column(e) or self.is_table_missing(e) or self.cant_drop_field_or_key(e)
        ):
            pass
        else:
            raise

    if auto_commit:
        self.commit()

    # statements without a result set leave `description` empty
    if not self._cursor.description:
        return ()

    if pluck:
        return [r[0] for r in self._cursor.fetchall()]

    # scrub output if required
    if as_dict:
        ret = self.fetch_as_dict(formatted, as_utf8)
        if update:
            for r in ret:
                r.update(update)
        return ret
    elif as_list:
        return self.convert_to_lists(self._cursor.fetchall(), formatted, as_utf8)
    elif as_utf8:
        return self.convert_to_lists(self._cursor.fetchall(), formatted, as_utf8)
    else:
        return self._cursor.fetchall()
def log_query(self, query, values, debug, explain):
    """Emit the query to the test, debug and info log channels that are enabled."""

    def rendered():
        return self.mogrify(query, values)

    # for debugging in tests
    if frappe.conf.get("allow_tests") and frappe.cache().get_value("flag_print_sql"):
        print(rendered())

    # debug
    if debug:
        if explain and is_query_type(query, "select"):
            self.explain_query(query, values)
        frappe.errprint(rendered())

    # info
    if frappe.conf.get("logging") == 2:
        frappe.log("<<<< query")
        frappe.log(rendered())
        frappe.log(">>>>")
def mogrify(self, query, values):
    """Build the query string with values.

    Returns ``query`` untouched when there are no values. If the cursor cannot
    render the statement, falls back to the ``(query, values)`` tuple.
    """
    if values:
        try:
            return self._cursor.mogrify(query, values)
        except Exception:
            return (query, values)
    return query
def explain_query(self, query, values=None):
    """Print `EXPLAIN` output for the query in the error log (best effort)."""
    import json

    try:
        frappe.errprint("--- query explain ---")
        statement = "explain " + query
        execute_args = (statement,) if values is None else (statement, values)
        self._cursor.execute(*execute_args)
        plan = self.fetch_as_dict()
        frappe.errprint(json.dumps(plan, indent=1))
        frappe.errprint("--- query explain end ---")
    except Exception:
        frappe.errprint("error in query explain")
def sql_list(self, query, values=(), debug=False, **kwargs):
    """Return the first column of every row as a flat list.

    Example:

            # doctypes = ["DocType", "DocField", "User", ...]
            doctypes = frappe.db.sql_list("select name from DocType")
    """
    first_column = self.sql(query, values, **kwargs, debug=debug, pluck=True)
    return first_column
def sql_ddl(self, query, debug=False):
    """Commit the open transaction, then run a DDL query.

    DDL (Data Definition Language) queries that alter schema autocommit in MariaDB,
    so pending work is committed explicitly first.
    """
    self.commit()
    self.sql(query, debug=debug)
def check_transaction_status(self, query):
    """Count writes in the current transaction and stop runaway transactions.

    Raises `frappe.TooManyWritesError` if more than `MAX_WRITES_PER_TRANSACTION`
    `INSERT`, `UPDATE` or `DELETE` queries are executed in one transaction, unless
    `auto_commit_on_many_writes` is set, in which case the transaction is committed.
    This is to ensure that writes are always flushed otherwise this could cause
    the system to hang.

    :param query: SQL statement about to be executed.
    """
    self.check_implicit_commit(query)

    if not query:
        return

    if is_query_type(query, ("commit", "rollback")):
        # transaction boundary: start counting afresh
        self.transaction_writes = 0
        return

    # same keyword matching as the other checks, so leading whitespace is tolerated
    if not is_query_type(query, ("update", "insert", "delete")):
        return

    self.transaction_writes += 1
    if self.transaction_writes > self.MAX_WRITES_PER_TRANSACTION:
        if self.auto_commit_on_many_writes:
            self.commit()
        else:
            msg = "<br><br>" + _("Too many changes to database in single action.") + "<br>"
            msg += _("The changes have been reverted.") + "<br>"
            raise frappe.TooManyWritesError(msg)
def check_implicit_commit(self, query):
    """Raise if a statement that can cause an implicit commit runs after uncommitted writes."""
    if not (self.transaction_writes and query):
        return

    implicit_commit_keywords = ("start", "alter", "drop", "create", "begin", "truncate")
    if is_query_type(query, implicit_commit_keywords):
        raise Exception("This statement can cause implicit commit")
def fetch_as_dict(self, formatted=0, as_utf8=0):
    """Internal. Fetch all rows from the cursor as `frappe._dict` objects keyed by column name.

    `formatted` is accepted but not used here.
    """
    rows = self._cursor.fetchall()
    if not rows:
        return []

    columns = [column[0] for column in self._cursor.description]

    def maybe_encode(value):
        if as_utf8 and isinstance(value, str):
            return value.encode("utf-8")
        return value

    return [frappe._dict(zip(columns, [maybe_encode(value) for value in row])) for row in rows]
@staticmethod
def clear_db_table_cache(query):
    """Drop the cached table list when the query creates or drops something."""
    if not query:
        return
    if is_query_type(query, ("drop", "create")):
        frappe.cache().delete_key("db_tables")
@staticmethod
def needs_formatting(result, formatted):
"""Returns true if the first row in the result has a Date, Datetime, Long Int."""
if result and result[0]:
for v in result[0]:
if isinstance(v, (datetime.date, datetime.timedelta, datetime.datetime, int)):
return True
if formatted and isinstance(v, (int, float)):
return True
return False
def get_description(self):
    """Return the cursor's `description` (result metadata of the last query)."""
    cursor = self._cursor
    return cursor.description
@staticmethod
def convert_to_lists(res, formatted=0, as_utf8=0):
"""Convert tuple output to lists (internal)."""
nres = []
for r in res:
nr = []
for val in r:
if as_utf8 and isinstance(val, str):
val = val.encode("utf-8")
nr.append(val)
nres.append(nr)
return nres
def get(self, doctype, filters=None, as_dict=True, cache=False):
    """Fetch all columns of one document: `get_value` with fieldname='*'."""
    all_fields = "*"
    return self.get_value(doctype, filters, all_fields, as_dict=as_dict, cache=cache)
def get_value(
    self,
    doctype,
    filters=None,
    fieldname="name",
    ignore=None,
    as_dict=False,
    debug=False,
    order_by="KEEP_DEFAULT_ORDERING",
    cache=False,
    for_update=False,
    *,
    run=True,
    pluck=False,
    distinct=False,
):
    """Returns a document property or list of properties.

    :param doctype: DocType name.
    :param filters: Filters like `{"x":"y"}` or name of the document. `None` if Single DocType.
    :param fieldname: Column name.
    :param ignore: Don't raise exception if table, column is missing.
    :param as_dict: Return values as dict.
    :param debug: Print query in error log.
    :param order_by: Column to order by
    :param cache: Passed through to `get_values`.
    :param for_update: Passed through to `get_values`.
    :param run: If False, return whatever `get_values` returns without unwrapping it.
    :param pluck: Rows are plain values rather than row containers; the first one is returned as is.
    :param distinct: Passed through to `get_values`.

    Returns `None` when nothing matches. A single requested field is returned
    unwrapped; several fields (or `as_dict`) return the whole first row.

    Example:

            # return first customer starting with a
            frappe.db.get_value("Customer", {"name": ("like", "a%")})

            # return last login of **User** `test@example.com`
            frappe.db.get_value("User", "test@example.com", "last_login")

            last_login, last_ip = frappe.db.get_value("User", "test@example.com",
                    ["last_login", "last_ip"])

            # returns default date_format
            frappe.db.get_value("System Settings", None, "date_format")
    """
    result = self.get_values(
        doctype,
        filters,
        fieldname,
        ignore,
        as_dict,
        debug,
        order_by,
        cache=cache,
        for_update=for_update,
        run=run,
        pluck=pluck,
        distinct=distinct,
        limit=1,
    )

    if not run:
        return result

    if not result:
        return None

    row = result[0]

    # a plucked row is already the bare value: len() / indexing would fail on
    # ints and empty strings, so it is checked before the length test
    if pluck or as_dict or len(row) > 1:
        return row

    # single field is requested, send it without wrapping in containers
    return row[0]
def get_values(
    self,
    doctype,
    filters=None,
    fieldname="name",
    ignore=None,
    as_dict=False,
    debug=False,
    order_by="KEEP_DEFAULT_ORDERING",
    update=None,
    cache=False,
    for_update=False,
    *,
    run=True,
    pluck=False,
    distinct=False,
    limit=None,
):
    """Returns multiple document properties.

    :param doctype: DocType name.
    :param filters: Filters like `{"x":"y"}` or name of the document.
    :param fieldname: Column name.
    :param ignore: Don't raise exception if table, column is missing.
    :param as_dict: Return values as dict.
    :param debug: Print query in error log.
    :param order_by: Column to order by,
    :param update: Dict merged into every row (only applied when rows are dicts).
    :param cache: Reuse / store the result in `self.value_cache`; only honoured when
            `filters` is a string (a document name).
    :param distinct: Get Distinct results.

    Example:

            # return first customer starting with a
            customers = frappe.db.get_values("Customer", {"name": ("like", "a%")})

            # return last login of **User** `test@example.com`
            user = frappe.db.get_values("User", "test@example.com", "*")[0]
    """
    out = None

    use_cache = cache and isinstance(filters, str)
    if use_cache:
        # a list of fieldnames is unhashable; freeze it so it can be part of the key
        hashable_fields = tuple(fieldname) if isinstance(fieldname, list) else fieldname
        cache_key = (doctype, filters, hashable_fields)
        if cache_key in self.value_cache:
            return self.value_cache[cache_key]

    if distinct:
        order_by = None

    if isinstance(filters, list):
        out = self._get_value_for_many_names(
            doctype=doctype,
            names=filters,
            field=fieldname,
            order_by=order_by,
            debug=debug,
            run=run,
            pluck=pluck,
            distinct=distinct,
            limit=limit,
            as_dict=as_dict,
        )

    else:
        fields = fieldname
        if fieldname != "*" and isinstance(fieldname, str):
            fields = [fieldname]

        # `filters == doctype` means a Single doctype, except for "DocType" itself
        if (filters is not None) and (filters != doctype or doctype == "DocType"):
            try:
                if order_by:
                    order_by = "modified" if order_by == "KEEP_DEFAULT_ORDERING" else order_by
                out = self._get_values_from_table(
                    fields=fields,
                    filters=filters,
                    doctype=doctype,
                    as_dict=as_dict,
                    debug=debug,
                    order_by=order_by,
                    update=update,
                    for_update=for_update,
                    run=run,
                    pluck=pluck,
                    distinct=distinct,
                    limit=limit,
                )
            except Exception as e:
                if ignore and (frappe.db.is_missing_column(e) or frappe.db.is_table_missing(e)):
                    # table or column not found, return None
                    out = None
                elif (not ignore) and frappe.db.is_table_missing(e):
                    # table not found, look in singles
                    out = self.get_values_from_single(
                        fields, filters, doctype, as_dict, debug, update, run=run, distinct=distinct
                    )
                else:
                    raise
        else:
            out = self.get_values_from_single(
                fields, filters, doctype, as_dict, debug, update, run=run, pluck=pluck, distinct=distinct
            )

    if use_cache:
        self.value_cache[cache_key] = out

    return out
def get_values_from_single(
    self,
    fields,
    filters,
    doctype,
    as_dict=False,
    debug=False,
    update=None,
    *,
    run=True,
    pluck=False,
    distinct=False,
):
    """Get values from `tabSingles` (Single DocTypes) (internal).

    :param fields: List of fields, or "*" for all of them.
    :param filters: Filters (dict).
    :param doctype: DocType name.
    :param as_dict: Return the values as a dict instead of a list.
    :param update: Dict merged into the result (only on the non-filter `as_dict` path).

    Returns a list with at most one row, or an empty list when the filters do not
    match / nothing is stored.
    """
    # TODO
    # if not frappe.model.meta.is_single(doctype):
    # 	raise frappe.DoesNotExistError("DocType", doctype)

    if fields == "*" or isinstance(filters, dict):
        # check if single doc matches with filters
        values = self.get_singles_dict(doctype)
        if isinstance(filters, dict):
            for key, value in filters.items():
                if values.get(key) != value:
                    return []

        if as_dict:
            return [values] if values else []

        if isinstance(fields, list):
            # materialise the row: a lazy `map` object has no len() and cannot be indexed
            return [[values.get(field) for field in fields]]

        # fields == "*" without as_dict: nothing is returned on this path
        return None

    # NOTE(review): the query is executed here even when run=False - confirm whether
    # `run` should be forwarded to `.run()`
    r = frappe.qb.engine.get_query(
        "Singles",
        filters={"field": ("in", tuple(fields)), "doctype": doctype},
        fields=["field", "value"],
        distinct=distinct,
    ).run(pluck=pluck, debug=debug, as_dict=False)

    if not run:
        return r

    if not r:
        return []

    if as_dict:
        r = frappe._dict(r)
        if update:
            r.update(update)
        return [r]

    return [[i[1] for i in r]]
def get_singles_dict(self, doctype, debug=False, *, for_update=False, cast=False):
    """Get Single DocType as dict.

    :param doctype: DocType of the single object whose value is requested
    :param debug: Execute query in debug mode - print to STDOUT
    :param for_update: Take `FOR UPDATE` lock on the records
    :param cast: Cast values to Python data types based on field type

    Example:

            # Get column and value of the single doctype Accounts Settings
            account_settings = frappe.db.get_singles_dict("Accounts Settings")
    """
    rows = frappe.qb.engine.get_query(
        "Singles",
        filters={"doctype": doctype},
        fields=["field", "value"],
        for_update=for_update,
    ).run(debug=debug)

    if not cast:
        return frappe._dict(rows)

    try:
        meta = frappe.get_meta(doctype)
    except DoesNotExistError:
        # without meta there is no field type to cast to
        return frappe._dict(rows)

    typed = frappe._dict()
    for fieldname, raw in rows:
        df = meta.get_field(fieldname)
        typed[fieldname] = cast_fieldtype(df.fieldtype, raw) if df else raw

    return typed
@staticmethod
def get_all(*args, **kwargs):
    """Forward all arguments to `frappe.get_all`."""
    records = frappe.get_all(*args, **kwargs)
    return records
@staticmethod
def get_list(*args, **kwargs):
    """Forward all arguments to `frappe.get_list`."""
    records = frappe.get_list(*args, **kwargs)
    return records
def set_single_value(
self,
doctype: str,
fieldname: str | dict,
value: str | int | None = None,
*args,
**kwargs,
):
"""Set field value of Single DocType.
:param doctype: DocType of the single object
:param fieldname: `fieldname` of the property
:param value: `value` of the property
Example:
# Update the `deny_multiple_sessions` field in System Settings DocType.
company = frappe.db.set_single_value("System Settings", "deny_multiple_sessions", True)
"""
return self.set_value(doctype, doctype, fieldname, value, *args, **kwargs)
def get_single_value(self, doctype, fieldname, cache=True):
    """Get property of Single DocType. Cache locally by default

    :param doctype: DocType of the single object whose value is requested
    :param fieldname: `fieldname` of the property whose value is requested
    :param cache: Serve from `self.value_cache` when the value is already there.

    Example:

            # Get the default value of the company from the Global Defaults doctype.
            company = frappe.db.get_single_value('Global Defaults', 'default_company')
    """
    doctype_cache = self.value_cache.setdefault(doctype, {})
    if cache and fieldname in doctype_cache:
        return doctype_cache[fieldname]

    rows = frappe.qb.engine.get_query(
        table="Singles",
        filters={"doctype": doctype, "field": fieldname},
        fields="value",
    ).run()
    raw = rows[0][0] if rows else None

    df = frappe.get_meta(doctype).get_field(fieldname)
    if not df:
        frappe.throw(
            _("Invalid field name: {0}").format(frappe.bold(fieldname)), self.InvalidColumnName
        )

    typed = cast_fieldtype(df.fieldtype, raw)
    self.value_cache[doctype][fieldname] = typed
    return typed
def get_singles_value(self, *args, **kwargs):
    """Alias for get_single_value"""
    value = self.get_single_value(*args, **kwargs)
    return value
def _get_values_from_table(
    self,
    fields,
    filters,
    doctype,
    as_dict,
    *,
    debug=False,
    order_by=None,
    update=None,
    for_update=False,
    run=True,
    pluck=False,
    distinct=False,
    limit=None,
):
    """Build a query for the doctype's table via the query engine and run it through `sql`.

    Requesting "*" forces dict rows.
    """
    built_query = frappe.qb.engine.get_query(
        table=doctype,
        filters=filters,
        orderby=order_by,
        for_update=for_update,
        field_objects=[],
        fields=fields,
        distinct=distinct,
        limit=limit,
    )

    wants_every_column = (
        fields == "*" and not isinstance(fields, (list, tuple)) and not isinstance(fields, Criterion)
    )
    if wants_every_column:
        as_dict = True

    return self.sql(built_query, as_dict=as_dict, debug=debug, update=update, run=run, pluck=pluck)
def _get_value_for_many_names(
self,
doctype,
names,
field,
order_by,
*,
debug=False,
run=True,
pluck=False,
distinct=False,
limit=None,
as_dict=False,
):
names = list(filter(None, names))
if names:
return self.get_all(
doctype,
fields=field,
filters=names,
order_by=order_by,
pluck=pluck,
debug=debug,
as_list=not as_dict,
run=run,
distinct=distinct,
limit_page_length=limit,
)
else:
return {}
def update(self, *args, **kwargs):
    """Update multiple values. Alias for `set_value`."""
    outcome = self.set_value(*args, **kwargs)
    return outcome
def set_value(
    self,
    dt,
    dn,
    field,
    val=None,
    modified=None,
    modified_by=None,
    update_modified=True,
    debug=False,
    for_update=True,
):
    """Set a single value in the database, do not call the ORM triggers
    but update the modified timestamp (unless specified not to).

    **Warning:** this function will not call Document events and should be avoided in normal cases.

    :param dt: DocType name.
    :param dn: Document name.
    :param field: Property / field name or dictionary of values to be updated
    :param val: Value to be updated.
    :param modified: Use this as the `modified` timestamp.
    :param modified_by: Set this user as `modified_by`.
    :param update_modified: default True. Set as false, if you don't want to update the timestamp.
    :param debug: Print the query in the developer / js console.
    :param for_update: Will add a row-level lock to the value that is being set so that it can be released on commit.
    """
    # no name, or name equal to the doctype, addresses a Single doctype
    is_single_doctype = not (dn and dt != dn)

    # copy, so that adding the timestamp fields below never mutates the caller's dict
    to_update = dict(field) if isinstance(field, dict) else {field: val}

    if update_modified:
        modified = modified or now()
        modified_by = modified_by or frappe.session.user
        to_update.update({"modified": modified, "modified_by": modified_by})

    if is_single_doctype:
        # Singles are stored one row per field: replace the rows being set
        frappe.db.delete(
            "Singles", filters={"field": ("in", tuple(to_update)), "doctype": dt}, debug=debug
        )

        singles_data = ((dt, key, sbool(value)) for key, value in to_update.items())
        frappe.qb.into("Singles").columns("doctype", "field", "value").insert(*singles_data).run(
            debug=debug
        )
        frappe.clear_document_cache(dt, dt)

    else:
        table = DocType(dt)

        if for_update:
            # lock the matching rows first; NullValue keeps `isin` valid when nothing matches
            docnames = tuple(
                self.get_values(dt, dn, "name", debug=debug, for_update=for_update, pluck=True)
            ) or (NullValue(),)
            query = frappe.qb.update(table).where(table.name.isin(docnames))

            for docname in docnames:
                frappe.clear_document_cache(dt, docname)

        else:
            query = frappe.qb.engine.build_conditions(table=dt, filters=dn, update=True)
            # TODO: Fix this; doesn't work rn - gavin@frappe.io
            # frappe.cache().hdel_keys(dt, "document_cache")
            # Workaround: clear all document caches
            frappe.cache().delete_value("document_cache")

        for column, value in to_update.items():
            query = query.set(column, value)

        query.run(debug=debug)

    # drop locally cached values for this doctype
    if dt in self.value_cache:
        del self.value_cache[dt]
@staticmethod
def set(doc, field, val):
"""Set value in document. **Avoid**"""
doc.db_set(field, val)
def touch(self, doctype, docname):
    """Set the document's `modified` timestamp to now and return that timestamp."""
    timestamp = now()
    statement = f"update `tab{doctype}` set `modified`=%s\nwhere name=%s"
    self.sql(statement, (timestamp, docname))
    return timestamp
@staticmethod
def set_temp(value):
    """Store a temporary value in the cache under a generated key and return the key."""
    temp_key = frappe.generate_hash()
    frappe.cache().hset("temp", temp_key, value)
    return temp_key
@staticmethod
def get_temp(key):
    """Return the temporary value stored under `key`.

    Note: the value is only read here; it is not deleted from the cache.
    """
    stored = frappe.cache().hget("temp", key)
    return stored
def set_global(self, key, val, user="__global"):
    """Save a global key value. Global values will be automatically set if they match fieldname.

    Stored through `set_default` with `user` as the parent.
    """
    parent = user
    self.set_default(key, val, parent)
def get_global(self, key, user="__global"):
    """Returns a global key value (read through `get_default` with `user` as the parent)."""
    parent = user
    return self.get_default(key, parent)
def get_default(self, key, parent="__default"):
    """Return a single default value for `key`.

    If several defaults are stored (a list), the first one is returned, even when
    it is falsy; an empty list yields `None`. Anything else is returned as is.
    """
    d = self.get_defaults(key, parent)
    if isinstance(d, list):
        # explicit branch: `cond and d[0] or d` returned the whole list for a falsy
        # first element and raised IndexError for an empty list
        return d[0] if d else None
    return d
@staticmethod
def set_default(key, val, parent="__default", parenttype=None):
    """Sets a global / user default value (forwards to `frappe.defaults.set_default`)."""
    store = frappe.defaults.set_default
    store(key, val, parent, parenttype)
@staticmethod
def add_default(key, val, parent="__default", parenttype=None):
    """Append a default value for a key, there can be multiple default values for a particular key."""
    append = frappe.defaults.add_default
    append(key, val, parent, parenttype)
@staticmethod
def get_defaults(key=None, parent="__default"):
    """Get all defaults of `parent`, or only the one for `key` if given.

    The key is looked up as passed first, then in its `frappe.scrub`-ed form.
    """
    all_defaults = frappe.defaults.get_defaults_for(parent)

    if not key:
        return all_defaults

    return all_defaults[key] if key in all_defaults else all_defaults.get(frappe.scrub(key))
def begin(self):
    """Issue `START TRANSACTION`."""
    statement = "START TRANSACTION"
    self.sql(statement)
def commit(self):
    """Commit current transaction. Calls SQL `COMMIT`.

    Registered before-commit callbacks run first. After the commit the rollback
    observers are reset, the realtime log is flushed, queued background jobs are
    enqueued and local link counts are flushed.
    """
    # each entry is [method, args, kwargs] as stored by `add_before_commit`
    for method in frappe.local.before_commit:
        frappe.call(method[0], *(method[1] or []), **(method[2] or {}))

    self.sql("commit")
    if frappe.conf.db_type == "postgres":
        # Postgres requires explicitly starting new transaction
        self.begin()

    # the transaction is final: nothing left to notify on rollback
    frappe.local.rollback_observers = []
    self.flush_realtime_log()
    enqueue_jobs_after_commit()
    flush_local_link_count()
def add_before_commit(self, method, args=None, kwargs=None):
    """Register `method` (with optional args / kwargs) to be called by `commit` before committing."""
    callback = [method, args, kwargs]
    frappe.local.before_commit.append(callback)
@staticmethod
def flush_realtime_log():
    """Emit every queued realtime event via redis, then empty the queue."""
    emit = frappe.realtime.emit_via_redis
    for event_args in frappe.local.realtime_log:
        emit(*event_args)

    frappe.local.realtime_log = []
def savepoint(self, save_point):
    """Savepoints work as a nested transaction.

    Changes can be undone to a save point by doing frappe.db.rollback(save_point)

    Note: rollback watchers can not work with save points.
            so only changes to database are undone when rolling back to a savepoint.
            Avoid using savepoints when writing to filesystem."""
    statement = f"savepoint {save_point}"
    self.sql(statement)
def release_savepoint(self, save_point):
    """Issue `release savepoint` for the given savepoint name."""
    statement = f"release savepoint {save_point}"
    self.sql(statement)
def rollback(self, *, save_point=None):
    """`ROLLBACK` current transaction. Optionally rollback to a known save_point.

    Rollback observers are only notified (and reset) on a full rollback.
    """
    if save_point:
        self.sql(f"rollback to savepoint {save_point}")
        return

    self.sql("rollback")
    self.begin()

    # dict.fromkeys de-duplicates observers while keeping their order
    unique_observers = dict.fromkeys(frappe.local.rollback_observers)
    for observer in unique_observers:
        if hasattr(observer, "on_rollback"):
            observer.on_rollback()

    frappe.local.rollback_observers = []
def field_exists(self, dt, fn):
    """Return the result of `exists` for a DocField named `fn` whose parent is `dt`."""
    docfield_filters = {"fieldname": fn, "parent": dt}
    return self.exists("DocField", docfield_filters)
def table_exists(self, doctype, cached=True):
    """Returns True if table for given doctype exists."""
    table_name = "tab" + doctype
    known_tables = self.get_tables(cached=cached)
    return table_name in known_tables
def has_table(self, doctype):
    """Same as `table_exists` with default caching."""
    found = self.table_exists(doctype)
    return found
def get_tables(self, cached=True):
    """Return the set of table names, from the cache unless it is empty or `cached` is False."""
    tables = frappe.cache().get_value("db_tables")
    if tables and cached:
        return tables

    table_rows = self.sql(
        "\n"
        "SELECT table_name\n"
        "FROM information_schema.tables\n"
        "WHERE table_schema NOT IN ('pg_catalog', 'information_schema')\n"
    )
    tables = {row[0] for row in table_rows}
    frappe.cache().set_value("db_tables", tables)
    return tables
def a_row_exists(self, doctype):
    """Return the result of selecting one name from the doctype's table (truthy if any row exists)."""
    probe = f"select name from `tab{doctype}` limit 1"
    return self.sql(probe)
def exists(self, dt, dn=None, cache=False):
    """Return the document name of a matching document, or None.

    Note: `cache` only works if `dt` and `dn` are of type `str`.

    ## Examples

    Pass doctype and docname (only in this case we can cache the result)

    ```
    exists("User", "jane@example.org", cache=True)
    ```

    Pass a dict of filters including the `"doctype"` key:

    ```
    exists({"doctype": "User", "full_name": "Jane Doe"})
    ```

    Pass the doctype and a dict of filters:

    ```
    exists("User", {"full_name": "Jane Doe"})
    ```
    """
    if dt != "DocType" and dt == dn:
        # single always exists (!)
        return dn

    if isinstance(dt, dict):
        # work on a copy so the caller's dict keeps its "doctype" key
        filters = dt.copy()
        dt = filters.pop("doctype")
        dn = filters

    return self.get_value(dt, dn, ignore=True, cache=cache)
def count(self, dt, filters=None, debug=False, cache=False, distinct: bool = True):
    """Returns `COUNT(*)` for given DocType and filters.

    Unfiltered counts are read from / written to the cache when `cache` is set.
    """
    cacheable = cache and not filters
    cache_key = f"doctype:count:{dt}"

    if cacheable:
        cached_count = frappe.cache().get_value(cache_key)
        if cached_count is not None:
            return cached_count

    count_query = frappe.qb.engine.get_query(
        table=dt, filters=filters, fields=Count("*"), distinct=distinct
    )
    total = self.sql(count_query, debug=debug)[0][0]

    if cacheable:
        frappe.cache().set_value(cache_key, total, expires_in_sec=86400)

    return total
@staticmethod
def format_date(date):
    """Format `date` (as parsed by `getdate`) as `YYYY-MM-DD`."""
    parsed = getdate(date)
    return parsed.strftime("%Y-%m-%d")
@staticmethod
def format_datetime(datetime):
if not datetime:
return "0001-01-01 00:00:00.000000"
if isinstance(datetime, str):
if ":" not in datetime:
datetime = datetime + " 00:00:00.000000"
else:
datetime = datetime.strftime("%Y-%m-%d %H:%M:%S.%f")
return datetime
def get_creation_count(self, doctype, minutes):
    """Get count of records created in the last x minutes"""
    from dateutil.relativedelta import relativedelta

    from frappe.utils import now_datetime

    statement = f"select count(name) from `tab{doctype}`\nwhere creation >= %s"
    created_after = now_datetime() - relativedelta(minutes=minutes)
    rows = self.sql(statement, created_after)
    return rows[0][0]
def get_db_table_columns(self, table) -> list[str]:
    """Returns list of column names from given table.

    Served from the "table_columns" cache when present; a non-empty lookup
    result is written back to that cache.
    """
    cached_columns = frappe.cache().hget("table_columns", table)
    if cached_columns is not None:
        return cached_columns

    rows = self.sql(
        "\nselect column_name\nfrom information_schema.columns\nwhere table_name = %s ",
        table,
    )
    columns = [row[0] for row in rows]

    if columns:
        frappe.cache().hset("table_columns", table, columns)

    return columns
def get_table_columns(self, doctype):
    """Returns list of column names from given doctype.

    Raises `self.TableMissingError` when no columns are found.
    """
    columns = self.get_db_table_columns("tab" + doctype)
    if columns:
        return columns
    raise self.TableMissingError("DocType", doctype)
def has_column(self, doctype, column):
    """Returns True if column exists in database."""
    existing = self.get_table_columns(doctype)
    return column in existing
def get_column_type(self, doctype, column):
    """Return the `column_type` reported by INFORMATION_SCHEMA for a doctype column.

    :param doctype: DocType name (prefixed with `tab` for the lookup).
    :param column: Column name.

    Raises `IndexError` when no such column is found.
    """
    # values are bound by the driver instead of being formatted into the SQL text,
    # matching `get_db_table_columns`
    return self.sql(
        "SELECT column_type FROM INFORMATION_SCHEMA.COLUMNS\n"
        "WHERE table_name = %s AND column_name = %s",
        ("tab" + doctype, column),
    )[0][0]
def has_index(self, table_name, index_name):
    """Not implemented in this class; always raises `NotImplementedError`."""
    raise NotImplementedError
def add_index(self, doctype, fields, index_name=None):
    """Not implemented in this class; always raises `NotImplementedError`."""
    raise NotImplementedError
def add_unique(self, doctype, fields, constraint_name=None):
    """Not implemented in this class; always raises `NotImplementedError`."""
    raise NotImplementedError
@staticmethod
def get_index_name(fields):
    """Build an index name by joining the fields with "_" and appending "_index"."""
    raw_name = "_".join(fields) + "_index"
    # remove index length if present e.g. (10) from index name
    cleaned = INDEX_PATTERN.sub(r"", raw_name)
    return cleaned
def get_system_setting(self, key):
    """Return `frappe.get_system_settings(key)`."""
    setting = frappe.get_system_settings(key)
    return setting
def close(self):
    """Close database connection (no-op when there is none)."""
    connection = self._conn
    if not connection:
        return

    # self._cursor.close()
    connection.close()
    self._cursor = None
    self._conn = None
@staticmethod
def escape(s, percent=True):
"""Excape quotes and percent in given string."""
# implemented in specific class
raise NotImplementedError
@staticmethod
def is_column_missing(e):
    """Return `frappe.db.is_missing_column(e)`."""
    verdict = frappe.db.is_missing_column(e)
    return verdict
def get_descendants(self, doctype, name):
    """Return descendants of the group node in tree"""
    from frappe.utils.nestedset import get_descendants_of

    try:
        descendants = get_descendants_of(doctype, name, ignore_permissions=True)
    except Exception:
        # Can only happen if document doesn't exists - kept for backward compatibility
        return []
    return descendants
def is_missing_table_or_column(self, e):
    """Return a truthy value if `e` is a missing-column or a missing-table error."""
    if column_missing := self.is_missing_column(e):
        return column_missing
    return self.is_table_missing(e)
def multisql(self, sql_dict, values=(), **kwargs):
    """Run the query from `sql_dict` keyed by the current database type ("mariadb" if unset)."""
    dialect = frappe.db.db_type or "mariadb"
    dialect_query = sql_dict.get(dialect)
    return self.sql(dialect_query, values, **kwargs)
def delete(self, doctype: str, filters: dict | list = None, debug=False, **kwargs):
    """Delete rows from a table in site which match the passed filters. This
    does not trigger DocType hooks. Simply runs a DELETE query in the database.

    Doctype name can be passed directly, it will be pre-pended with `tab`.

    :param doctype: DocType name.
    :param filters: Filters selecting the rows to delete.
    :param debug: Passed to the query run unless `kwargs` already carries `debug`.
    :param kwargs: Passed to the query's `run()`. The legacy `conditions` keyword is
            accepted as a fallback for `filters`.
    """
    # `conditions` is consumed here; forwarding it to run() would hand the query
    # runner a keyword that `sql` does not declare
    conditions = kwargs.pop("conditions", None)
    filters = filters or conditions

    query = frappe.qb.engine.build_conditions(table=doctype, filters=filters).delete()
    kwargs.setdefault("debug", debug)
    return query.run(**kwargs)
def truncate(self, doctype: str):
    """Truncate a table in the database. This runs a DDL command `TRUNCATE TABLE`.
    This cannot be rolled back.

    Doctype name can be passed directly, it will be pre-pended with `tab`.
    """
    table_name = get_table_name(doctype)
    return self.sql_ddl(f"truncate `{table_name}`")
def clear_table(self, doctype):
    """Alias for `truncate`."""
    outcome = self.truncate(doctype)
    return outcome
def get_last_created(self, doctype):
    """Return the creation datetime of the most recently created record, or None if there is none."""
    newest = self.get_all(doctype, "creation", limit=1, order_by="creation desc")
    if not newest:
        return None
    return get_datetime(newest[0].creation)
def log_touched_tables(self, query, values=None):
    """Record the `tab...` table names referenced by a write / DDL query in `frappe.flags.touched_tables`."""
    if values:
        query = frappe.safe_decode(self._cursor.mogrify(query, values))

    if not is_query_type(query, ("insert", "delete", "update", "alter", "drop", "rename")):
        return

    # SINGLE_WORD_PATTERN matches `tabXxx`, tabXxx and "tabXxx"
    # MULTI_WORD_PATTERN matches `tabXxx Xxx` and "tabXxx Xxx"
    #
    # ([`"]?) captures an optional opening quote; \1 requires the same quote at the end.
    # A multi word table name must have surrounding quotes.
    #
    # The second group of each match is the full table name: "tab" followed by one or
    # more words that start with a capital letter, e.g. 'tabXxx' or 'tabXxx Xxx Xxx'.
    touched = [
        match_groups[1]
        for pattern in (SINGLE_WORD_PATTERN, MULTI_WORD_PATTERN)
        for match_groups in pattern.findall(query)
    ]

    if frappe.flags.touched_tables is None:
        frappe.flags.touched_tables = set()
    frappe.flags.touched_tables.update(touched)
def bulk_insert(self, doctype, fields, values, ignore_duplicates=False, *, chunk_size=10_000):
    """
    Insert multiple records at a time, `chunk_size` rows per INSERT statement.

    :param doctype: Doctype name
    :param fields: list of fields
    :params values: list of list of values
    :param ignore_duplicates: skip rows that conflict with existing ones (mariadb / postgres)
    :param chunk_size: maximum number of rows per statement
    """
    rows = list(values)
    table = frappe.qb.DocType(doctype)

    for offset in range(0, len(rows), chunk_size):
        insert_query = frappe.qb.into(table)

        if ignore_duplicates:
            # Pypika does not have same api for ignoring duplicates
            if frappe.conf.db_type == "mariadb":
                insert_query = insert_query.ignore()
            elif frappe.conf.db_type == "postgres":
                insert_query = insert_query.on_conflict().do_nothing()

        chunk = rows[offset : offset + chunk_size]
        insert_query.columns(fields).insert(*chunk).run()
def create_sequence(self, *args, **kwargs):
    """Forward to `frappe.database.sequence.create_sequence`."""
    from frappe.database.sequence import create_sequence as _create_sequence

    return _create_sequence(*args, **kwargs)
def set_next_sequence_val(self, *args, **kwargs):
    """Forward to `frappe.database.sequence.set_next_val`; returns nothing."""
    from frappe.database.sequence import set_next_val as _set_next_val

    _set_next_val(*args, **kwargs)
def get_next_sequence_val(self, *args, **kwargs):
    """Forward to `frappe.database.sequence.get_next_val`."""
    from frappe.database.sequence import get_next_val as _get_next_val

    return _get_next_val(*args, **kwargs)
def enqueue_jobs_after_commit():
    """Enqueue every job collected in `frappe.flags.enqueue_after_commit`, then empty that list."""
    from frappe.utils.background_jobs import execute_job, get_queue

    pending_jobs = frappe.flags.enqueue_after_commit
    if not (pending_jobs and len(pending_jobs) > 0):
        return

    for job in pending_jobs:
        queue = get_queue(job.get("queue"), is_async=job.get("is_async"))
        queue.enqueue_call(execute_job, timeout=job.get("timeout"), kwargs=job.get("queue_args"))

    frappe.flags.enqueue_after_commit = []
@contextmanager
def savepoint(catch: type | tuple[type, ...] = Exception):
    """Wrapper for wrapping blocks of DB operations in a savepoint.

    Exceptions matching `catch` raised inside the block are swallowed after rolling
    back to the savepoint; on success the savepoint is released. Other exceptions
    propagate untouched.

    :param catch: Exception type (or tuple of types) to roll back on and suppress.

    as contextmanager:

    for doc in docs:
            with savepoint(catch=DuplicateError):
                    doc.insert()

    as decorator (wraps FULL function call):

    @savepoint(catch=DuplicateError)
    def process_doc(doc):
            doc.insert()
    """
    # Create the savepoint *before* entering the guarded region: if creating it
    # fails there is nothing to roll back to, so that error must propagate rather
    # than be caught and answered with a rollback to a non-existent savepoint.
    savepoint_name = "".join(random.sample(string.ascii_lowercase, 10))
    frappe.db.savepoint(savepoint_name)

    try:
        yield  # control back to calling function
    except catch:
        frappe.db.rollback(save_point=savepoint_name)
    else:
        frappe.db.release_savepoint(savepoint_name)