refactor: make frappe.db.bulk_insert work as expected (#16527)

## Issue

`frappe.db.bulk_insert` is not working as expected:
 - It will not insert any rows if there are fewer than 3 values
 - It will not add the 1st row at all, e.g. if I'm adding 5 values, it will only add 4.
 - It will add values one by one after the first 2 values; instead it should have inserted items (in the db) in chunks of 10,000 (as per the code written before).

## Changes Made
 - Solved the above issues
 - use better way to chunk list
 - Added Postgres support for bulk_insert API
 
And now `bulk_insert` will only do **1 db call for each 10000** values.

Note: For testing purposes I made a `Test Bulk Insert` doctype and kept a chunk size of 100.

## Before
![image](https://user-images.githubusercontent.com/43115036/161979365-c1100745-7602-47d2-a9b8-62d797d2039f.png)

## After
![image](https://user-images.githubusercontent.com/43115036/161978344-3c17d56b-2195-40f4-b00c-e9478d4083f1.png)

nodocs
This commit is contained in:
Pruthvi Patel 2022-04-19 20:17:22 +05:30 committed by GitHub
parent d3e7090688
commit 8b010e1732
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 16 deletions

View file

@@ -1228,7 +1228,7 @@ class Database(object):
frappe.flags.touched_tables = set()
frappe.flags.touched_tables.update(tables)
def bulk_insert(self, doctype, fields, values, ignore_duplicates=False):
def bulk_insert(self, doctype, fields, values, ignore_duplicates=False, *, chunk_size=10_000):
"""
Insert multiple records at a time
@@ -1236,22 +1236,19 @@
:param fields: list of fields
:params values: list of list of values
"""
insert_list = []
fields = ", ".join("`" + field + "`" for field in fields)
for idx, value in enumerate(values):
insert_list.append(tuple(value))
if idx and (idx % 10000 == 0 or idx < len(values) - 1):
self.sql(
"""INSERT {ignore_duplicates} INTO `tab{doctype}` ({fields}) VALUES {values}""".format(
ignore_duplicates="IGNORE" if ignore_duplicates else "",
doctype=doctype,
fields=fields,
values=", ".join(["%s"] * len(insert_list)),
),
tuple(insert_list),
)
insert_list = []
table = frappe.qb.DocType(doctype)
for start_index in range(0, len(values), chunk_size):
query = frappe.qb.into(table)
if ignore_duplicates:
# Pypika does not have same api for ignoring duplicates
if frappe.conf.db_type == "mariadb":
query = query.ignore()
elif frappe.conf.db_type == "postgres":
query = query.on_conflict().do_nothing()
values_to_insert = values[start_index : start_index + chunk_size]
query.columns(fields).insert(*values_to_insert).run()
def enqueue_jobs_after_commit():

View file

@@ -4,6 +4,7 @@
import datetime
import inspect
import unittest
from math import ceil
from random import choice
from unittest.mock import patch
@@ -445,6 +446,33 @@ class TestDB(unittest.TestCase):
self.assertEqual(frappe.db.exists(dt, [["name", "=", dn]]), dn)
def test_bulk_insert(self):
current_count = frappe.db.count("ToDo")
test_body = f"test_bulk_insert - {random_string(10)}"
chunk_size = 10
for number_of_values in (1, 2, 5, 27):
current_transaction_writes = frappe.db.transaction_writes
frappe.db.bulk_insert(
"ToDo",
["name", "description"],
[[f"ToDo Test Bulk Insert {i}", test_body] for i in range(number_of_values)],
ignore_duplicates=True,
chunk_size=chunk_size,
)
# check that all records were inserted
self.assertEqual(number_of_values, frappe.db.count("ToDo") - current_count)
# check if inserts were done in chunks
expected_number_of_writes = ceil(number_of_values / chunk_size)
self.assertEqual(
expected_number_of_writes, frappe.db.transaction_writes - current_transaction_writes
)
frappe.db.delete("ToDo", {"description": test_body})
@run_only_if(db_type_is.MARIADB)
class TestDDLCommandsMaria(unittest.TestCase):