From e2d619504fdb7e8990310ea96e691724721686c5 Mon Sep 17 00:00:00 2001
From: Ankush Menat
Date: Mon, 26 May 2025 19:06:53 +0530
Subject: [PATCH] perf: batch bulk_insert (#32675)

* fix: reduce bulk insert batch size

Back when this feature was added, it lazily evaluated the input. Now
the iterator is consumed upfront, so large batch sizes == huge memory
usage.

* perf: bring back iterator for bulk_insert

Bulk insert used to support iterators, so it could consume and insert
arbitrarily large amounts of data. Since child table support was added,
it can't do that anymore, because child rows require collecting all
values first.

This change brings back iterators by batching the input iterator into
chunks of (by default) 1000 documents. This is almost as good as the
original design. Performance is still meh for flat documents.
---
 frappe/database/database.py |  2 +-
 frappe/model/document.py    | 47 +++++++++++++++++++++----------------
 2 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/frappe/database/database.py b/frappe/database/database.py
index 9216eee61f..8662693346 100644
--- a/frappe/database/database.py
+++ b/frappe/database/database.py
@@ -1447,7 +1447,7 @@ class Database:
 		values: Iterable[Sequence[Any]],
 		ignore_duplicates=False,
 		*,
-		chunk_size=10_000,
+		chunk_size=1000,
 	):
 		"""
 		Insert multiple records at a time
diff --git a/frappe/model/document.py b/frappe/model/document.py
index ee2d8830cb..e437c26a2c 100644
--- a/frappe/model/document.py
+++ b/frappe/model/document.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
 # License: MIT. See LICENSE
 import hashlib
+import itertools
 import json
 import time
 from collections.abc import Generator, Iterable
@@ -1876,7 +1877,8 @@ def bulk_insert(
 	doctype: str,
 	documents: Iterable["Document"],
 	ignore_duplicates: bool = False,
-	chunk_size=10_000,
+	chunk_size=1000,
+	commit_chunks=False,
 ):
 	"""Insert simple Documents objects to database in bulk.
@@ -1887,31 +1889,36 @@ def bulk_insert(
 	"""

 	doctype_meta = frappe.get_meta(doctype)

-	documents = list(documents)
 	valid_column_map = {
 		doctype: doctype_meta.get_valid_columns(),
 	}
-	values_map = {
-		doctype: _document_values_generator(documents, valid_column_map[doctype]),
-	}

-	for child_table in doctype_meta.get_table_fields():
-		valid_column_map[child_table.options] = frappe.get_meta(child_table.options).get_valid_columns()
-		values_map[child_table.options] = _document_values_generator(
-			[
-				ch_doc
-				for ch_doc in (
-					child_docs for doc in documents for child_docs in doc.get(child_table.fieldname)
-				)
-			],
-			valid_column_map[child_table.options],
-		)
+	child_table_fields = doctype_meta.get_table_fields()

-	for dt, docs in values_map.items():
-		frappe.db.bulk_insert(
-			dt, valid_column_map[dt], docs, ignore_duplicates=ignore_duplicates, chunk_size=chunk_size
-		)
+	documents = iter(documents)
+	while document_batch := list(itertools.islice(documents, chunk_size)):
+		values_map = {
+			doctype: _document_values_generator(document_batch, valid_column_map[doctype]),
+		}
+
+		for child_table in child_table_fields:
+			valid_column_map[child_table.options] = frappe.get_meta(child_table.options).get_valid_columns()
+			values_map[child_table.options] = _document_values_generator(
+				[
+					ch_doc
+					for ch_doc in (
+						child_docs for doc in document_batch for child_docs in doc.get(child_table.fieldname)
+					)
+				],
+				valid_column_map[child_table.options],
+			)
+
+		for dt, docs in values_map.items():
+			frappe.db.bulk_insert(dt, valid_column_map[dt], docs, ignore_duplicates=ignore_duplicates)
+
+		if commit_chunks:
+			frappe.db.commit()


 def _document_values_generator(
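
The core technique in the document.py change is independent of Frappe:
drain any iterator in fixed-size chunks with itertools.islice and the
walrus operator, so only one chunk is ever materialized. A minimal,
self-contained sketch of that pattern (the `batched` helper name is
illustrative, not part of this patch):

    import itertools
    from collections.abc import Iterable, Iterator

    def batched(items: Iterable, chunk_size: int = 1000) -> Iterator[list]:
        # iter() is a no-op on iterators, so lists and generators both work.
        items = iter(items)
        # islice() pulls at most chunk_size elements per pass; an empty
        # list means the source is exhausted, which ends the loop.
        while chunk := list(itertools.islice(items, chunk_size)):
            yield chunk

    total = 0
    for chunk in batched(range(1_000_000)):
        total += len(chunk)  # stand-in for one bulk INSERT per chunk
    assert total == 1_000_000

Python 3.12 ships the same idea as itertools.batched (yielding tuples
rather than lists), but the explicit loop above works on older versions.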
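A hedged usage sketch of the patched API, assuming a working Frappe
bench context; the ToDo doctype and the todo_stream generator are
illustrative, not part of this patch. Passing a generator keeps peak
memory near one chunk, and commit_chunks=True commits after each chunk:

    import frappe
    from frappe.model.document import bulk_insert

    def todo_stream(n):
        # Documents are created lazily, one at a time.
        for i in range(n):
            yield frappe.get_doc({"doctype": "ToDo", "description": f"task {i}"})

    # Roughly chunk_size documents are materialized at any moment.
    bulk_insert("ToDo", todo_stream(50_000), chunk_size=1000, commit_chunks=True)

Per-chunk commits trade atomicity for bounded transaction size: if the
stream fails midway, chunks committed so far are kept.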