From a06e402f38c4e823ad995749d405bdc488fe3cba Mon Sep 17 00:00:00 2001 From: Akhil Narang Date: Mon, 6 Nov 2023 13:42:36 +0530 Subject: [PATCH] refactor: use a function with context manager for backup decryption Signed-off-by: Akhil Narang --- frappe/commands/site.py | 145 ++++++++++++++++++++++++---------------- frappe/utils/backups.py | 62 +++++++---------- 2 files changed, 112 insertions(+), 95 deletions(-) diff --git a/frappe/commands/site.py b/frappe/commands/site.py index b5253ecf8d..859f2cfc85 100644 --- a/frappe/commands/site.py +++ b/frappe/commands/site.py @@ -177,16 +177,9 @@ def _restore( with_public_files=None, with_private_files=None, ): - from frappe.installer import ( - _new_site, - extract_files, - is_downgrade, - is_partial, - validate_database_sql, - ) - from frappe.utils.backups import Backup, get_or_generate_backup_encryption_key + from frappe.installer import extract_files + from frappe.utils.backups import decrypt_backup, get_or_generate_backup_encryption_key - _backup = Backup(sql_file_path) err, out = frappe.utils.execute_in_shell(f"file {sql_file_path}", check_exit_code=True) if err: click.secho("Failed to detect type of backup file", fg="red") @@ -195,17 +188,79 @@ def _restore( if "cipher" in out.decode().split(":")[-1].strip(): if encryption_key: click.secho("Encrypted backup file detected. Decrypting using provided key.", fg="yellow") - _backup.backup_decryption(encryption_key) else: click.secho("Encrypted backup file detected. Decrypting using site config.", fg="yellow") encryption_key = get_or_generate_backup_encryption_key() - _backup.backup_decryption(encryption_key) - # Rollback on unsuccessful decryption - if not os.path.exists(sql_file_path): - click.secho("Decryption failed. Please provide a valid key and try again.", fg="red") - sys.exit(1) + with decrypt_backup(sql_file_path, encryption_key): + # Rollback on unsuccessful decryption + if not os.path.exists(sql_file_path): + click.secho("Decryption failed. Please provide a valid key and try again.", fg="red") + sys.exit(1) + + restore_backup( + sql_file_path, + site, + db_root_username, + db_root_password, + verbose, + install_app, + admin_password, + force, + ) + else: + restore_backup( + sql_file_path, + site, + db_root_username, + db_root_password, + verbose, + install_app, + admin_password, + force, + ) + + # Extract public and/or private files to the restored site, if user has given the path + if with_public_files: + # Decrypt data if there is a Key + if encryption_key: + with decrypt_backup(with_public_files, encryption_key): + public = extract_files(site, with_public_files) + else: + public = extract_files(site, with_public_files) + + # Removing temporarily created file + os.remove(public) + + if with_private_files: + # Decrypt data if there is a Key + if encryption_key: + with decrypt_backup(with_private_files, encryption_key): + private = extract_files(site, with_private_files) + else: + private = extract_files(site, with_private_files) + + # Removing temporarily created file + os.remove(private) + + success_message = "Site {} has been restored{}".format( + site, " with files" if (with_public_files or with_private_files) else "" + ) + click.secho(success_message, fg="green") + + +def restore_backup( + sql_file_path: str, + site, + db_root_username, + db_root_password, + verbose, + install_app, + admin_password, + force, +): + from frappe.installer import _new_site, is_downgrade, is_partial, validate_database_sql if is_partial(sql_file_path): click.secho( @@ -247,32 +302,6 @@ def _restore( print(err.args[1]) sys.exit(1) - # Extract public and/or private files to the restored site, if user has given the path - if with_public_files: - # Decrypt data if there is a Key - if encryption_key: - _backup = Backup(with_public_files) - _backup.backup_decryption(encryption_key) - public = extract_files(site, with_public_files) - - # Removing temporarily created file - os.remove(public) - - if with_private_files: - # Decrypt data if there is a Key - if encryption_key: - _backup = Backup(with_private_files) - _backup.backup_decryption(encryption_key) - private = extract_files(site, with_private_files) - - # Removing temporarily created file - os.remove(private) - - success_message = "Site {} has been restored{}".format( - site, " with files" if (with_public_files or with_private_files) else "" - ) - click.secho(success_message, fg="green") - @click.command("partial-restore") @click.argument("sql-file-path") @@ -281,21 +310,16 @@ def _restore( @pass_context def partial_restore(context, sql_file_path, verbose, encryption_key=None): from frappe.installer import is_partial, partial_restore - from frappe.utils.backups import Backup, get_or_generate_backup_encryption_key + from frappe.utils.backups import decrypt_backup, get_or_generate_backup_encryption_key if not os.path.exists(sql_file_path): print("Invalid path", sql_file_path) sys.exit(1) site = get_site(context) - frappe.init(site=site) - - _backup = Backup(sql_file_path) - verbose = context.verbose or verbose - + frappe.init(site=site) frappe.connect(site=site) - _backup = Backup(sql_file_path) err, out = frappe.utils.execute_in_shell(f"file {sql_file_path}", check_exit_code=True) if err: click.secho("Failed to detect type of backup file", fg="red") @@ -310,21 +334,30 @@ def partial_restore(context, sql_file_path, verbose, encryption_key=None): click.secho("Encrypted backup file detected. Decrypting using site config.", fg="yellow") key = get_or_generate_backup_encryption_key() - _backup.backup_decryption(key) + with decrypt_backup(sql_file_path, key): + if not is_partial(sql_file_path): + click.secho( + "Full backup file detected.Use `bench restore` to restore a Frappe Site.", + fg="red", + ) + sys.exit(1) + + partial_restore(sql_file_path, verbose) # Rollback on unsuccessful decryption if not os.path.exists(sql_file_path): click.secho("Decryption failed. Please provide a valid key and try again.", fg="red") sys.exit(1) - if not is_partial(sql_file_path): - click.secho( - "Full backup file detected.Use `bench restore` to restore a Frappe Site.", - fg="red", - ) - sys.exit(1) + else: + if not is_partial(sql_file_path): + click.secho( + "Full backup file detected.Use `bench restore` to restore a Frappe Site.", + fg="red", + ) + sys.exit(1) - partial_restore(sql_file_path, verbose) + partial_restore(sql_file_path, verbose) frappe.destroy() diff --git a/frappe/utils/backups.py b/frappe/utils/backups.py index ba546801e5..0e52580927 100644 --- a/frappe/utils/backups.py +++ b/frappe/utils/backups.py @@ -1,5 +1,6 @@ # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE +import contextlib # imports - standard imports import gzip @@ -632,46 +633,29 @@ def get_or_generate_backup_encryption_key(): return key -class Backup: - def __init__(self, file_path): - self.file_path = file_path +@contextlib.contextmanager +def decrypt_backup(file_path: str, passphrase: str): + if not os.path.exists(file_path): + print("Invalid path: ", file_path) + return + else: + file_path_with_ext = file_path + ".gpg" + os.rename(file_path, file_path_with_ext) - def backup_decryption(self, passphrase): - """ - Decrypts backup at the given path using the passphrase. - """ - if not os.path.exists(self.file_path): - print("Invalid path", self.file_path) - return - else: - file_path_with_ext = self.file_path + ".gpg" - os.rename(self.file_path, file_path_with_ext) - - cmd_string = "gpg --yes --passphrase {passphrase} --pinentry-mode loopback -o {decrypted_file} -d {file_location}" - command = cmd_string.format( - passphrase=passphrase, - file_location=file_path_with_ext, - decrypted_file=self.file_path, - ) - frappe.utils.execute_in_shell(command) - - def decryption_rollback(self): - """ - Checks if the decrypted file exists at the given path. - if exists - Renames the orginal encrypted file. - else - Removes the decrypted file and rename the original file. - """ - if os.path.exists(self.file_path + ".gpg"): - if os.path.exists(self.file_path): - os.remove(self.file_path) - if os.path.exists(self.file_path.rstrip(".gz")): - os.remove(self.file_path.rstrip(".gz")) - os.rename(self.file_path + ".gpg", self.file_path) - - def __del__(self): - self.decryption_rollback() + cmd_string = "gpg --yes --passphrase {passphrase} --pinentry-mode loopback -o {decrypted_file} -d {file_location}" + command = cmd_string.format( + passphrase=passphrase, + file_location=file_path_with_ext, + decrypted_file=file_path, + ) + frappe.utils.execute_in_shell(command) + yield + if os.path.exists(file_path + ".gpg"): + if os.path.exists(file_path): + os.remove(file_path) + if os.path.exists(file_path.rstrip(".gz")): + os.remove(file_path.rstrip(".gz")) + os.rename(file_path + ".gpg", file_path) def backup(