seitime-frappe/frappe/database/postgres/schema.py
Shllokkk 16e6c40d70
fix(schema): drop unique constraint for deleted doctype fields (#36356)
* fix(schema): drop unique constraint and indexes for deleted doctype fields

* refactor(schema): rename a variable and remove commented code

* test: add test case for dropping unique constraint on field deletion from doctype

* fix(tests): prevent list mutation during iteration

* test(db): guard MariaDB-specific unique index test with db_type_is.MARIADB

* fix(schema): drop unique constraints and indexes for deleted fields on postgres

* fix(schema): make postgres unique cleanup idempotent for deleted fields

* fix(schema): make postgres unique cleanup idempotent on reload

* test: add test case for dropping unique constraint and index on field deletion for postgres

* fix(schema): make postgres unique cleanup idempotent
2026-02-20 20:15:32 +05:30

280 lines
8.2 KiB
Python

import frappe
from frappe import _
from frappe.database.schema import DbColumn, DBTable, get_definition
from frappe.utils import cint, flt
from frappe.utils.defaults import get_not_null_defaults
class PostgresTable(DBTable):
def create(self):
varchar_len = frappe.db.VARCHAR_LEN
name_column = f"name varchar({varchar_len}) primary key"
additional_definitions = ""
# columns
column_defs = self.get_column_definitions()
if column_defs:
additional_definitions += ",\n".join(column_defs)
# child table columns
if self.meta.get("istable", default=0):
if column_defs:
additional_definitions += ",\n"
additional_definitions += ",\n".join(
(
f"parent varchar({varchar_len})",
f"parentfield varchar({varchar_len})",
f"parenttype varchar({varchar_len})",
)
)
# creating sequence(s)
if not self.meta.issingle and self.meta.autoname == "autoincrement":
frappe.db.create_sequence(self.doctype, check_not_exists=True)
name_column = "name bigint primary key"
elif not self.meta.issingle and self.meta.autoname == "UUID":
name_column = "name uuid primary key"
# TODO: set docstatus length
# create table
frappe.db.sql(
f"""create table `{self.table_name}` (
{name_column},
creation timestamp(6),
modified timestamp(6),
modified_by varchar({varchar_len}),
owner varchar({varchar_len}),
docstatus smallint not null default '0',
idx bigint not null default '0',
{additional_definitions}
)""",
)
self.create_indexes()
frappe.db.commit()
def create_indexes(self):
create_index_query = ""
for col in self.columns.values():
if (
col.set_index
and col.fieldtype in frappe.db.type_map
and frappe.db.type_map.get(col.fieldtype)[0] not in ("text", "longtext")
):
create_index_query += (
f'CREATE INDEX IF NOT EXISTS "{col.fieldname}" ON `{self.table_name}`(`{col.fieldname}`);'
)
if create_index_query:
# nosemgrep
frappe.db.sql(create_index_query)
def alter(self):
for col in self.columns.values():
col.build_for_alter_table(self.current_columns.get(col.fieldname.lower()))
query = [f"ADD COLUMN `{col.fieldname}` {col.get_definition()}" for col in self.add_column]
new_column_names = {col.fieldname for col in self.add_column}
for col in self.change_type:
using_clause = ""
if col.fieldtype in ("Datetime"):
# The USING option of SET DATA TYPE can actually specify any expression
# involving the old values of the row
# read more https://www.postgresql.org/docs/9.1/sql-altertable.html
using_clause = f"USING {col.fieldname}::timestamp without time zone"
elif col.fieldtype == "Check":
using_clause = f"USING {col.fieldname}::smallint"
query.append(
"ALTER COLUMN `{}` TYPE {} {}".format(
col.fieldname,
get_definition(col.fieldtype, precision=col.precision, length=col.length),
using_clause,
)
)
if alter_pk := self.alter_primary_key():
query.append(alter_pk)
for col in self.set_default:
if col.fieldname == "name":
continue
if col.fieldtype in ("Check", "Int"):
col_default = cint(col.default)
elif col.fieldtype in ("Currency", "Float", "Percent"):
col_default = flt(col.default)
elif not col.default:
col_default = "NULL"
else:
col_default = f"{frappe.db.escape(col.default)}"
query.append(f"ALTER COLUMN `{col.fieldname}` SET DEFAULT {col_default}")
create_contraint_query = ""
for col in self.add_index:
# if index key not exists
create_contraint_query += (
f'CREATE INDEX IF NOT EXISTS "{col.fieldname}" ON `{self.table_name}`(`{col.fieldname}`);'
)
for col in self.add_unique:
# if index key not exists
if col.fieldname not in new_column_names:
create_contraint_query += 'CREATE UNIQUE INDEX IF NOT EXISTS "unique_{index_name}" ON `{table_name}`(`{field}`);'.format(
index_name=col.fieldname, table_name=self.table_name, field=col.fieldname
)
# logic to drop unique constraint for fields deleted from a doctype
meta_columns = set(self.columns.keys())
db_columns = set(self.current_columns.keys())
for col in db_columns:
if (
col not in meta_columns
and col not in frappe.db.DEFAULT_COLUMNS
and col not in frappe.db.OPTIONAL_COLUMNS
):
has_unique_index = frappe.db.sql(
"""
SELECT 1
FROM pg_indexes
WHERE tablename = %s
AND indexname IN (%s, %s)
LIMIT 1
""",
(
self.table_name,
f"{self.table_name}_{col}_key",
f"unique_{col}",
),
)
if not has_unique_index:
continue
current_col = self.current_columns.get(col)
deleted_col = DbColumn(
table=self,
fieldname=current_col.name,
fieldtype=current_col.type,
length=None,
default=None,
set_index=current_col.index,
options=None,
unique=False,
precision=None,
not_nullable=current_col.not_nullable,
)
self.drop_unique.append(deleted_col)
drop_contraint_query = ""
for col in self.drop_index:
# primary key
if col.fieldname != "name":
# if index key exists
drop_contraint_query += f'DROP INDEX IF EXISTS "{col.fieldname}" ;'
for col in self.drop_unique:
# primary key
if col.fieldname != "name":
# drop unique constraint first if exists which automatically drops the underlying index also
unique_constraint_exists = frappe.db.sql(
"""
SELECT 1
FROM pg_constraint
WHERE conname = %s
""",
(f"{self.table_name}_{col.fieldname}_key",),
)
if unique_constraint_exists:
drop_contraint_query += f'ALTER TABLE "{self.table_name}" DROP CONSTRAINT IF EXISTS "{self.table_name}_{col.fieldname}_key" ;'
# drop the unique index backed by no constraint directly
unique_index_exists = frappe.db.sql(
"""
SELECT 1
FROM pg_indexes
WHERE tablename = %s
AND indexname = %s
""",
(
self.table_name,
f"unique_{col.fieldname}",
),
)
if unique_index_exists:
drop_contraint_query += f'DROP INDEX IF EXISTS "unique_{col.fieldname}" ;'
change_nullability = []
for col in self.change_nullability:
default = col.default or get_not_null_defaults(col.fieldtype)
if isinstance(default, str):
default = frappe.db.escape(default)
change_nullability.append(
f'ALTER COLUMN "{col.fieldname}" {"SET" if col.not_nullable else "DROP"} NOT NULL'
)
change_nullability.append(f'ALTER COLUMN "{col.fieldname}" SET DEFAULT {default}')
if col.not_nullable:
try:
table = frappe.qb.DocType(self.doctype)
frappe.qb.update(table).set(
col.fieldname, col.default or get_not_null_defaults(col.fieldtype)
).where(table[col.fieldname].isnull()).run()
except Exception:
print(f"Failed to update data in {self.table_name} for {col.fieldname}")
raise
try:
if query:
final_alter_query = "ALTER TABLE `{}` {}".format(self.table_name, ", ".join(query))
# nosemgrep
frappe.db.sql(final_alter_query)
if change_nullability:
# nosemgrep
frappe.db.sql(f"ALTER TABLE `{self.table_name}` {','.join(change_nullability)}")
if create_contraint_query:
# nosemgrep
frappe.db.sql(create_contraint_query)
if drop_contraint_query:
# nosemgrep
frappe.db.sql(drop_contraint_query)
except Exception as e:
# sanitize
if frappe.db.is_duplicate_fieldname(e):
frappe.throw(str(e))
elif frappe.db.is_duplicate_entry(e):
fieldname = str(e).split("'")[-2]
frappe.throw(
_(
"{0} field cannot be set as unique in {1}, as there are non-unique existing values"
).format(fieldname, self.table_name)
)
else:
raise e
def alter_primary_key(self) -> str | None:
# If there are no values in table allow migrating to UUID from varchar
autoname = self.meta.autoname
if autoname == "UUID" and frappe.db.get_column_type(self.doctype, "name") != "uuid":
if not frappe.db.get_value(self.doctype, {}, order_by=None):
return "alter column `name` TYPE uuid USING name::uuid"
else:
frappe.throw(
_("Primary key of doctype {0} can not be changed as there are existing values.").format(
self.doctype
)
)
# Reverting from UUID to VARCHAR
if autoname != "UUID" and frappe.db.get_column_type(self.doctype, "name") == "uuid":
return f"alter column `name` TYPE varchar({frappe.db.VARCHAR_LEN})"