refactor(migrate-to): code restructure and bug fixes

This commit is contained in:
Gavin D'souza 2020-05-13 19:42:54 +05:30
parent 451772dfab
commit e559e30d02

View file

@ -1,17 +1,45 @@
# imports - standard imports
import functools
import getpass
import json
import re
import sys
# imports - third party imports
import click
import requests
from terminaltables import AsciiTable
# imports - module imports
import frappe
import frappe.utils.backups
from frappe.utils import get_installed_apps_info
def choose(plans_list):
def render_table(data):
print(AsciiTable(data).table)
def padme(me):
def empty_line(*args, **kwargs):
result = me(*args, **kwargs)
print()
return result
return empty_line
@functools.lru_cache(maxsize=1024)
def get_first_party_apps():
apps = []
for org in ["frappe", "erpnext"]:
req = requests.get(f"https://api.github.com/users/{org}/repos", {"type": "sources", "per_page": 200})
if req.ok:
apps.extend([x["name"] for x in req.json()])
return apps
@padme
def choose_plan(plans_list):
plans_table = []
available_plans = []
selected_plan = None
print(f"{len(plans_list)} plans available")
@ -21,40 +49,229 @@ def choose(plans_list):
available_plans.append(row_data[0])
plans_table.append(row_data)
print(AsciiTable(plans_table).table)
render_table(plans_table)
while True:
if selected_plan:
break
input_plan = input("Send plan?: ").strip()
if input_plan in available_plans:
print(f"{input_plan} Plan selected ✅")
return input_plan
else:
input_plan = input("Send plan?: ")
if input_plan in available_plans:
return selected_plan
else:
print("Invalid selection...try again")
print("Invalid selection...try again ❌")
def filter_apps(app_groups):
from frappe.utils import get_installed_apps_info
def check_app_compat(available_group):
frappe_upgrade_msg = ""
trimmed_available_group = set([(app['scrubbed'], app['branch']) for app in available_group['apps']])
existing_group = set([(app['app_name'], app['branch']) for app in get_installed_apps_info()])
allowed_apps = []
print("Checking availability of existing app group")
incompatible_apps = []
filtered_apps = []
existing_apps = []
branch_msgs = []
for group in app_groups:
for app in group.get("apps"):
app_name = group.get('scrubbed')
branch = group.get('branch')
allowed_apps.append(tuple(app_name, branch))
for (app, branch) in existing_group:
if (app, branch) not in trimmed_available_group:
app_title = [group["name"] for group in available_group["apps"] if group["scrubbed"] == app]
if app_title:
app_title = app_title[0]
is_compat = False
if app not in get_first_party_apps():
incompatible_apps.append(app)
print(f"❌ App {app}:{branch}")
else:
available_branch = [a['branch'] for a in available_group['apps'] if a['scrubbed'] == app]
if not available_branch:
print(f"App {app} doesn't exist in selected group")
continue
else:
available_branch = available_branch[0]
print(f"⚠️ {app}:{branch} => {available_branch}")
branch_msgs.append([app.title(), branch, available_branch])
filtered_apps.append(app_title)
else:
filtered_apps.append(app_title)
for app in get_installed_apps_info():
app_name = app.get('app_name')
branch = app.get('branch')
existing_apps.append(tuple(app_name, branch))
start_msg = "\nSelecting this group will "
incompatible_apps = f"drop {len(incompatible_apps)} apps: " + ", ".join(incompatible_apps) + " and " if incompatible_apps else ""
branch_change = "upgrade:\n" + "\n".join(["{}: {} => {}".format(*x) for x in branch_msgs]) if branch_msgs else ""
changes = (incompatible_apps + branch_change) or "be perfect for you :)"
warning_message = start_msg + changes
filtered_apps = [app[0] for app in existing_apps if app in allowed_apps]
print(warning_message)
return "Vanilla Version 12", filtered_apps
return is_compat, filtered_apps
def generate_app_group_table(app_groups):
app_groups_table = [["#", "App Group", "Apps"]]
for _, app_group in enumerate(app_groups):
row = [_ + 1, app_group["name"], ", ".join([f"{app['scrubbed']}:{app['branch']}" for app in app_group['apps']])]
app_groups_table.append(row)
render_table(app_groups_table)
@padme
def filter_apps(app_groups):
# try for default group first...then let em select which group
default_group = [g for g in app_groups if g['default']][0]
is_compat, filtered_apps = check_app_compat(default_group)
if not is_compat and not click.confirm("Continue anyway?"):
generate_app_group_table(app_groups)
while True:
try:
app_group_index = int(input("Select App Group #: ").strip()) - 1
selected_group = app_groups[app_group_index]
is_compat, filtered_apps = check_app_compat(selected_group)
except:
print("Invalid Selection")
sys.exit(1)
if is_compat or click.confirm("Continue anyway?"):
break
return default_group['name'], filtered_apps
@padme
def create_session():
# take user input from STDIN
username = input("Username: ").strip()
password = getpass.unix_getpass()
auth_credentials = {"usr": username, "pwd": password}
session = requests.Session()
login_sc = session.post(login_url, auth_credentials)
if login_sc.ok:
print(f"Authorization Successful! ✅")
session.headers.update({"X-Press-Team": username})
return session
else:
print(f"Authorization Failed with Error Code {login_sc.status_code}")
def get_new_site_options():
site_options_sc = session.post(options_url)
if site_options_sc.ok:
site_options = site_options_sc.json()["message"]
return site_options
else:
print(f"Couldn't retrive New site information: {site_options_sc.status_code}")
def is_valid_subdomain(subdomain):
matched = re.match("^[a-z0-9][a-z0-9-]*[a-z0-9]$", subdomain)
if matched:
return True
print('Subdomain contains invalid characters. Use lowercase characters, numbers and hyphens')
def is_subdomain_available(subdomain):
res = session.post(site_exists_url, {"subdomain": subdomain})
if res.ok:
available = not res.json()['message']
if not available:
print('Subdomain already exists! Try another one')
return available
@padme
def get_subdomain(domain):
while True:
subdomain = input("Enter subdomain: ").strip()
if is_valid_subdomain(subdomain):
if is_subdomain_available(subdomain):
print(f"Site Domain: {subdomain}.{domain}")
return subdomain
@padme
def upload_backup(local_site):
# take backup
print(f"Taking backup for site {local_site}")
odb = frappe.utils.backups.new_backup(ignore_files=False, force=True)
files_session = {}
# upload files
for file_type, file_path in [
("database", odb.backup_path_db),
("public", odb.backup_path_files),
("private", odb.backup_path_private_files)
]:
file_upload_response = session.post(files_url, data={}, files={
"file": open(file_path, "rb"),
"is_private": 1,
"folder": "Home",
"method": "press.api.site.upload_backup",
"type": file_type
})
if file_upload_response.ok:
files_session[file_type] = file_upload_response.json()["message"]
else:
print(f"Upload failed for: {file_path}")
files_uploaded = { k: v["file_url"] for k, v in files_session.items() }
return files_uploaded
def frappecloud_migrator(local_site, remote_site):
# test (change to https !!!):
global login_url, upload_url, files_url, options_url, site_exists_url, session
login_url = f"http://{remote_site}/api/method/login"
upload_url = f"http://{remote_site}/api/method/press.api.site.new"
files_url = f"http://{remote_site}/api/method/upload_file"
options_url = f"http://{remote_site}/api/method/press.api.site.options_for_new"
site_exists_url = f"http://{remote_site}/api/method/press.api.site.exists"
print(f"Frappe Cloud credentials @ {remote_site}")
# get credentials + auth user + start session
session = create_session()
if session:
# connect to site db
frappe.init(site=local_site)
frappe.connect()
# get new site options
site_options = get_new_site_options()
# set preferences from site options
subdomain = get_subdomain(site_options['domain'])
plan = choose_plan(site_options['plans'])
app_groups = site_options["groups"]
selected_group, filtered_apps = filter_apps(app_groups)
files_uploaded = upload_backup(local_site)
# push to frappe_cloud
payload = json.dumps({
"site": {
"apps": filtered_apps,
"files": files_uploaded,
"group": selected_group,
"name": subdomain,
"plan": plan
}
})
session.headers.update({"Content-Type": "application/json; charset=utf-8"})
site_creation_request = session.post(upload_url, payload)
frappe.destroy()
if site_creation_request.ok:
print(f"Site creation started at {site_creation_request.json()['message']}")
else:
print(f"Request failed with error code {site_creation_request.status_code}")
def migrate_to(local_site, remote_site):
@ -65,87 +282,3 @@ def migrate_to(local_site, remote_site):
else:
print(f"{remote_site} is not supported yet")
sys.exit(1)
def frappecloud_migrator(local_site, remote_site):
# test:
login_url = f"http://{remote_site}/api/method/login"
upload_url = f"http://{remote_site}/api/method/press.api.site.new"
files_url = f"http://{remote_site}/api/method/upload_file"
options_url = f"http://{remote_site}/api/method/press.api.site.options_for_new"
# production:
# login_url = f"https://{remote_site}/api/method/login"
# upload_url = f"https://{remote_site}/api/method/press.api.site.new_from_existing_account"
# options_url = f"https://{remote_site}/api/method/press.api.site.options_for_new"
print(f"Frappe Cloud credentials @ {remote_site}")
username = input("Username: ")
password = getpass.unix_getpass()
auth_credentials = {"usr": username, "pwd": password}
# create frapp_cloud session
session = requests.Session()
login_sc = session.post(login_url, auth_credentials)
if login_sc.ok:
print(f"Authorization Successful!")
# get options
session.headers.update({"X-Press-Team": username})
site_options_sc = session.post(options_url)
if site_options_sc.ok:
site_options = site_options_sc.json()["message"]
app_groups = site_options_sc.json()["groups"]
else:
print(f"Request failed with Status Code: {site_options_sc.status_code}")
sys.exit(1)
# set preferences from options
subdomain = input("Enter subdomain: ")
plan = choose(site_options['plans'])
frappe.init(site=local_site)
frappe.connect()
# apps currently on site....vanilla.
selected_group, filtered_apps = filter_apps(app_groups)
# take backup
print(f"Taking backup for site {local_site}")
odb = frappe.utils.backups.new_backup(ignore_files=False, force=True)
files_session = {}
# upload files
for file_type, file_path in [("database", odb.backup_path_db), ("public", odb.backup_path_files), ("private", odb.backup_path_private_files)]:
file_upload_response = session.post(files_url, data={}, files={
"file": open(file_path, "rb"),
"is_private": 1,
"folder": "Home",
"method": "press.api.site.upload_backup",
"type": file_type
})
if file_upload_response.ok:
files_session[file_type] = file_upload_response.json()["message"]
else:
print(f"Upload failed for: {file_path}")
files_uploaded = { k, v["file_url"] for k, v in files_session.items() }
# push to frappe_cloud
session.post(upload_url, data={
"site": {
"apps": filtered_apps,
"files": files_uploaded,
"group": selected_group,
"name": subdomain,
"plan": plan
}
})
frappe.destroy()
else:
print(f"Request failed with Status Code: {login_sc.status_code}")