fix: Run Import in background job

This commit is contained in:
Faris Ansari 2019-09-06 02:56:10 +05:30
parent d960affe19
commit 87e525d40c
3 changed files with 75 additions and 46 deletions

View file

@ -319,12 +319,13 @@ class Importer:
if warnings:
return {"warnings": warnings}
# setup import log
if self.data_import.import_log:
import_log = frappe.parse_json(self.data_import.import_log)
else:
import_log = []
# remove failures
# remove previous failures from import log
import_log = [l for l in import_log if l.get("success") == True]
# get successfully imported rows
@ -342,47 +343,47 @@ class Importer:
total_payload_count = len(payloads)
batch_size = frappe.conf.data_import_batch_size or 1000
for batched_payloads in frappe.utils.create_batch(payloads, batch_size):
for batch_index, batched_payloads in enumerate(frappe.utils.create_batch(payloads, batch_size)):
for i, payload in enumerate(batched_payloads):
doc = payload.doc
row_indexes = [row[0] for row in payload.rows]
current_index = i + 1
doc = payload.doc
row_indexes = [row[0] for row in payload.rows]
current_index = (i + 1) + (batch_index * batch_size)
if set(row_indexes).intersection(set(imported_rows)):
print("Skipping imported rows", row_indexes)
frappe.publish_realtime(
"data_import_progress",
{"current": current_index, "total": total_payload_count, "skipping": True},
)
continue
try:
print("Importing", doc)
doc = self.process_doc(doc)
frappe.publish_realtime(
"data_import_progress",
{
"current": current_index,
"total": total_payload_count,
"docname": doc.name,
"success": True,
"row_indexes": row_indexes,
},
)
import_log.append(
frappe._dict(success=True, docname=doc.name, row_indexes=row_indexes)
)
except Exception as e:
import_log.append(
frappe._dict(
success=False,
exception=frappe.get_traceback(),
messages=frappe.local.message_log,
row_indexes=row_indexes,
if set(row_indexes).intersection(set(imported_rows)):
print("Skipping imported rows", row_indexes)
frappe.publish_realtime(
"data_import_progress",
{"current": current_index, "total": total_payload_count, "skipping": True},
)
)
frappe.clear_messages()
continue
try:
print("Importing", doc)
doc = self.process_doc(doc)
frappe.publish_realtime(
"data_import_progress",
{
"current": current_index,
"total": total_payload_count,
"docname": doc.name,
"success": True,
"row_indexes": row_indexes,
},
)
import_log.append(
frappe._dict(success=True, docname=doc.name, row_indexes=row_indexes)
)
except Exception as e:
import_log.append(
frappe._dict(
success=False,
exception=frappe.get_traceback(),
messages=frappe.local.message_log,
row_indexes=row_indexes,
)
)
frappe.clear_messages()
# rollback to savepoint if something went wrong
# frappe.db.sql('ROLLBACK TO SAVEPOINT import')
@ -401,6 +402,7 @@ class Importer:
self.data_import.db_set("import_log", json.dumps(import_log))
frappe.flags.in_import = False
frappe.publish_realtime("data_import_refresh")
def get_payloads_for_import(self, fields, data):
payloads = []

View file

@ -3,6 +3,12 @@
frappe.ui.form.on('Data Import Beta', {
setup(frm) {
frappe.realtime.on('data_import_refresh', () => {
frappe.model.clear_doc('Data Import Beta', frm.doc.name);
frappe.model.with_doc('Data Import Beta', frm.doc.name).then(() => {
frm.refresh();
});
});
frappe.realtime.on('data_import_progress', data => {
let percent = Math.floor((data.current * 100) / data.total);
let message;

View file

@ -7,6 +7,8 @@ import frappe
from frappe.model.document import Document
from frappe.core.doctype.data_import.importer_new import Importer
from frappe.core.doctype.data_import.exporter_new import Exporter
from frappe.core.page.background_jobs.background_jobs import get_info
from frappe.utils.background_jobs import enqueue
class DataImportBeta(Document):
@ -25,8 +27,17 @@ class DataImportBeta(Document):
return i.get_data_for_import_preview()
def start_import(self):
i = self.get_importer()
return i.import_data()
enqueued_jobs = [d.get("job_name") for d in get_info()]
if self.name not in enqueued_jobs:
enqueue(
start_import,
queue="default",
timeout=6000,
event="data_import",
job_name=self.name,
data_import=self.name,
)
def get_importer(self):
	"""Construct and return an :class:`Importer` bound to this
	Data Import document and its target reference doctype."""
	importer = Importer(self.reference_doctype, data_import=self)
	return importer
@ -42,10 +53,10 @@ class DataImportBeta(Document):
values = d.missing_values
meta = frappe.get_meta(doctype)
# find the autoname field
if meta.autoname and meta.autoname.startswith('field:'):
autoname_field = meta.autoname[len('field:') :]
if meta.autoname and meta.autoname.startswith("field:"):
autoname_field = meta.autoname[len("field:") :]
else:
autoname_field = 'name'
autoname_field = "name"
for value in values:
new_doc = frappe.new_doc(doctype)
@ -53,8 +64,18 @@ class DataImportBeta(Document):
docs.append(new_doc.insert())
return docs
def start_import(data_import):
	"""Entry point executed inside a background job.

	Loads the Data Import Beta document named *data_import*, builds an
	Importer for its reference doctype, and runs the import.
	"""
	doc = frappe.get_doc("Data Import Beta", data_import)
	importer = Importer(doc.reference_doctype, data_import=doc)
	return importer.import_data()
@frappe.whitelist()
def download_template(doctype, export_fields=None, export_records=None, export_filters=None, file_type="CSV"):
def download_template(
doctype, export_fields=None, export_records=None, export_filters=None, file_type="CSV"
):
"""
Download template from Exporter
:param doctype: Document Type
@ -66,7 +87,7 @@ def download_template(doctype, export_fields=None, export_records=None, export_f
export_fields = frappe.parse_json(export_fields)
export_filters = frappe.parse_json(export_filters)
export_data = export_records != 'blank_template'
export_data = export_records != "blank_template"
e = Exporter(
doctype,