100 lines
3.2 KiB
Python
100 lines
3.2 KiB
Python
import os
|
|
import re
|
|
|
|
import frappe
|
|
from frappe.database.db_manager import DbManager
|
|
from frappe.utils import cint
|
|
|
|
|
|
def setup_database():
    """Recreate the site's PostgreSQL database and its login role.

    Using the root connection, this drops any existing database named
    ``frappe.conf.db_name``, creates the ``frappe.conf.db_user`` role (or only
    resets its password when the role already exists), creates the database
    and grants the role all privileges on it. On PostgreSQL 15 and newer the
    role is additionally made the owner of the database. The root connection
    is closed before returning.
    """
    root_conn = get_root_connection()
    root_conn.commit()
    # "END" commits the current transaction; PostgreSQL refuses to run
    # DROP/CREATE DATABASE inside a transaction block.
    root_conn.sql("end")
    root_conn.sql(f'DROP DATABASE IF EXISTS "{frappe.conf.db_name}"')

    # NOTE(review): role name and password are interpolated straight into the
    # statements below; a value containing a quote character would break them.
    # If user exists, just update password
    if root_conn.sql(f"SELECT 1 FROM pg_roles WHERE rolname='{frappe.conf.db_user}'"):
        root_conn.sql(f"ALTER USER \"{frappe.conf.db_user}\" WITH PASSWORD '{frappe.conf.db_password}'")
    else:
        root_conn.sql(f"CREATE USER \"{frappe.conf.db_user}\" WITH PASSWORD '{frappe.conf.db_password}'")
    root_conn.sql(f'CREATE DATABASE "{frappe.conf.db_name}"')
    root_conn.sql(
        f'GRANT ALL PRIVILEGES ON DATABASE "{frappe.conf.db_name}" TO "{frappe.conf.db_user}"'
    )
    if psql_version := root_conn.sql("SELECT VERSION()", as_dict=True):
        version_string = psql_version[0].get("version") or "PostgreSQL 14"
        # VERSION() reads like "PostgreSQL 15.4 on x86_64-...": the first run
        # of digits is the major version. (Splitting on r"[\w\.]" yielded only
        # empty strings, so the parsed version used to always be 0.)
        version_match = re.search(r"\d+", version_string)
        major_version = cint(version_match.group()) if version_match else 0
        # PostgreSQL 15 stopped granting CREATE on the "public" schema to every
        # role, so the site user has to own the database to create tables.
        if major_version >= 15:
            root_conn.sql(f'ALTER DATABASE "{frappe.conf.db_name}" OWNER TO "{frappe.conf.db_user}"')
    root_conn.close()
|
|
|
|
|
|
def bootstrap_database(db_name, verbose, source_sql=None):
    """Load the schema dump into ``db_name`` and verify the result.

    Connects to the database, imports ``source_sql`` through
    ``import_db_from_sql`` and reconnects. If the ``tabDefaultValue`` table is
    absent afterwards, an error is printed in red and the process exits with
    status 1.
    """
    frappe.connect(db_name=db_name)
    import_db_from_sql(source_sql, verbose)
    frappe.connect(db_name=db_name)

    # Happy path: the sentinel table exists, nothing more to do.
    if "tabDefaultValue" in frappe.db.get_tables():
        return

    import sys

    from click import secho

    failure_message = (
        "Table 'tabDefaultValue' missing in the restored site. "
        "This may be due to incorrect permissions or the result of a restore from a bad backup file. "
        "Database not installed correctly."
    )
    secho(failure_message, fg="red")
    sys.exit(1)
|
|
|
|
|
|
def import_db_from_sql(source_sql=None, verbose=False):
    """Restore the configured database from an SQL dump.

    When ``source_sql`` is falsy, the ``framework_postgres.sql`` file located
    next to this module is used. The restore itself is delegated to
    ``DbManager.restore_database`` with the database name, user and password
    taken from ``frappe.conf``. Progress lines are printed when ``verbose`` is
    truthy.
    """
    if verbose:
        print("Starting database import...")

    target_db = frappe.conf.db_name
    # Fall back to the dump shipped alongside this module.
    source_sql = source_sql or os.path.join(os.path.dirname(__file__), "framework_postgres.sql")

    manager = DbManager(frappe.local.db)
    manager.restore_database(verbose, target_db, source_sql, frappe.conf.db_user, frappe.conf.db_password)

    if verbose:
        print("Imported from database %s" % source_sql)
|
|
|
|
|
|
def get_root_connection():
    """Return the cached superuser connection, creating it on first use.

    Credentials are taken from ``frappe.flags`` when already set, otherwise
    from the ``root_login`` / ``root_password`` config keys, and as a last
    resort prompted for on the terminal. The resulting connection is stored in
    ``frappe.local.flags.root_connection`` and reused by later calls.
    """
    if frappe.local.flags.root_connection:
        return frappe.local.flags.root_connection

    if not frappe.flags.root_login:
        configured_login = frappe.conf.get("root_login")
        if configured_login:
            frappe.flags.root_login = configured_login
        else:
            frappe.flags.root_login = input("Enter postgres super user: ")

    if not frappe.flags.root_password:
        configured_password = frappe.conf.get("root_password")
        if configured_password:
            frappe.flags.root_password = configured_password
        else:
            from getpass import getpass

            frappe.flags.root_password = getpass("Postgres super user password: ")

    # The database connected to is the one named after the superuser login.
    connection = frappe.database.get_db(
        host=frappe.conf.db_host,
        port=frappe.conf.db_port,
        user=frappe.flags.root_login,
        password=frappe.flags.root_password,
        cur_db_name=frappe.flags.root_login,
    )
    frappe.local.flags.root_connection = connection

    return frappe.local.flags.root_connection
|
|
|
|
|
|
def drop_user_and_database(db_name, db_user):
    """Drop the database ``db_name`` and the role ``db_user`` if they exist.

    Every backend currently connected to ``db_name`` is terminated first, then
    the database and the role are dropped through the root connection.

    Args:
        db_name: Name of the database to drop.
        db_user: Name of the role to drop.
    """
    root_conn = get_root_connection()
    root_conn.commit()
    # Kick out sessions still attached to the database before dropping it.
    root_conn.sql(
        "SELECT pg_terminate_backend (pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = %s",
        (db_name,),
    )
    root_conn.sql("end")

    # setup_database() creates both objects with double-quoted identifiers, so
    # they must be dropped the same way; otherwise PostgreSQL lower-cases the
    # name (or rejects special characters) and the DROP misses its target.
    # Embedded double quotes are escaped by doubling them.
    quoted_db_name = '"' + str(db_name).replace('"', '""') + '"'
    quoted_db_user = '"' + str(db_user).replace('"', '""') + '"'
    root_conn.sql(f"DROP DATABASE IF EXISTS {quoted_db_name}")
    root_conn.sql(f"DROP USER IF EXISTS {quoted_db_user}")
|