seitime-frappe/frappe/query_builder/utils.py
Gavin D'souza 36fa2122a3 fix: Patch qb for different schemas in same process
You would want to switch schemas in the same process. Eversince the
change
64e5273764
we stopped patching on every frappe.init call which meant, if a MariaDB
site was initialized first, frappe._qb_patched would be set to True and
if a Postgres site was initialized after, _qb_patched would be lying as
the PG engine isn't patched yet. Sooooo we need a Dict instead to
maintain this record of patching. This issue caused weird errors lol -

Traceback:

  File "/home/frappe/Desktop/frappe-bench-dev/env/lib/python3.10/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/commands/__init__.py", line 29, in _func
    ret = f(frappe._dict(ctx.obj), *args, **kwargs)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/commands/site.py", line 524, in migrate
    SiteMigration(
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/migrate.py", line 169, in run
    self.setUp()
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/migrate.py", line 73, in setUp
    clear_global_cache()
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/cache_manager.py", line 102, in clear_global_cache
    clear_website_cache()
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/website/utils.py", line 374, in clear_website_cache
    clear_cache(path)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/website/utils.py", line 369, in clear_cache
    for method in frappe.get_hooks("website_clear_cache"):
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/__init__.py", line 1440, in get_hooks
    hooks = _dict(_load_app_hooks())
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/utils/caching.py", line 57, in wrapper
    return_val = func(*args, **kwargs)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/__init__.py", line 1407, in _load_app_hooks
    apps = [app_name] if app_name else get_installed_apps(sort=True)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/utils/caching.py", line 57, in wrapper
    return_val = func(*args, **kwargs)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/__init__.py", line 1374, in get_installed_apps
    installed = json.loads(db.get_global("installed_apps") or "[]")
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/database/database.py", line 917, in get_global
    return self.get_default(key, user)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/database/database.py", line 921, in get_default
    d = self.get_defaults(key, parent)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/database/database.py", line 938, in get_defaults
    defaults = frappe.defaults.get_defaults(parent)
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/defaults.py", line 88, in get_defaults
    globald = get_defaults_for()
  File "/home/frappe/Desktop/frappe-bench-dev/apps/frappe/frappe/defaults.py", line 218, in get_defaults_for
    frappe.qb.from_(table)
TypeError: 'Field' object is not callable
2022-06-15 15:55:43 +05:30

118 lines
3.3 KiB
Python

from enum import Enum
from importlib import import_module
from typing import Any, Callable, Dict, Union, get_type_hints
from pypika import Query
from pypika.queries import Column
from pypika.terms import PseudoColumn
import frappe
from frappe.query_builder.terms import NamedParameterWrapper
from .builder import MariaDB, Postgres
class db_type_is(Enum):
MARIADB = "mariadb"
POSTGRES = "postgres"
class ImportMapper:
def __init__(self, func_map: Dict[db_type_is, Callable]) -> None:
self.func_map = func_map
def __call__(self, *args: Any, **kwds: Any) -> Callable:
db = db_type_is(frappe.conf.db_type or "mariadb")
return self.func_map[db](*args, **kwds)
class BuilderIdentificationFailed(Exception):
def __init__(self):
super().__init__("Couldn't guess builder")
def get_query_builder(type_of_db: str) -> Union[Postgres, MariaDB]:
"""[return the query builder object]
Args:
type_of_db (str): [string value of the db used]
Returns:
Query: [Query object]
"""
db = db_type_is(type_of_db)
picks = {db_type_is.MARIADB: MariaDB, db_type_is.POSTGRES: Postgres}
return picks[db]
def get_attr(method_string):
modulename = ".".join(method_string.split(".")[:-1])
methodname = method_string.split(".")[-1]
return getattr(import_module(modulename), methodname)
def DocType(*args, **kwargs):
return frappe.qb.DocType(*args, **kwargs)
def Table(*args, **kwargs):
return frappe.qb.Table(*args, **kwargs)
def patch_query_execute():
"""Patch the Query Builder with helper execute method
This excludes the use of `frappe.db.sql` method while
executing the query object
"""
def execute_query(query, *args, **kwargs):
query, params = prepare_query(query)
return frappe.db.sql(query, params, *args, **kwargs) # nosemgrep
def prepare_query(query):
import inspect
from frappe.utils.safe_exec import check_safe_sql_query
param_collector = NamedParameterWrapper()
query = query.get_sql(param_wrapper=param_collector)
if frappe.flags.in_safe_exec and not check_safe_sql_query(query, throw=False):
callstack = inspect.stack()
if len(callstack) >= 3 and ".py" in callstack[2].filename:
# ignore any query builder methods called from python files
# assumption is that those functions are whitelisted already.
# since query objects are patched everywhere any query.run()
# will have callstack like this:
# frame0: this function prepare_query()
# frame1: execute_query()
# frame2: frame that called `query.run()`
#
# if frame2 is server script it wont have a filename and hence
# it shouldn't be allowed.
# p.s. stack() returns `"<unknown>"` as filename if not a file.
pass
else:
raise frappe.PermissionError("Only SELECT SQL allowed in scripting")
return query, param_collector.get_parameters()
query_class = get_attr(str(frappe.qb).split("'")[1])
builder_class = get_type_hints(query_class._builder).get("return")
if not builder_class:
raise BuilderIdentificationFailed
builder_class.run = execute_query
builder_class.walk = prepare_query
frappe._qb_patched[frappe.conf.db_type] = True
def patch_query_aggregation():
"""Patch aggregation functions to frappe.qb"""
from frappe.query_builder.functions import _avg, _max, _min, _sum
frappe.qb.max = _max
frappe.qb.min = _min
frappe.qb.avg = _avg
frappe.qb.sum = _sum
frappe._qb_patched[frappe.conf.db_type] = True