feat: bulk update docs using case when queries (#28483)
This commit is contained in:
parent
14510ea96f
commit
6b02484f1c
2 changed files with 188 additions and 0 deletions
|
|
@ -31,6 +31,7 @@ from frappe.database.utils import (
|
|||
)
|
||||
from frappe.exceptions import DoesNotExistError, ImplicitCommitError
|
||||
from frappe.monitor import get_trace_id
|
||||
from frappe.query_builder import Case
|
||||
from frappe.query_builder.functions import Count
|
||||
from frappe.utils import CallbackManager, cint, get_datetime, get_table_name, getdate, now, sbool
|
||||
from frappe.utils import cast as cast_fieldtype
|
||||
|
|
@ -956,6 +957,139 @@ class Database:
|
|||
if dt in self.value_cache:
|
||||
del self.value_cache[dt]
|
||||
|
||||
def bulk_update(
|
||||
self,
|
||||
doctype: str,
|
||||
doc_updates: dict,
|
||||
*,
|
||||
chunk_size: int = 100,
|
||||
modified: str | None = None,
|
||||
modified_by: str | None = None,
|
||||
update_modified: bool = True,
|
||||
debug: bool = False,
|
||||
):
|
||||
"""
|
||||
:param doctype: DocType to update
|
||||
:param doc_updates: Dictionary of key (docname) and values to update
|
||||
:param chunk_size: Number of documents to update in a single transaction
|
||||
:param modified: Use this as the `modified` timestamp.
|
||||
:param modified_by: Set this user as `modified_by`.
|
||||
:param update_modified: default True. Update `modified` and `modified_by` fields
|
||||
:param debug: Print the query in the developer / js console.
|
||||
|
||||
doc_updates should be in the following format:
|
||||
```py
|
||||
{
|
||||
"docname1": {
|
||||
"field1": "value1",
|
||||
"field2": "value2",
|
||||
...
|
||||
},
|
||||
"docname2": {
|
||||
"field1": "value1",
|
||||
"field2": "value2",
|
||||
...
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
Note:
|
||||
- Bigger chunk sizes could be less performant. Use appropriate chunk size based on the number of fields to update.
|
||||
|
||||
"""
|
||||
if not doc_updates:
|
||||
return
|
||||
|
||||
modified_dict = None
|
||||
if update_modified:
|
||||
modified_dict = self._get_update_dict(
|
||||
{}, None, modified=modified, modified_by=modified_by, update_modified=update_modified
|
||||
)
|
||||
|
||||
total_docs = len(doc_updates)
|
||||
iterator = iter(doc_updates.items())
|
||||
|
||||
for __ in range(0, total_docs, chunk_size):
|
||||
doc_chunk = dict(itertools.islice(iterator, chunk_size))
|
||||
self._build_and_run_bulk_update_query(doctype, doc_chunk, modified_dict, debug)
|
||||
|
||||
@staticmethod
|
||||
def _build_and_run_bulk_update_query(
|
||||
doctype: str, doc_updates: dict, modified_dict: dict | None = None, debug: bool = False
|
||||
):
|
||||
"""
|
||||
:param doctype: DocType to update
|
||||
:param doc_updates: Dictionary of key (docname) and values to update
|
||||
:param debug: Print the query in the developer / js console.
|
||||
|
||||
---
|
||||
|
||||
doc_updates should be in the following format:
|
||||
```py
|
||||
{
|
||||
"docname1": {
|
||||
"field1": "value1",
|
||||
"field2": "value2",
|
||||
...
|
||||
},
|
||||
"docname2": {
|
||||
"field1": "value1",
|
||||
"field2": "value2",
|
||||
...
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
Query will be built as:
|
||||
```sql
|
||||
UPDATE `tabItem`
|
||||
SET `status` = CASE
|
||||
WHEN `name` = 'Item-1' THEN 'Close'
|
||||
WHEN `name` = 'Item-2' THEN 'Open'
|
||||
WHEN `name` = 'Item-3' THEN 'Close'
|
||||
WHEN `name` = 'Item-4' THEN 'Cancelled'
|
||||
ELSE `status`
|
||||
end,
|
||||
`description` = CASE
|
||||
WHEN `name` = 'Item-1' THEN 'This is the first task'
|
||||
WHEN `name` = 'Item-2' THEN 'This is the second task'
|
||||
WHEN `name` = 'Item-3' THEN 'This is the third task'
|
||||
WHEN `name` = 'Item-4' THEN 'This is the fourth task'
|
||||
ELSE `description`
|
||||
end
|
||||
WHERE `name` IN ( 'Item-1', 'Item-2', 'Item-3', 'Item-4' )
|
||||
```
|
||||
"""
|
||||
if not doc_updates:
|
||||
return
|
||||
|
||||
dt = frappe.qb.DocType(doctype)
|
||||
update_query = frappe.qb.update(dt)
|
||||
|
||||
conditions = {}
|
||||
docnames = list(doc_updates.keys())
|
||||
|
||||
for docname, row in doc_updates.items():
|
||||
for field, value in row.items():
|
||||
# CASE
|
||||
if field not in conditions:
|
||||
conditions[field] = Case()
|
||||
|
||||
# WHEN
|
||||
conditions[field].when(dt.name == docname, value)
|
||||
|
||||
for field in conditions:
|
||||
# ELSE
|
||||
update_query = update_query.set(dt[field], conditions[field].else_(dt[field]))
|
||||
|
||||
if modified_dict:
|
||||
for column, value in modified_dict.items():
|
||||
update_query = update_query.set(dt[column], value)
|
||||
|
||||
update_query.where(dt.name.isin(docnames)).run(debug=debug)
|
||||
|
||||
def set_global(self, key, val, user="__global"):
|
||||
"""Save a global key value. Global values will be automatically set if they match fieldname."""
|
||||
self.set_default(key, val, user)
|
||||
|
|
|
|||
|
|
@ -550,6 +550,60 @@ class TestDB(IntegrationTestCase):
|
|||
|
||||
frappe.db.delete("ToDo", {"description": test_body})
|
||||
|
||||
def test_bulk_update(self):
|
||||
test_body = f"test_bulk_update - {random_string(10)}"
|
||||
|
||||
frappe.db.bulk_insert(
|
||||
"ToDo",
|
||||
["name", "description"],
|
||||
[[f"ToDo Test Bulk Update {i}", test_body] for i in range(20)],
|
||||
ignore_duplicates=True,
|
||||
)
|
||||
|
||||
record_names = frappe.get_all("ToDo", filters={"description": test_body}, pluck="name")
|
||||
|
||||
new_descriptions = {name: f"{test_body} - updated - {random_string(10)}" for name in record_names}
|
||||
|
||||
# update with same fields to update
|
||||
frappe.db.bulk_update(
|
||||
"ToDo", {name: {"description": new_descriptions[name]} for name in record_names}
|
||||
)
|
||||
|
||||
# check if all records were updated
|
||||
updated_records = dict(
|
||||
frappe.get_all(
|
||||
"ToDo", filters={"name": ("in", record_names)}, fields=["name", "description"], as_list=True
|
||||
)
|
||||
)
|
||||
self.assertDictEqual(new_descriptions, updated_records)
|
||||
|
||||
# update with different fields to update
|
||||
updates = {
|
||||
record_names[0]: {"priority": "High", "status": "Closed"},
|
||||
record_names[1]: {"status": "Closed"},
|
||||
}
|
||||
frappe.db.bulk_update("ToDo", updates)
|
||||
|
||||
priority, status = frappe.db.get_value("ToDo", record_names[0], ["priority", "status"])
|
||||
|
||||
self.assertEqual(priority, "High")
|
||||
self.assertEqual(status, "Closed")
|
||||
|
||||
# further updates with different fields to update
|
||||
updates = {record_names[0]: {"status": "Open"}, record_names[1]: {"priority": "Low"}}
|
||||
frappe.db.bulk_update("ToDo", updates)
|
||||
|
||||
priority, status = frappe.db.get_value("ToDo", record_names[0], ["priority", "status"])
|
||||
self.assertEqual(priority, "High") # should stay the same
|
||||
self.assertEqual(status, "Open")
|
||||
|
||||
priority, status = frappe.db.get_value("ToDo", record_names[1], ["priority", "status"])
|
||||
self.assertEqual(priority, "Low")
|
||||
self.assertEqual(status, "Closed") # should stay the same
|
||||
|
||||
# cleanup
|
||||
frappe.db.delete("ToDo", {"name": ("in", record_names)})
|
||||
|
||||
def test_count(self):
|
||||
frappe.db.delete("Note")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue