refactor: use a function with context manager for backup decryption

Signed-off-by: Akhil Narang <me@akhilnarang.dev>
This commit is contained in:
Akhil Narang 2023-11-06 13:42:36 +05:30
parent 2e382040cd
commit a06e402f38
No known key found for this signature in database
GPG key ID: 9DCC61E211BF645F
2 changed files with 112 additions and 95 deletions

View file

@ -177,16 +177,9 @@ def _restore(
with_public_files=None,
with_private_files=None,
):
from frappe.installer import (
_new_site,
extract_files,
is_downgrade,
is_partial,
validate_database_sql,
)
from frappe.utils.backups import Backup, get_or_generate_backup_encryption_key
from frappe.installer import extract_files
from frappe.utils.backups import decrypt_backup, get_or_generate_backup_encryption_key
_backup = Backup(sql_file_path)
err, out = frappe.utils.execute_in_shell(f"file {sql_file_path}", check_exit_code=True)
if err:
click.secho("Failed to detect type of backup file", fg="red")
@ -195,17 +188,79 @@ def _restore(
if "cipher" in out.decode().split(":")[-1].strip():
if encryption_key:
click.secho("Encrypted backup file detected. Decrypting using provided key.", fg="yellow")
_backup.backup_decryption(encryption_key)
else:
click.secho("Encrypted backup file detected. Decrypting using site config.", fg="yellow")
encryption_key = get_or_generate_backup_encryption_key()
_backup.backup_decryption(encryption_key)
# Rollback on unsuccessful decryption
if not os.path.exists(sql_file_path):
click.secho("Decryption failed. Please provide a valid key and try again.", fg="red")
sys.exit(1)
with decrypt_backup(sql_file_path, encryption_key):
# Rollback on unsuccessful decryption
if not os.path.exists(sql_file_path):
click.secho("Decryption failed. Please provide a valid key and try again.", fg="red")
sys.exit(1)
restore_backup(
sql_file_path,
site,
db_root_username,
db_root_password,
verbose,
install_app,
admin_password,
force,
)
else:
restore_backup(
sql_file_path,
site,
db_root_username,
db_root_password,
verbose,
install_app,
admin_password,
force,
)
# Extract public and/or private files to the restored site, if user has given the path
if with_public_files:
# Decrypt data if there is a Key
if encryption_key:
with decrypt_backup(with_public_files, encryption_key):
public = extract_files(site, with_public_files)
else:
public = extract_files(site, with_public_files)
# Removing temporarily created file
os.remove(public)
if with_private_files:
# Decrypt data if there is a Key
if encryption_key:
with decrypt_backup(with_private_files, encryption_key):
private = extract_files(site, with_private_files)
else:
private = extract_files(site, with_private_files)
# Removing temporarily created file
os.remove(private)
success_message = "Site {} has been restored{}".format(
site, " with files" if (with_public_files or with_private_files) else ""
)
click.secho(success_message, fg="green")
def restore_backup(
sql_file_path: str,
site,
db_root_username,
db_root_password,
verbose,
install_app,
admin_password,
force,
):
from frappe.installer import _new_site, is_downgrade, is_partial, validate_database_sql
if is_partial(sql_file_path):
click.secho(
@ -247,32 +302,6 @@ def _restore(
print(err.args[1])
sys.exit(1)
# Extract public and/or private files to the restored site, if user has given the path
if with_public_files:
# Decrypt data if there is a Key
if encryption_key:
_backup = Backup(with_public_files)
_backup.backup_decryption(encryption_key)
public = extract_files(site, with_public_files)
# Removing temporarily created file
os.remove(public)
if with_private_files:
# Decrypt data if there is a Key
if encryption_key:
_backup = Backup(with_private_files)
_backup.backup_decryption(encryption_key)
private = extract_files(site, with_private_files)
# Removing temporarily created file
os.remove(private)
success_message = "Site {} has been restored{}".format(
site, " with files" if (with_public_files or with_private_files) else ""
)
click.secho(success_message, fg="green")
@click.command("partial-restore")
@click.argument("sql-file-path")
@ -281,21 +310,16 @@ def _restore(
@pass_context
def partial_restore(context, sql_file_path, verbose, encryption_key=None):
from frappe.installer import is_partial, partial_restore
from frappe.utils.backups import Backup, get_or_generate_backup_encryption_key
from frappe.utils.backups import decrypt_backup, get_or_generate_backup_encryption_key
if not os.path.exists(sql_file_path):
print("Invalid path", sql_file_path)
sys.exit(1)
site = get_site(context)
frappe.init(site=site)
_backup = Backup(sql_file_path)
verbose = context.verbose or verbose
frappe.init(site=site)
frappe.connect(site=site)
_backup = Backup(sql_file_path)
err, out = frappe.utils.execute_in_shell(f"file {sql_file_path}", check_exit_code=True)
if err:
click.secho("Failed to detect type of backup file", fg="red")
@ -310,21 +334,30 @@ def partial_restore(context, sql_file_path, verbose, encryption_key=None):
click.secho("Encrypted backup file detected. Decrypting using site config.", fg="yellow")
key = get_or_generate_backup_encryption_key()
_backup.backup_decryption(key)
with decrypt_backup(sql_file_path, key):
if not is_partial(sql_file_path):
click.secho(
"Full backup file detected.Use `bench restore` to restore a Frappe Site.",
fg="red",
)
sys.exit(1)
partial_restore(sql_file_path, verbose)
# Rollback on unsuccessful decryption
if not os.path.exists(sql_file_path):
click.secho("Decryption failed. Please provide a valid key and try again.", fg="red")
sys.exit(1)
if not is_partial(sql_file_path):
click.secho(
"Full backup file detected.Use `bench restore` to restore a Frappe Site.",
fg="red",
)
sys.exit(1)
else:
if not is_partial(sql_file_path):
click.secho(
"Full backup file detected.Use `bench restore` to restore a Frappe Site.",
fg="red",
)
sys.exit(1)
partial_restore(sql_file_path, verbose)
partial_restore(sql_file_path, verbose)
frappe.destroy()

View file

@ -1,5 +1,6 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import contextlib
# imports - standard imports
import gzip
@ -632,46 +633,29 @@ def get_or_generate_backup_encryption_key():
return key
class Backup:
def __init__(self, file_path):
self.file_path = file_path
@contextlib.contextmanager
def decrypt_backup(file_path: str, passphrase: str):
if not os.path.exists(file_path):
print("Invalid path: ", file_path)
return
else:
file_path_with_ext = file_path + ".gpg"
os.rename(file_path, file_path_with_ext)
def backup_decryption(self, passphrase):
"""
Decrypts backup at the given path using the passphrase.
"""
if not os.path.exists(self.file_path):
print("Invalid path", self.file_path)
return
else:
file_path_with_ext = self.file_path + ".gpg"
os.rename(self.file_path, file_path_with_ext)
cmd_string = "gpg --yes --passphrase {passphrase} --pinentry-mode loopback -o {decrypted_file} -d {file_location}"
command = cmd_string.format(
passphrase=passphrase,
file_location=file_path_with_ext,
decrypted_file=self.file_path,
)
frappe.utils.execute_in_shell(command)
def decryption_rollback(self):
"""
Checks if the decrypted file exists at the given path.
if exists
Renames the orginal encrypted file.
else
Removes the decrypted file and rename the original file.
"""
if os.path.exists(self.file_path + ".gpg"):
if os.path.exists(self.file_path):
os.remove(self.file_path)
if os.path.exists(self.file_path.rstrip(".gz")):
os.remove(self.file_path.rstrip(".gz"))
os.rename(self.file_path + ".gpg", self.file_path)
def __del__(self):
self.decryption_rollback()
cmd_string = "gpg --yes --passphrase {passphrase} --pinentry-mode loopback -o {decrypted_file} -d {file_location}"
command = cmd_string.format(
passphrase=passphrase,
file_location=file_path_with_ext,
decrypted_file=file_path,
)
frappe.utils.execute_in_shell(command)
yield
if os.path.exists(file_path + ".gpg"):
if os.path.exists(file_path):
os.remove(file_path)
if os.path.exists(file_path.rstrip(".gz")):
os.remove(file_path.rstrip(".gz"))
os.rename(file_path + ".gpg", file_path)
def backup(