feat: add bulk_update and bulk_delete endpoints to /api/v2

This commit is contained in:
Faris Ansari 2026-01-05 17:05:43 +05:30
parent 261e78e492
commit e7ffb2b3c8
2 changed files with 457 additions and 0 deletions

View file

@ -235,6 +235,206 @@ def execute_doc_method(doctype: str, name: str, method: str | None = None):
return result
def bulk_delete_docs(doctype: str):
"""Bulk delete multiple documents of the same doctype.
Request body should contain:
names: List of document names to delete
Returns:
deleted: List of successfully deleted document names
failed: List of failed deletions with error messages
total: Total number of documents attempted
success_count: Number of successful deletions
failure_count: Number of failed deletions
"""
data = frappe.form_dict
names = frappe.parse_json(data.get("names", "[]"))
if not isinstance(names, list):
frappe.throw(_("'names' must be an array"))
deleted = []
failed = []
for name in names:
try:
frappe.delete_doc(doctype, name, ignore_missing=False)
deleted.append(name)
except Exception as e:
failed.append({"name": name, "error": str(e)})
return {
"deleted": deleted,
"failed": failed,
"total": len(names),
"success_count": len(deleted),
"failure_count": len(failed),
}
def bulk_delete():
"""Bulk delete documents across multiple doctypes.
Request body should contain:
documents: List of {"doctype": str, "name": str} objects
Returns:
deleted: List of successfully deleted documents
failed: List of failed deletions with error messages
total: Total number of documents attempted
success_count: Number of successful deletions
failure_count: Number of failed deletions
"""
data = frappe.form_dict
documents = frappe.parse_json(data.get("documents", "[]"))
if not isinstance(documents, list):
frappe.throw(_("Request body must contain 'documents' as an array"))
deleted = []
failed = []
for item in documents:
doctype = None
name = None
try:
if not isinstance(item, dict):
raise ValueError(_("Each document must be a dictionary with 'doctype' and 'name' keys"))
doctype = item.get("doctype")
name = item.get("name")
if not doctype or not name:
raise ValueError(_("Both 'doctype' and 'name' are required"))
frappe.delete_doc(doctype, name, ignore_missing=False)
deleted.append({"doctype": doctype, "name": name})
except Exception as e:
failed.append({"doctype": doctype, "name": name, "error": str(e)})
return {
"deleted": deleted,
"failed": failed,
"total": len(documents),
"success_count": len(deleted),
"failure_count": len(failed),
}
def bulk_update_docs(doctype: str):
"""Bulk update multiple documents of the same doctype.
Request body should contain:
updates: List of {"name": str, ...fields} objects where each object contains
the document name and the fields to update
Returns:
updated: List of successfully updated document names
failed: List of failed updates with error messages
total: Total number of documents attempted
success_count: Number of successful updates
failure_count: Number of failed updates
"""
data = frappe.form_dict
updates = frappe.parse_json(data.get("updates", "[]"))
if not isinstance(updates, list):
frappe.throw(_("'updates' must be an array"))
updated = []
failed = []
for item in updates:
name = None
try:
if not isinstance(item, dict):
raise ValueError(_("Each update must be a dictionary with 'name' and field values"))
name = item.get("name")
if not name:
raise ValueError(_("'name' is required"))
doc = frappe.get_doc(doctype, name, for_update=True)
item_copy = item.copy()
item_copy.pop("name")
item_copy.pop("flags", None)
doc.update(item_copy)
doc.save()
updated.append(name)
except Exception as e:
failed.append({"name": name, "error": str(e)})
return {
"updated": updated,
"failed": failed,
"total": len(updates),
"success_count": len(updated),
"failure_count": len(failed),
}
def bulk_update():
"""Bulk update documents across multiple doctypes.
Request body should contain:
documents: List of {"doctype": str, "name": str, ...fields} objects
Returns:
updated: List of successfully updated documents
failed: List of failed updates with error messages
total: Total number of documents attempted
success_count: Number of successful updates
failure_count: Number of failed updates
"""
data = frappe.form_dict
documents = frappe.parse_json(data.get("documents", "[]"))
if not isinstance(documents, list):
frappe.throw(_("Request body must contain 'documents' as an array"))
updated = []
failed = []
for item in documents:
doctype = None
name = None
try:
if not isinstance(item, dict):
raise ValueError(
_("Each document must be a dictionary with 'doctype', 'name', and field values")
)
doctype = item.get("doctype")
name = item.get("name")
if not doctype or not name:
raise ValueError(_("Both 'doctype' and 'name' are required"))
doc = frappe.get_doc(doctype, name, for_update=True)
item_copy = item.copy()
item_copy.pop("doctype")
item_copy.pop("name")
item_copy.pop("flags", None)
doc.update(item_copy)
doc.save()
updated.append({"doctype": doctype, "name": name})
except Exception as e:
failed.append({"doctype": doctype, "name": name, "error": str(e)})
return {
"updated": updated,
"failed": failed,
"total": len(documents),
"success_count": len(updated),
"failure_count": len(failed),
}
def run_doc_method(method: str, document: dict[str, Any] | str, kwargs=None):
"""run a whitelisted controller method on in-memory document.
@ -272,6 +472,8 @@ url_rules = [
Rule("/method/logout", endpoint=logout, methods=["POST"]),
Rule("/method/ping", endpoint=frappe.ping),
Rule("/method/upload_file", endpoint=upload_file, methods=["POST"]),
Rule("/method/bulk_delete", endpoint=bulk_delete, methods=["POST"]),
Rule("/method/bulk_update", endpoint=bulk_update, methods=["POST"]),
Rule("/method/<method>", endpoint=handle_rpc_call),
Rule(
"/method/run_doc_method",
@ -282,6 +484,8 @@ url_rules = [
# Document level APIs
Rule("/document/<doctype>", methods=["GET"], endpoint=document_list),
Rule("/document/<doctype>", methods=["POST"], endpoint=create_doc),
Rule("/document/<doctype>/bulk_delete", methods=["POST"], endpoint=bulk_delete_docs),
Rule("/document/<doctype>/bulk_update", methods=["POST"], endpoint=bulk_update_docs),
Rule("/document/<doctype>/<path:name>/", methods=["GET"], endpoint=read_doc),
Rule("/document/<doctype>/<path:name>/copy", methods=["GET"], endpoint=copy_doc),
Rule("/document/<doctype>/<path:name>/", methods=["PATCH", "PUT"], endpoint=update_doc),

View file

@ -265,6 +265,259 @@ class TestMethodAPIV2(FrappeAPITestCase):
self.assertEqual(response["data"]["content"], comment_txt)
class TestBulkOperationsV2(FrappeAPITestCase):
"""Test bulk delete and bulk update endpoints"""
version = "v2"
DOCTYPE = "ToDo"
GENERATED_DOCUMENTS: typing.ClassVar[list] = []
@classmethod
def setUpClass(cls):
super().setUpClass()
# Create test documents
for i in range(10):
doc = frappe.get_doc({"doctype": "ToDo", "description": f"Test bulk operations {i}"}).insert()
cls.GENERATED_DOCUMENTS.append(doc.name)
frappe.db.commit()
@classmethod
def tearDownClass(cls):
frappe.db.commit()
for name in cls.GENERATED_DOCUMENTS:
frappe.delete_doc_if_exists(cls.DOCTYPE, name)
frappe.db.commit()
def setUp(self) -> None:
self.post(self.method("login"), {"sid": self.sid})
return super().setUp()
def test_bulk_delete_docs_single_doctype(self):
# Create docs to delete
doc1 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete 1"}).insert()
doc2 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete 2"}).insert()
frappe.db.commit()
# Bulk delete
response = self.post(
self.resource(self.DOCTYPE, "bulk_delete"),
{"names": frappe.as_json([doc1.name, doc2.name]), "sid": self.sid},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 2)
self.assertEqual(data["failure_count"], 0)
self.assertIn(doc1.name, data["deleted"])
self.assertIn(doc2.name, data["deleted"])
# Verify deletion
self.assertFalse(frappe.db.exists(self.DOCTYPE, doc1.name))
self.assertFalse(frappe.db.exists(self.DOCTYPE, doc2.name))
def test_bulk_delete_docs_partial_failure(self):
# Create one valid doc
doc = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete"}).insert()
frappe.db.commit()
# Try to delete valid and non-existent doc
non_existent = "non-existent-todo"
response = self.post(
self.resource(self.DOCTYPE, "bulk_delete"),
{"names": frappe.as_json([doc.name, non_existent]), "sid": self.sid},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 1)
self.assertEqual(data["failure_count"], 1)
self.assertIn(doc.name, data["deleted"])
self.assertEqual(len(data["failed"]), 1)
self.assertEqual(data["failed"][0]["name"], non_existent)
def test_bulk_delete_cross_doctype(self):
# Create docs of different types
todo = frappe.get_doc({"doctype": "ToDo", "description": "Test"}).insert()
note = frappe.get_doc({"doctype": "Note", "title": "Test Note", "content": "Test"}).insert()
frappe.db.commit()
# Bulk delete across doctypes
response = self.post(
self.method("bulk_delete"),
{
"documents": frappe.as_json(
[
{"doctype": "ToDo", "name": todo.name},
{"doctype": "Note", "name": note.name},
]
),
"sid": self.sid,
},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 2)
self.assertEqual(data["failure_count"], 0)
# Verify deletion
self.assertFalse(frappe.db.exists("ToDo", todo.name))
self.assertFalse(frappe.db.exists("Note", note.name))
def test_bulk_delete_invalid_format(self):
# Test with invalid format (not a list)
response = self.post(
self.method("bulk_delete"),
{"documents": frappe.as_json({"doctype": "ToDo", "name": "test"}), "sid": self.sid},
)
self.assertEqual(response.status_code, 417)
# Test with invalid document format (not dict)
response = self.post(
self.method("bulk_delete"),
{"documents": frappe.as_json(["invalid-item"]), "sid": self.sid},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["failure_count"], 1)
def test_bulk_update_docs_single_doctype(self):
# Create fresh docs for this test
doc1 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original 1"}).insert()
doc2 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original 2"}).insert()
frappe.db.commit()
try:
# Bulk update
response = self.post(
self.resource(self.DOCTYPE, "bulk_update"),
{
"updates": frappe.as_json(
[
{"name": doc1.name, "description": "Updated description 1", "priority": "High"},
{"name": doc2.name, "description": "Updated description 2", "priority": "Low"},
]
),
"sid": self.sid,
},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 2)
self.assertEqual(data["failure_count"], 0)
self.assertIn(doc1.name, data["updated"])
self.assertIn(doc2.name, data["updated"])
# Verify updates
updated_doc1 = frappe.get_doc(self.DOCTYPE, doc1.name)
updated_doc2 = frappe.get_doc(self.DOCTYPE, doc2.name)
self.assertEqual(updated_doc1.description, "Updated description 1")
self.assertEqual(updated_doc1.priority, "High")
self.assertEqual(updated_doc2.description, "Updated description 2")
self.assertEqual(updated_doc2.priority, "Low")
finally:
frappe.delete_doc_if_exists(self.DOCTYPE, doc1.name)
frappe.delete_doc_if_exists(self.DOCTYPE, doc2.name)
frappe.db.commit()
def test_bulk_update_cross_doctype(self):
# Create test documents
todo = frappe.get_doc({"doctype": "ToDo", "description": "Test"}).insert()
note = frappe.get_doc({"doctype": "Note", "title": "Test", "content": "Test"}).insert()
frappe.db.commit()
try:
# Bulk update across doctypes
response = self.post(
self.method("bulk_update"),
{
"documents": frappe.as_json(
[
{"doctype": "ToDo", "name": todo.name, "description": "Updated ToDo"},
{"doctype": "Note", "name": note.name, "title": "Updated Note"},
]
),
"sid": self.sid,
},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 2)
self.assertEqual(data["failure_count"], 0)
# Verify updates
updated_todo = frappe.get_doc("ToDo", todo.name)
updated_note = frappe.get_doc("Note", note.name)
self.assertEqual(updated_todo.description, "Updated ToDo")
self.assertEqual(updated_note.title, "Updated Note")
finally:
frappe.delete_doc_if_exists("ToDo", todo.name)
frappe.delete_doc_if_exists("Note", note.name)
frappe.db.commit()
def test_bulk_update_partial_failure(self):
# Create a fresh doc for this test
doc = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original"}).insert()
frappe.db.commit()
valid_doc = doc.name
non_existent = "non-existent-todo"
try:
# Try to update valid and non-existent doc
response = self.post(
self.resource(self.DOCTYPE, "bulk_update"),
{
"updates": frappe.as_json(
[
{"name": valid_doc, "description": "Updated"},
{"name": non_existent, "description": "Should fail"},
]
),
"sid": self.sid,
},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["total"], 2)
self.assertEqual(data["success_count"], 1)
self.assertEqual(data["failure_count"], 1)
self.assertIn(valid_doc, data["updated"])
self.assertEqual(len(data["failed"]), 1)
self.assertEqual(data["failed"][0]["name"], non_existent)
# Verify successful update
updated_doc = frappe.get_doc(self.DOCTYPE, valid_doc)
self.assertEqual(updated_doc.description, "Updated")
finally:
frappe.delete_doc_if_exists(self.DOCTYPE, valid_doc)
frappe.db.commit()
def test_bulk_update_invalid_format(self):
# Test with invalid format (not a list)
response = self.post(
self.resource(self.DOCTYPE, "bulk_update"),
{"updates": frappe.as_json({"name": "test", "description": "test"}), "sid": self.sid},
)
self.assertEqual(response.status_code, 417)
# Test with missing name field
response = self.post(
self.resource(self.DOCTYPE, "bulk_update"),
{"updates": frappe.as_json([{"description": "test"}]), "sid": self.sid},
)
self.assertEqual(response.status_code, 200)
data = response.json["data"]
self.assertEqual(data["failure_count"], 1)
class TestDocTypeAPIV2(FrappeAPITestCase):
version = "v2"