def bulk_insert(self, doctype, fields, values, ignore_duplicates=False, *, chunk_size=10_000):
    """
    Insert multiple records at a time

    Rows are written in batches: one INSERT statement is built and run for
    every `chunk_size` rows.

    :param doctype: Doctype name
    :param fields: list of fields
    :param values: iterable of rows, each row being a list of values in the
        same order as `fields`; any iterable (including a generator) works
    :param ignore_duplicates: when true, the insert is built with `.ignore()`
        if `frappe.conf.db_type` is "mariadb" and with
        `.on_conflict().do_nothing()` if it is "postgres"; for any other
        db_type the flag has no effect
    :param chunk_size: maximum number of rows per INSERT statement, must be >= 1
    :raises ValueError: if `chunk_size` is smaller than 1
    """
    # A zero step makes range() raise and a negative one yields an empty
    # range (nothing inserted, no error), so reject both up front.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Materialise once: len() and slicing below need a real sequence, while
    # callers may hand in a generator or another unsized iterable.
    values = list(values)
    if not values:
        return

    table = frappe.qb.DocType(doctype)

    for start_index in range(0, len(values), chunk_size):
        query = frappe.qb.into(table)
        if ignore_duplicates:
            # Pypika does not have same api for ignoring duplicates
            if frappe.conf.db_type == "mariadb":
                query = query.ignore()
            elif frappe.conf.db_type == "postgres":
                query = query.on_conflict().do_nothing()

        values_to_insert = values[start_index : start_index + chunk_size]
        query.columns(fields).insert(*values_to_insert).run()
def test_bulk_insert(self):
    """Check that `frappe.db.bulk_insert` inserts every row and writes in chunks.

    For several batch sizes (below and above `chunk_size`) the test asserts
    that the ToDo row count grew by the batch size and that the number of
    transaction writes equals ceil(batch size / chunk_size).
    """
    current_count = frappe.db.count("ToDo")
    test_body = f"test_bulk_insert - {random_string(10)}"
    chunk_size = 10

    try:
        for number_of_values in (1, 2, 5, 27):
            current_transaction_writes = frappe.db.transaction_writes

            # Names repeat across iterations; ignore_duplicates=True is passed
            # so already-present rows are skipped and the total number of test
            # rows equals the current batch size.
            frappe.db.bulk_insert(
                "ToDo",
                ["name", "description"],
                [[f"ToDo Test Bulk Insert {i}", test_body] for i in range(number_of_values)],
                ignore_duplicates=True,
                chunk_size=chunk_size,
            )

            # check that all records were inserted
            self.assertEqual(number_of_values, frappe.db.count("ToDo") - current_count)

            # check if inserts were done in chunks
            expected_number_of_writes = ceil(number_of_values / chunk_size)
            self.assertEqual(
                expected_number_of_writes, frappe.db.transaction_writes - current_transaction_writes
            )
    finally:
        # Remove the test rows even when an assertion above fails, so a
        # failure here cannot leak ToDo records into other tests.
        frappe.db.delete("ToDo", {"description": test_body})