refactor: support generators in db.bulk_insert

When you want to insert a huge number of docs into the DB, bulk insert
currently requires creating a huge number of objects up front. This is
inefficient from a memory perspective and also delays any failures until
the last step of execution.
This commit is contained in:
Ankush Menat 2022-06-11 21:58:34 +05:30
parent a534e7a66a
commit 7ec120d2ba

View file

@ -3,12 +3,14 @@
import datetime
import json
import itertools
import random
import re
import string
import traceback
from contextlib import contextmanager, suppress
from time import time
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from pypika.dialects import MySQLQueryBuilder, PostgreSQLQueryBuilder
from pypika.terms import Criterion, NullValue
@ -1204,28 +1206,36 @@ class Database:
frappe.flags.touched_tables = set()
frappe.flags.touched_tables.update(tables)
def bulk_insert(
    self,
    doctype: str,
    fields: List[str],
    values: Iterable[Sequence[Any]],
    ignore_duplicates=False,
    *,
    chunk_size=10_000,
):
    """
    Insert multiple records at a time.

    Accepts any iterable (including a generator) for ``values``, so rows can
    be produced lazily instead of materializing them all in memory.

    :param doctype: Doctype name
    :param fields: list of field (column) names, in the same order as each row
    :param values: iterable of rows; each row is a sequence of values aligned with ``fields``
    :param ignore_duplicates: skip rows that violate unique constraints instead of failing
    :param chunk_size: number of rows inserted per query (keyword-only)
    """
    table = frappe.qb.DocType(doctype)
    query = frappe.qb.into(table).columns(fields)

    if ignore_duplicates:
        # Pypika does not have same api for ignoring duplicates
        if frappe.conf.db_type == "mariadb":
            query = query.ignore()
        elif frappe.conf.db_type == "postgres":
            query = query.on_conflict().do_nothing()

    # Consume the iterable lazily in fixed-size chunks so generators never
    # need to be fully materialized; the loop ends when islice yields nothing.
    value_iterator = iter(values)
    while value_chunk := tuple(itertools.islice(value_iterator, chunk_size)):
        query.insert(*value_chunk).run()
def create_sequence(self, *args, **kwargs):
from frappe.database.sequence import create_sequence