From f8ccbfd3d76d0731f122e76cdbda7eafaef88e14 Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Thu, 30 Jan 2025 15:43:59 +0530 Subject: [PATCH] feat(sqlite): override some methods Signed-off-by: Akhil Narang --- frappe/database/sqlite/database.py | 43 ++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/frappe/database/sqlite/database.py b/frappe/database/sqlite/database.py index 6e464d0090..3df7f31b23 100644 --- a/frappe/database/sqlite/database.py +++ b/frappe/database/sqlite/database.py @@ -98,8 +98,6 @@ class SQLiteDatabase(SQLiteExceptionUtil, Database): return self.create_connection() def create_connection(self): - settings = self.get_connection_settings() - print(settings) return sqlite3.connect(self.get_connection_settings()) def set_execution_timeout(self, seconds: int): @@ -301,3 +299,44 @@ class SQLiteDatabase(SQLiteExceptionUtil, Database): def get_row_size(self, doctype: str) -> int: """Get estimated max row size of any table in bytes.""" raise NotImplementedError("SQLite does not support getting row size directly.") + + def execute_query(self, query, values=None): + query = query.replace("%s", "?") + try: + if isinstance(values, dict): + query = query % {x: f"'{y}'" for x, y in values.items()} + except TypeError: + pass + return self._cursor.execute(query, values) + + def sql(self, *args, **kwargs): + if args: + # since tuple is immutable + args = list(args) + args[0] = modify_query(args[0]) + args = tuple(args) + elif kwargs.get("query"): + kwargs["query"] = modify_query(kwargs.get("query")) + + +def modify_query(query): + """ + Modifies query according to the requirements of SQLite + """ + # Replace ` with " for definitions + query = str(query) + query = query.replace("`", '"') + query = replace_locate_with_strpos(query) + + # Select from requires "" + if re.search("from tab", query, flags=re.IGNORECASE): + query = re.sub("from tab([a-zA-Z]*)", r'from "tab\1"', query, flags=re.IGNORECASE) + + return query + + +def replace_locate_with_strpos(query: str) -> str: + # strpos is the locate equivalent in SQLite + if re.search(r"locate\(", query, flags=re.IGNORECASE): + query = re.sub(r"locate\(([^,]+),([^)]+)\)", r"strpos(\2, \1)", query, flags=re.IGNORECASE) + return query