303 lines
8.7 KiB
Python
303 lines
8.7 KiB
Python
import re
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
|
|
import frappe
|
|
from frappe.database.database import Database
|
|
from frappe.database.sqlite.schema import SQLiteTable
|
|
from frappe.utils import UnicodeWithAttrs, cstr, get_datetime, get_table_name
|
|
|
|
_PARAM_COMP = re.compile(r"%\([\w]*\)s")
|
|
|
|
|
|
class SQLiteExceptionUtil:
|
|
ProgrammingError = sqlite3.ProgrammingError
|
|
TableMissingError = sqlite3.OperationalError
|
|
OperationalError = sqlite3.OperationalError
|
|
InternalError = sqlite3.InternalError
|
|
SQLError = sqlite3.OperationalError
|
|
DataError = sqlite3.DataError
|
|
|
|
@staticmethod
|
|
def is_deadlocked(e: sqlite3.Error) -> bool:
|
|
return "database is locked" in str(e)
|
|
|
|
@staticmethod
|
|
def is_timedout(e: sqlite3.Error) -> bool:
|
|
return "database is locked" in str(e)
|
|
|
|
@staticmethod
|
|
def is_read_only_mode_error(e: sqlite3.Error) -> bool:
|
|
return "attempt to write a readonly database" in str(e)
|
|
|
|
@staticmethod
|
|
def is_table_missing(e: sqlite3.Error) -> bool:
|
|
return "no such table" in str(e)
|
|
|
|
@staticmethod
|
|
def is_missing_column(e: sqlite3.Error) -> bool:
|
|
return "no such column" in str(e)
|
|
|
|
@staticmethod
|
|
def is_duplicate_fieldname(e: sqlite3.Error) -> bool:
|
|
return "duplicate column name" in str(e)
|
|
|
|
@staticmethod
|
|
def is_duplicate_entry(e: sqlite3.Error) -> bool:
|
|
return "UNIQUE constraint failed" in str(e)
|
|
|
|
@staticmethod
|
|
def is_access_denied(e: sqlite3.Error) -> bool:
|
|
return "access denied" in str(e)
|
|
|
|
@staticmethod
|
|
def cant_drop_field_or_key(e: sqlite3.Error) -> bool:
|
|
return "cannot drop" in str(e)
|
|
|
|
@staticmethod
|
|
def is_syntax_error(e: sqlite3.Error) -> bool:
|
|
return "syntax error" in str(e)
|
|
|
|
@staticmethod
|
|
def is_statement_timeout(e: sqlite3.Error) -> bool:
|
|
return "statement timeout" in str(e)
|
|
|
|
@staticmethod
|
|
def is_data_too_long(e: sqlite3.Error) -> bool:
|
|
return "string or blob too big" in str(e)
|
|
|
|
@staticmethod
|
|
def is_db_table_size_limit(e: sqlite3.Error) -> bool:
|
|
return "too many columns" in str(e)
|
|
|
|
@staticmethod
|
|
def is_primary_key_violation(e: sqlite3.Error) -> bool:
|
|
return "UNIQUE constraint failed" in str(e)
|
|
|
|
@staticmethod
|
|
def is_unique_key_violation(e: sqlite3.Error) -> bool:
|
|
return "UNIQUE constraint failed" in str(e)
|
|
|
|
@staticmethod
|
|
def is_interface_error(e: sqlite3.Error):
|
|
return isinstance(e, sqlite3.InterfaceError)
|
|
|
|
|
|
class SQLiteDatabase(SQLiteExceptionUtil, Database):
|
|
REGEX_CHARACTER = "regexp"
|
|
default_port = None
|
|
MAX_ROW_SIZE_LIMIT = None
|
|
|
|
def get_connection(self):
|
|
conn = self._get_connection()
|
|
conn.isolation_level = None
|
|
return conn
|
|
|
|
def _get_connection(self):
|
|
"""Return SQLite connection object."""
|
|
return self.create_connection()
|
|
|
|
def create_connection(self):
|
|
settings = self.get_connection_settings()
|
|
print(settings)
|
|
return sqlite3.connect(self.get_connection_settings())
|
|
|
|
def set_execution_timeout(self, seconds: int):
|
|
self.sql(f"PRAGMA busy_timeout = {int(seconds) * 1000}")
|
|
|
|
def get_connection_settings(self) -> str:
|
|
return self.cur_db_name
|
|
|
|
def setup_type_map(self):
|
|
self.db_type = "sqlite"
|
|
self.type_map = {
|
|
"Currency": ("REAL", None),
|
|
"Int": ("INTEGER", None),
|
|
"Long Int": ("INTEGER", None),
|
|
"Float": ("REAL", None),
|
|
"Percent": ("REAL", None),
|
|
"Check": ("INTEGER", None),
|
|
"Small Text": ("TEXT", None),
|
|
"Long Text": ("TEXT", None),
|
|
"Code": ("TEXT", None),
|
|
"Text Editor": ("TEXT", None),
|
|
"Markdown Editor": ("TEXT", None),
|
|
"HTML Editor": ("TEXT", None),
|
|
"Date": ("TEXT", None),
|
|
"Datetime": ("TEXT", None),
|
|
"Time": ("TEXT", None),
|
|
"Text": ("TEXT", None),
|
|
"Data": ("TEXT", None),
|
|
"Link": ("TEXT", None),
|
|
"Dynamic Link": ("TEXT", None),
|
|
"Password": ("TEXT", None),
|
|
"Select": ("TEXT", None),
|
|
"Rating": ("REAL", None),
|
|
"Read Only": ("TEXT", None),
|
|
"Attach": ("TEXT", None),
|
|
"Attach Image": ("TEXT", None),
|
|
"Signature": ("TEXT", None),
|
|
"Color": ("TEXT", None),
|
|
"Barcode": ("TEXT", None),
|
|
"Geolocation": ("TEXT", None),
|
|
"Duration": ("REAL", None),
|
|
"Icon": ("TEXT", None),
|
|
"Phone": ("TEXT", None),
|
|
"Autocomplete": ("TEXT", None),
|
|
"JSON": ("TEXT", None),
|
|
}
|
|
|
|
def get_database_size(self):
|
|
"""Return database size in MB."""
|
|
import os
|
|
|
|
return os.path.getsize(self.db_name) / (1024 * 1024)
|
|
|
|
def log_query(self, query, values, debug, explain):
|
|
self.last_query = query
|
|
self._log_query(self.last_query, debug, explain, query)
|
|
return self.last_query
|
|
|
|
def _clean_up(self):
|
|
pass
|
|
|
|
@staticmethod
|
|
def escape(s, percent=True):
|
|
"""Escape quotes and percent in given string."""
|
|
s = s.replace("'", "''")
|
|
if percent:
|
|
s = s.replace("%", "%%")
|
|
return "'" + s + "'"
|
|
|
|
@staticmethod
|
|
def is_type_number(code):
|
|
return code in (sqlite3.NUMERIC, sqlite3.INTEGER, sqlite3.REAL)
|
|
|
|
@staticmethod
|
|
def is_type_datetime(code):
|
|
return code == sqlite3.TEXT
|
|
|
|
def rename_table(self, old_name: str, new_name: str) -> list | tuple:
|
|
old_name = get_table_name(old_name)
|
|
new_name = get_table_name(new_name)
|
|
return self.sql(f"ALTER TABLE `{old_name}` RENAME TO `{new_name}`")
|
|
|
|
def describe(self, doctype: str) -> list | tuple:
|
|
table_name = get_table_name(doctype)
|
|
return self.sql(f"PRAGMA table_info(`{table_name}`)")
|
|
|
|
def change_column_type(
|
|
self, doctype: str, column: str, type: str, nullable: bool = False
|
|
) -> list | tuple:
|
|
raise NotImplementedError("SQLite does not support altering column types directly.")
|
|
|
|
def rename_column(self, doctype: str, old_column_name, new_column_name):
|
|
raise NotImplementedError("SQLite does not support renaming columns directly.")
|
|
|
|
def create_auth_table(self):
|
|
self.sql_ddl(
|
|
"""CREATE TABLE IF NOT EXISTS `__Auth` (
|
|
`doctype` TEXT NOT NULL,
|
|
`name` TEXT NOT NULL,
|
|
`fieldname` TEXT NOT NULL,
|
|
`password` TEXT NOT NULL,
|
|
`encrypted` INTEGER NOT NULL DEFAULT 0,
|
|
PRIMARY KEY (`doctype`, `name`, `fieldname`)
|
|
)"""
|
|
)
|
|
|
|
def create_global_search_table(self):
|
|
if "__global_search" not in self.get_tables():
|
|
self.sql(
|
|
"""CREATE VIRTUAL TABLE __global_search USING FTS5(
|
|
doctype,
|
|
name,
|
|
title,
|
|
content,
|
|
route,
|
|
published
|
|
)"""
|
|
)
|
|
|
|
def create_user_settings_table(self):
|
|
self.sql_ddl(
|
|
"""CREATE TABLE IF NOT EXISTS __UserSettings (
|
|
`user` TEXT NOT NULL,
|
|
`doctype` TEXT NOT NULL,
|
|
`data` TEXT,
|
|
UNIQUE(user, doctype)
|
|
)"""
|
|
)
|
|
|
|
@staticmethod
|
|
def get_on_duplicate_update():
|
|
return "ON CONFLICT DO UPDATE SET "
|
|
|
|
def get_table_columns_description(self, table_name):
|
|
"""Return list of columns with descriptions."""
|
|
return self.sql(f"PRAGMA table_info(`{table_name}`)", as_dict=1)
|
|
|
|
def get_column_type(self, doctype, column):
|
|
"""Return column type from database."""
|
|
table_name = get_table_name(doctype)
|
|
result = self.sql(f"PRAGMA table_info(`{table_name}`)", as_dict=1)
|
|
for row in result:
|
|
if row["name"] == column:
|
|
return row["type"]
|
|
return None
|
|
|
|
def has_index(self, table_name, index_name):
|
|
return self.sql(f"PRAGMA index_list(`{table_name}`)")
|
|
|
|
def get_column_index(self, table_name: str, fieldname: str, unique: bool = False) -> frappe._dict | None:
|
|
"""Check if column exists for a specific fields in specified order."""
|
|
indexes = self.sql(f"PRAGMA index_list(`{table_name}`)", as_dict=True)
|
|
for index in indexes:
|
|
index_info = self.sql(f"PRAGMA index_info(`{index['name']}`)", as_dict=True)
|
|
if index_info and index_info[0]["name"] == fieldname:
|
|
return index
|
|
return None
|
|
|
|
def add_index(self, doctype: str, fields: list, index_name: str | None = None):
|
|
"""Creates an index with given fields if not already created."""
|
|
index_name = index_name or self.get_index_name(fields)
|
|
table_name = get_table_name(doctype)
|
|
if not self.has_index(table_name, index_name):
|
|
self.commit()
|
|
self.sql(f"CREATE INDEX `{index_name}` ON `{table_name}` ({', '.join(fields)})")
|
|
|
|
def add_unique(self, doctype, fields, constraint_name=None):
|
|
raise NotImplementedError("SQLite does not support adding unique constraints directly.")
|
|
|
|
def updatedb(self, doctype, meta=None):
|
|
"""Syncs a `DocType` to the table."""
|
|
res = self.sql("SELECT issingle FROM `tabDocType` WHERE name=%s", (doctype,))
|
|
if not res:
|
|
raise Exception(f"Wrong doctype {doctype} in updatedb")
|
|
|
|
if not res[0][0]:
|
|
db_table = SQLiteTable(doctype, meta)
|
|
db_table.validate()
|
|
db_table.sync()
|
|
self.commit()
|
|
|
|
def get_database_list(self):
|
|
return [self.db_name]
|
|
|
|
def get_tables(self, cached=True):
|
|
"""Return list of tables."""
|
|
to_query = not cached
|
|
|
|
if cached:
|
|
tables = frappe.cache.get_value("db_tables")
|
|
to_query = not tables
|
|
|
|
if to_query:
|
|
tables = self.sql("SELECT name FROM sqlite_master WHERE type='table';", pluck=True)
|
|
frappe.cache.set_value("db_tables", tables)
|
|
|
|
return tables
|
|
|
|
def get_row_size(self, doctype: str) -> int:
|
|
"""Get estimated max row size of any table in bytes."""
|
|
raise NotImplementedError("SQLite does not support getting row size directly.")
|