fix: add in missing logic, add in a test for creating a site with a specific user

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang 2023-12-13 11:43:31 +05:30
parent d1906b3c16
commit 1e6eeba9b7
No known key found for this signature in database
GPG key ID: 9DCC61E211BF645F
5 changed files with 108 additions and 9 deletions

View file

@ -53,6 +53,7 @@ from frappe.exceptions import SiteNotSpecifiedError
default=True,
help="Create user and database in mariadb/postgres; only bootstrap if false",
)
@click.option("--db-user", help="Database user if you already have one")
def new_site(
site,
db_root_username=None,

View file

@ -18,6 +18,20 @@ class DbManager:
password_predicate = f" IDENTIFIED BY '{password}'" if password else ""
self.db.sql(f"CREATE USER '{user}'@'{host}'{password_predicate}")
def does_user_exist(self, username: str, host: str | None = None) -> bool:
return (
self.db.sql(
f"SELECT EXISTS(SELECT 1 FROM mysql.user WHERE user = '{username}' and "
f"host = '{host or self.get_current_host()}')"
)[0][0]
== 1
)
def set_user_password(self, username: str, password: str, host: str | None = None) -> None:
self.db.sql(
f"SET PASSWORD FOR '{username}'@'{host or self.get_current_host()}' = PASSWORD('{password}')"
)
def delete_user(self, target, host=None):
host = host or self.get_current_host()
self.db.sql(f"DROP USER IF EXISTS '{target}'@'{host}'")

View file

@ -34,16 +34,21 @@ def setup_database(force, verbose, no_mariadb_socket=False):
if no_mariadb_socket:
dbman_kwargs["host"] = "%"
if dbman.does_user_exist(db_user):
print("User exists", db_user)
dbman.set_user_password(db_user, frappe.conf.db_password, **dbman_kwargs)
if verbose:
print("Re-used existing user %s" % db_user)
else:
dbman.create_user(db_user, frappe.conf.db_password, **dbman_kwargs)
if verbose:
print("Created user %s" % db_user)
if force or (db_name not in dbman.get_database_list()):
dbman.delete_user(db_user, **dbman_kwargs)
dbman.drop_database(db_name)
else:
raise Exception(f"Database {db_name} already exists")
dbman.create_user(db_user, frappe.conf.db_password, **dbman_kwargs)
if verbose:
print("Created user %s" % db_user)
dbman.create_database(db_name)
if verbose:
print("Created database %s" % db_name)

View file

@ -11,15 +11,21 @@ def setup_database():
root_conn.commit()
root_conn.sql("end")
root_conn.sql(f"DROP DATABASE IF EXISTS `{frappe.conf.db_name}`")
root_conn.sql(f"DROP USER IF EXISTS {frappe.conf.db_name}")
# If user exists, just update password
if root_conn.sql(f"SELECT 1 FROM pg_roles WHERE rolname='{frappe.conf.db_user}'"):
root_conn.sql(f"ALTER USER {frappe.conf.db_user} WITH PASSWORD '{frappe.conf.db_password}'")
else:
root_conn.sql(f"CREATE USER {frappe.conf.db_user} WITH PASSWORD '{frappe.conf.db_password}'")
root_conn.sql(f"CREATE DATABASE `{frappe.conf.db_name}`")
root_conn.sql(f"CREATE user {frappe.conf.db_name} password '{frappe.conf.db_password}'")
root_conn.sql("GRANT ALL PRIVILEGES ON DATABASE `{0}` TO {0}".format(frappe.conf.db_name))
root_conn.sql(
f"GRANT ALL PRIVILEGES ON DATABASE `{frappe.conf.db_name}` TO {frappe.conf.db_user}"
)
if psql_version := root_conn.sql("SELECT VERSION()", as_dict=True):
version_string = psql_version[0].get("version") or "PostgreSQL 14"
major_version = cint(re.split(r"[\w\.]", version_string)[1])
if major_version > 15:
root_conn.sql("ALTER DATABASE `{0}` OWNER TO {0}".format(frappe.conf.db_name))
root_conn.sql(f"ALTER DATABASE `{frappe.conf.db_name}` OWNER TO {frappe.conf.db_user}")
root_conn.close()

View file

@ -6,7 +6,9 @@ import gzip
import importlib
import json
import os
import secrets
import shlex
import string
import subprocess
import unittest
from contextlib import contextmanager
@ -511,6 +513,77 @@ class TestCommands(BaseTestCommands):
self.assertEqual(conf[key], value)
def test_different_db_username(self):
site = frappe.generate_hash()
user = "".join(secrets.choice(string.ascii_letters) for _ in range(8))
password = frappe.generate_hash()
kwargs = {
"new_site": site,
"admin_password": frappe.conf.admin_password,
"root_password": frappe.conf.root_password,
"db_type": frappe.conf.db_type,
"db_user": user,
"db_password": password,
}
self.execute(
"bench new-site {new_site} --force --verbose "
"--admin-password {admin_password} "
"--db-root-password {root_password} "
"--db-type {db_type} "
"--db-user {db_user} "
"--db-password {db_password}",
kwargs,
)
self.assertEqual(self.returncode, 0)
self.execute("bench --site {new_site} show-config --format json", kwargs)
self.assertEqual(self.returncode, 0)
config = json.loads(self.stdout)
self.assertEqual(config[site]["db_user"], user)
self.assertEqual(config[site]["db_password"], password)
self.execute("bench drop-site {new_site} --force --db-root-password {root_password}", kwargs)
self.assertEqual(self.returncode, 0)
def test_existing_db_username(self):
site = frappe.generate_hash()
if (user := frappe.conf.db_user) is None:
user = "".join(secrets.choice(string.ascii_letters) for _ in range(8))
if frappe.conf.db_type == "mariadb":
from frappe.database.mariadb.setup_db import get_root_connection
root_conn = get_root_connection(frappe.flags.root_login, frappe.flags.root_password)
root_conn.sql(f"CREATE USER '{user}'@'localhost'")
else:
from frappe.database.postgres.setup_db import get_root_connection
root_conn = get_root_connection(frappe.flags.root_login, frappe.flags.root_password)
root_conn.sql(f"CREATE USER {user}")
password = frappe.conf.db_password or frappe.generate_hash()
kwargs = {
"new_site": site,
"admin_password": frappe.conf.admin_password,
"root_password": frappe.conf.root_password,
"db_type": frappe.conf.db_type,
"db_user": user,
"db_password": password,
}
self.execute(
"bench new-site {new_site} --force --verbose "
"--admin-password {admin_password} "
"--db-root-password {root_password} "
"--db-type {db_type} "
"--db-user {db_user} "
"--db-password {db_password}",
kwargs,
)
self.assertEqual(self.returncode, 0)
self.execute("bench --site {new_site} show-config --format json", kwargs)
self.assertEqual(self.returncode, 0)
config = json.loads(self.stdout)
self.assertEqual(config[site]["db_user"], user)
self.assertEqual(config[site]["db_password"], password)
self.execute("bench drop-site {new_site} --force --db-root-password {root_password}", kwargs)
self.assertEqual(self.returncode, 0)
class TestBackups(BaseTestCommands):
backup_map = {