Merge pull request #35662 from netchampfaris/v2-bulk-endpoints
This commit is contained in:
commit
18215c9d26
2 changed files with 602 additions and 3 deletions
323
frappe/api/v2.py
323
frappe/api/v2.py
|
|
@ -25,6 +25,21 @@ PERMISSION_MAP = {
|
|||
}
|
||||
|
||||
|
||||
def get_bulk_operation_async_threshold(doctype: str | None = None) -> int:
|
||||
conf = frappe.conf.get("bulk_operation_async_threshold", 20)
|
||||
|
||||
if isinstance(conf, dict):
|
||||
value = conf.get(doctype, 20) if doctype else conf.get("*", 20)
|
||||
else:
|
||||
value = conf
|
||||
|
||||
return cint(value)
|
||||
|
||||
|
||||
class FrappeValueError(ValueError):
|
||||
http_status_code = 417
|
||||
|
||||
|
||||
def handle_rpc_call(method: str, doctype: str | None = None):
|
||||
from frappe.modules.utils import load_doctype_module
|
||||
|
||||
|
|
@ -121,8 +136,17 @@ def document_list(doctype: str) -> list[dict[str, Any]]:
|
|||
start: int = cint(args.get("start", 0))
|
||||
limit: int = cint(args.get("limit", 20))
|
||||
group_by: str | None = args.get("group_by", None)
|
||||
debug: bool = args.get("debug", False)
|
||||
as_dict: bool = args.get("as_dict", True)
|
||||
debug: bool = bool(args.get("debug", False))
|
||||
as_dict: bool = bool(args.get("as_dict", True))
|
||||
|
||||
if fields and not isinstance(fields, list):
|
||||
raise FrappeValueError("'fields' must be a list")
|
||||
if filters and not isinstance(filters, (list, dict)):
|
||||
raise FrappeValueError("'filters' must be a list or dictionary")
|
||||
if order_by and not isinstance(order_by, str):
|
||||
raise FrappeValueError("'order_by' must be a string")
|
||||
if group_by and not isinstance(group_by, str):
|
||||
raise FrappeValueError("'group_by' must be a string")
|
||||
|
||||
query = frappe.qb.get_query(
|
||||
table=doctype,
|
||||
|
|
@ -235,6 +259,294 @@ def execute_doc_method(doctype: str, name: str, method: str | None = None):
|
|||
return result
|
||||
|
||||
|
||||
def bulk_delete_docs(doctype: str):
|
||||
"""Bulk delete multiple documents of the same doctype.
|
||||
|
||||
Request body should contain:
|
||||
names: List of document names to delete
|
||||
|
||||
Returns:
|
||||
deleted: List of successfully deleted document names
|
||||
failed: List of failed deletions with error messages
|
||||
total: Total number of documents attempted
|
||||
success_count: Number of successful deletions
|
||||
failure_count: Number of failed deletions
|
||||
"""
|
||||
names = frappe.form_dict.get("names")
|
||||
|
||||
if not isinstance(names, list):
|
||||
raise FrappeValueError("'names' must be a list")
|
||||
|
||||
if len(names) > get_bulk_operation_async_threshold(doctype):
|
||||
job = frappe.enqueue(
|
||||
"frappe.api.v2.execute_bulk_delete_docs",
|
||||
doctype=doctype,
|
||||
names=names,
|
||||
)
|
||||
frappe.response.http_status_code = 202
|
||||
return {"job_id": job.id}
|
||||
|
||||
return execute_bulk_delete_docs(doctype, names)
|
||||
|
||||
|
||||
def execute_bulk_delete_docs(doctype: str, names: list[str | int]):
|
||||
deleted = []
|
||||
failed = []
|
||||
|
||||
for name in names:
|
||||
if not isinstance(name, str | int):
|
||||
failed.append({"name": name, "error": "'name' must be a string or integer"})
|
||||
continue
|
||||
|
||||
if isinstance(name, int):
|
||||
name = str(name)
|
||||
|
||||
savepoint = "bulk_delete_docs"
|
||||
frappe.db.savepoint(savepoint)
|
||||
|
||||
try:
|
||||
frappe.delete_doc(doctype, name, ignore_missing=False)
|
||||
deleted.append(name)
|
||||
except Exception as e:
|
||||
frappe.db.rollback(save_point=savepoint)
|
||||
failed.append({"name": name, "error": str(e)})
|
||||
|
||||
return {
|
||||
"deleted": deleted,
|
||||
"failed": failed,
|
||||
"total": len(names),
|
||||
"success_count": len(deleted),
|
||||
"failure_count": len(failed),
|
||||
}
|
||||
|
||||
|
||||
def bulk_delete():
|
||||
"""Bulk delete documents across multiple doctypes.
|
||||
|
||||
Request body should contain:
|
||||
docs: List of {"doctype": str, "name": str} objects
|
||||
|
||||
Returns:
|
||||
deleted: List of successfully deleted documents
|
||||
failed: List of failed deletions with error messages
|
||||
total: Total number of documents attempted
|
||||
success_count: Number of successful deletions
|
||||
failure_count: Number of failed deletions
|
||||
"""
|
||||
docs = frappe.form_dict.get("docs", [])
|
||||
|
||||
if not isinstance(docs, list):
|
||||
raise FrappeValueError("'docs' must be a list")
|
||||
|
||||
if len(docs) > get_bulk_operation_async_threshold():
|
||||
job = frappe.enqueue(
|
||||
"frappe.api.v2.execute_bulk_delete",
|
||||
docs=docs,
|
||||
)
|
||||
frappe.response.http_status_code = 202
|
||||
return {"job_id": job.id}
|
||||
|
||||
return execute_bulk_delete(docs)
|
||||
|
||||
|
||||
def execute_bulk_delete(docs: list):
|
||||
deleted = []
|
||||
failed = []
|
||||
|
||||
for item in docs:
|
||||
doctype = None
|
||||
name = None
|
||||
savepoint = "bulk_delete"
|
||||
frappe.db.savepoint(savepoint)
|
||||
|
||||
try:
|
||||
if not isinstance(item, dict):
|
||||
raise FrappeValueError("Each document must be a dictionary with 'doctype' and 'name' keys")
|
||||
|
||||
doctype = item.get("doctype")
|
||||
name = item.get("name")
|
||||
|
||||
if not isinstance(doctype, str):
|
||||
raise FrappeValueError("'doctype' must be a string")
|
||||
|
||||
if not isinstance(name, str | int):
|
||||
raise FrappeValueError("'name' must be a string or integer")
|
||||
|
||||
if isinstance(name, int):
|
||||
name = str(name)
|
||||
|
||||
frappe.delete_doc(doctype, name, ignore_missing=False)
|
||||
deleted.append({"doctype": doctype, "name": name})
|
||||
except Exception as e:
|
||||
frappe.db.rollback(save_point=savepoint)
|
||||
failed.append({"doctype": doctype, "name": name, "error": str(e)})
|
||||
|
||||
return {
|
||||
"deleted": deleted,
|
||||
"failed": failed,
|
||||
"total": len(docs),
|
||||
"success_count": len(deleted),
|
||||
"failure_count": len(failed),
|
||||
}
|
||||
|
||||
|
||||
def bulk_update_docs(doctype: str):
|
||||
"""Bulk update multiple documents of the same doctype.
|
||||
|
||||
Request body should contain:
|
||||
docs: List of {"name": str, ...fields} objects where each object contains
|
||||
the document name and the fields to update
|
||||
|
||||
Returns:
|
||||
updated: List of successfully updated document names
|
||||
failed: List of failed updates with error messages
|
||||
total: Total number of documents attempted
|
||||
success_count: Number of successful updates
|
||||
failure_count: Number of failed updates
|
||||
"""
|
||||
docs = frappe.form_dict.get("docs")
|
||||
|
||||
if not isinstance(docs, list):
|
||||
raise FrappeValueError("'docs' must be a list")
|
||||
|
||||
if len(docs) > get_bulk_operation_async_threshold(doctype):
|
||||
job = frappe.enqueue(
|
||||
"frappe.api.v2.execute_bulk_update_docs",
|
||||
doctype=doctype,
|
||||
docs=docs,
|
||||
)
|
||||
frappe.response.http_status_code = 202
|
||||
return {"job_id": job.id}
|
||||
|
||||
return execute_bulk_update_docs(doctype, docs)
|
||||
|
||||
|
||||
def execute_bulk_update_docs(doctype: str, docs: list):
|
||||
updated = []
|
||||
failed = []
|
||||
|
||||
for item in docs:
|
||||
name = None
|
||||
savepoint = "bulk_update_docs"
|
||||
frappe.db.savepoint(savepoint)
|
||||
|
||||
try:
|
||||
if not isinstance(item, dict):
|
||||
raise FrappeValueError("Each update must be a dictionary with 'name' and field values")
|
||||
|
||||
name = item.get("name")
|
||||
if not isinstance(name, str | int):
|
||||
raise FrappeValueError("'name' must be a string or integer")
|
||||
|
||||
if isinstance(name, int):
|
||||
name = str(name)
|
||||
|
||||
doc = frappe.get_doc(doctype, name, for_update=True)
|
||||
item_copy = item.copy()
|
||||
item_copy.pop("name")
|
||||
item_copy.pop("flags", None)
|
||||
|
||||
doc.update(item_copy)
|
||||
doc.save()
|
||||
doc.apply_fieldlevel_read_permissions()
|
||||
|
||||
updated.append(name)
|
||||
frappe.response.docs.append(doc.as_dict())
|
||||
except Exception as e:
|
||||
frappe.db.rollback(save_point=savepoint)
|
||||
failed.append({"name": name, "error": str(e)})
|
||||
|
||||
return {
|
||||
"updated": updated,
|
||||
"failed": failed,
|
||||
"total": len(docs),
|
||||
"success_count": len(updated),
|
||||
"failure_count": len(failed),
|
||||
}
|
||||
|
||||
|
||||
def bulk_update():
|
||||
"""Bulk update documents across multiple doctypes.
|
||||
|
||||
Request body should contain:
|
||||
docs: List of {"doctype": str, "name": str, ...fields} objects
|
||||
|
||||
Returns:
|
||||
updated: List of successfully updated documents
|
||||
failed: List of failed updates with error messages
|
||||
total: Total number of documents attempted
|
||||
success_count: Number of successful updates
|
||||
failure_count: Number of failed updates
|
||||
"""
|
||||
docs = frappe.form_dict.get("docs")
|
||||
|
||||
if not isinstance(docs, list):
|
||||
raise FrappeValueError("'docs' must be a list")
|
||||
|
||||
if len(docs) > get_bulk_operation_async_threshold():
|
||||
job = frappe.enqueue(
|
||||
"frappe.api.v2.execute_bulk_update",
|
||||
docs=docs,
|
||||
)
|
||||
frappe.response.http_status_code = 202
|
||||
return {"job_id": job.id}
|
||||
|
||||
return execute_bulk_update(docs)
|
||||
|
||||
|
||||
def execute_bulk_update(docs: list):
|
||||
updated = []
|
||||
failed = []
|
||||
|
||||
for item in docs:
|
||||
doctype = None
|
||||
name = None
|
||||
savepoint = "bulk_update"
|
||||
frappe.db.savepoint(savepoint)
|
||||
|
||||
try:
|
||||
if not isinstance(item, dict):
|
||||
raise FrappeValueError(
|
||||
"Each document must be a dictionary with 'doctype', 'name', and field values"
|
||||
)
|
||||
|
||||
doctype = item.get("doctype")
|
||||
name = item.get("name")
|
||||
|
||||
if not isinstance(doctype, str):
|
||||
raise FrappeValueError("'doctype' must be a string")
|
||||
|
||||
if not isinstance(name, str | int):
|
||||
raise FrappeValueError("'name' must be a string or integer")
|
||||
|
||||
if isinstance(name, int):
|
||||
name = str(name)
|
||||
|
||||
doc = frappe.get_doc(doctype, name, for_update=True)
|
||||
item_copy = item.copy()
|
||||
item_copy.pop("doctype")
|
||||
item_copy.pop("name")
|
||||
item_copy.pop("flags", None)
|
||||
|
||||
doc.update(item_copy)
|
||||
doc.save()
|
||||
doc.apply_fieldlevel_read_permissions()
|
||||
|
||||
updated.append({"doctype": doctype, "name": name})
|
||||
frappe.response.docs.append(doc.as_dict())
|
||||
except Exception as e:
|
||||
frappe.db.rollback(save_point=savepoint)
|
||||
failed.append({"doctype": doctype, "name": name, "error": str(e)})
|
||||
|
||||
return {
|
||||
"updated": updated,
|
||||
"failed": failed,
|
||||
"total": len(docs),
|
||||
"success_count": len(updated),
|
||||
"failure_count": len(failed),
|
||||
}
|
||||
|
||||
|
||||
def run_doc_method(method: str, document: dict[str, Any] | str, kwargs=None):
|
||||
"""run a whitelisted controller method on in-memory document.
|
||||
|
||||
|
|
@ -248,6 +560,9 @@ def run_doc_method(method: str, document: dict[str, Any] | str, kwargs=None):
|
|||
if isinstance(document, str):
|
||||
document = frappe.parse_json(document)
|
||||
|
||||
if not isinstance(document, dict):
|
||||
raise FrappeValueError("'document' must be a dictionary")
|
||||
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
|
|
@ -272,6 +587,8 @@ url_rules = [
|
|||
Rule("/method/logout", endpoint=logout, methods=["POST"]),
|
||||
Rule("/method/ping", endpoint=frappe.ping),
|
||||
Rule("/method/upload_file", endpoint=upload_file, methods=["POST"]),
|
||||
Rule("/method/bulk_delete", endpoint=bulk_delete, methods=["POST"]),
|
||||
Rule("/method/bulk_update", endpoint=bulk_update, methods=["POST"]),
|
||||
Rule("/method/<method>", endpoint=handle_rpc_call),
|
||||
Rule(
|
||||
"/method/run_doc_method",
|
||||
|
|
@ -282,6 +599,8 @@ url_rules = [
|
|||
# Document level APIs
|
||||
Rule("/document/<doctype>", methods=["GET"], endpoint=document_list),
|
||||
Rule("/document/<doctype>", methods=["POST"], endpoint=create_doc),
|
||||
Rule("/document/<doctype>/bulk_delete", methods=["POST"], endpoint=bulk_delete_docs),
|
||||
Rule("/document/<doctype>/bulk_update", methods=["POST"], endpoint=bulk_update_docs),
|
||||
Rule("/document/<doctype>/<path:name>/", methods=["GET"], endpoint=read_doc),
|
||||
Rule("/document/<doctype>/<path:name>/copy", methods=["GET"], endpoint=copy_doc),
|
||||
Rule("/document/<doctype>/<path:name>/", methods=["PATCH", "PUT"], endpoint=update_doc),
|
||||
|
|
|
|||
|
|
@ -265,6 +265,286 @@ class TestMethodAPIV2(FrappeAPITestCase):
|
|||
self.assertEqual(response["data"]["content"], comment_txt)
|
||||
|
||||
|
||||
class TestBulkOperationsV2(FrappeAPITestCase):
|
||||
"""Test bulk delete and bulk update endpoints"""
|
||||
|
||||
version = "v2"
|
||||
DOCTYPE = "ToDo"
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.post(self.method("login"), {"sid": self.sid})
|
||||
return super().setUp()
|
||||
|
||||
def test_bulk_delete_docs_single_doctype(self):
|
||||
# Create docs to delete
|
||||
doc1 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete 1"}).insert()
|
||||
doc2 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete 2"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
# Bulk delete
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_delete"),
|
||||
{"names": [doc1.name, doc2.name], "sid": self.sid},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 2)
|
||||
self.assertEqual(data["failure_count"], 0)
|
||||
self.assertIn(doc1.name, data["deleted"])
|
||||
self.assertIn(doc2.name, data["deleted"])
|
||||
|
||||
# Verify deletion
|
||||
self.assertFalse(frappe.db.exists(self.DOCTYPE, doc1.name))
|
||||
self.assertFalse(frappe.db.exists(self.DOCTYPE, doc2.name))
|
||||
|
||||
def test_bulk_delete_docs_partial_failure(self):
|
||||
# Create one valid doc
|
||||
doc = frappe.get_doc({"doctype": self.DOCTYPE, "description": "To delete"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
# Try to delete valid and non-existent doc
|
||||
non_existent = "non-existent-todo"
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_delete"),
|
||||
{"names": [doc.name, non_existent], "sid": self.sid},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 1)
|
||||
self.assertEqual(data["failure_count"], 1)
|
||||
self.assertIn(doc.name, data["deleted"])
|
||||
self.assertEqual(len(data["failed"]), 1)
|
||||
self.assertEqual(data["failed"][0]["name"], non_existent)
|
||||
|
||||
def test_bulk_delete_cross_doctype(self):
|
||||
# Create docs of different types
|
||||
todo = frappe.get_doc({"doctype": "ToDo", "description": "Test"}).insert()
|
||||
note = frappe.get_doc({"doctype": "Note", "title": "Test Note", "content": "Test"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
# Bulk delete across doctypes
|
||||
response = self.post(
|
||||
self.method("bulk_delete"),
|
||||
{
|
||||
"docs": [
|
||||
{"doctype": "ToDo", "name": todo.name},
|
||||
{"doctype": "Note", "name": note.name},
|
||||
],
|
||||
"sid": self.sid,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 2)
|
||||
self.assertEqual(data["failure_count"], 0)
|
||||
|
||||
# Verify deletion
|
||||
self.assertFalse(frappe.db.exists("ToDo", todo.name))
|
||||
self.assertFalse(frappe.db.exists("Note", note.name))
|
||||
|
||||
def test_bulk_delete_invalid_format(self):
|
||||
# Test with invalid format (not a list)
|
||||
response = self.post(
|
||||
self.method("bulk_delete"),
|
||||
{"docs": {"doctype": "ToDo", "name": "test"}, "sid": self.sid},
|
||||
)
|
||||
self.assertEqual(response.status_code, 417)
|
||||
self.assertIn("'docs' must be a list", response.json["errors"][0]["exception"])
|
||||
|
||||
# Test with invalid document format (not dict)
|
||||
response = self.post(
|
||||
self.method("bulk_delete"),
|
||||
{"docs": ["invalid-item"], "sid": self.sid},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["failure_count"], 1)
|
||||
self.assertIn("must be a dictionary", data["failed"][0]["error"])
|
||||
|
||||
def test_bulk_update_docs_single_doctype(self):
|
||||
# Create fresh docs for this test
|
||||
doc1 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original 1"}).insert()
|
||||
doc2 = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original 2"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
try:
|
||||
# Bulk update
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_update"),
|
||||
{
|
||||
"docs": [
|
||||
{"name": doc1.name, "description": "Updated description 1", "priority": "High"},
|
||||
{"name": doc2.name, "description": "Updated description 2", "priority": "Low"},
|
||||
],
|
||||
"sid": self.sid,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 2)
|
||||
self.assertEqual(data["failure_count"], 0)
|
||||
self.assertIn(doc1.name, data["updated"])
|
||||
self.assertIn(doc2.name, data["updated"])
|
||||
|
||||
# Verify updates
|
||||
updated_doc1 = frappe.get_doc(self.DOCTYPE, doc1.name)
|
||||
updated_doc2 = frappe.get_doc(self.DOCTYPE, doc2.name)
|
||||
self.assertEqual(updated_doc1.description, "Updated description 1")
|
||||
self.assertEqual(updated_doc1.priority, "High")
|
||||
self.assertEqual(updated_doc2.description, "Updated description 2")
|
||||
self.assertEqual(updated_doc2.priority, "Low")
|
||||
finally:
|
||||
frappe.delete_doc_if_exists(self.DOCTYPE, doc1.name)
|
||||
frappe.delete_doc_if_exists(self.DOCTYPE, doc2.name)
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
def test_bulk_update_cross_doctype(self):
|
||||
# Create test documents
|
||||
todo = frappe.get_doc({"doctype": "ToDo", "description": "Test"}).insert()
|
||||
note = frappe.get_doc({"doctype": "Note", "title": "Test", "content": "Test"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
try:
|
||||
# Bulk update across doctypes
|
||||
response = self.post(
|
||||
self.method("bulk_update"),
|
||||
{
|
||||
"docs": [
|
||||
{"doctype": "ToDo", "name": todo.name, "description": "Updated ToDo"},
|
||||
{"doctype": "Note", "name": note.name, "title": "Updated Note"},
|
||||
],
|
||||
"sid": self.sid,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 2)
|
||||
self.assertEqual(data["failure_count"], 0)
|
||||
|
||||
# Verify updates
|
||||
updated_todo = frappe.get_doc("ToDo", todo.name)
|
||||
updated_note = frappe.get_doc("Note", note.name)
|
||||
self.assertEqual(updated_todo.description, "Updated ToDo")
|
||||
self.assertEqual(updated_note.title, "Updated Note")
|
||||
finally:
|
||||
frappe.delete_doc_if_exists("ToDo", todo.name)
|
||||
frappe.delete_doc_if_exists("Note", note.name)
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
def test_bulk_update_partial_failure(self):
|
||||
# Create a fresh doc for this test
|
||||
doc = frappe.get_doc({"doctype": self.DOCTYPE, "description": "Original"}).insert()
|
||||
frappe.db.commit() # nosemgrep
|
||||
valid_doc = doc.name
|
||||
non_existent = "non-existent-todo"
|
||||
|
||||
try:
|
||||
# Try to update valid and non-existent doc
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_update"),
|
||||
{
|
||||
"docs": [
|
||||
{"name": valid_doc, "description": "Updated"},
|
||||
{"name": non_existent, "description": "Should fail"},
|
||||
],
|
||||
"sid": self.sid,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["total"], 2)
|
||||
self.assertEqual(data["success_count"], 1)
|
||||
self.assertEqual(data["failure_count"], 1)
|
||||
self.assertIn(valid_doc, data["updated"])
|
||||
self.assertEqual(len(data["failed"]), 1)
|
||||
self.assertEqual(data["failed"][0]["name"], non_existent)
|
||||
|
||||
# Verify successful update
|
||||
updated_doc = frappe.get_doc(self.DOCTYPE, valid_doc)
|
||||
self.assertEqual(updated_doc.description, "Updated")
|
||||
finally:
|
||||
frappe.delete_doc_if_exists(self.DOCTYPE, valid_doc)
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
def test_bulk_update_invalid_format(self):
|
||||
# Test with invalid format (not a list)
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_update"),
|
||||
{"docs": {"name": "test", "description": "test"}, "sid": self.sid},
|
||||
)
|
||||
self.assertEqual(response.status_code, 417)
|
||||
self.assertIn("'docs' must be a list", response.json["errors"][0]["exception"])
|
||||
|
||||
# Test with missing name field
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_update"),
|
||||
{"docs": [{"description": "test"}], "sid": self.sid},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data = response.json["data"]
|
||||
self.assertEqual(data["failure_count"], 1)
|
||||
self.assertIn("'name' must be a string or integer", data["failed"][0]["error"])
|
||||
|
||||
def test_bulk_enqueue(self):
|
||||
# Create 25 docs
|
||||
docs = [
|
||||
frappe.get_doc({"doctype": self.DOCTYPE, "description": f"To delete {i}"}).insert()
|
||||
for i in range(25)
|
||||
]
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
try:
|
||||
# Bulk delete > 20 docs
|
||||
names = [doc.name for doc in docs]
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_delete"),
|
||||
{"names": names, "sid": self.sid},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
self.assertIn("job_id", response.json["data"])
|
||||
finally:
|
||||
# Clean up
|
||||
for doc in docs:
|
||||
frappe.delete_doc_if_exists(self.DOCTYPE, doc.name)
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
def test_bulk_update_enqueue(self):
|
||||
# Create 25 docs
|
||||
docs = [
|
||||
frappe.get_doc({"doctype": self.DOCTYPE, "description": f"To update {i}"}).insert()
|
||||
for i in range(25)
|
||||
]
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
try:
|
||||
# Bulk update > 20 docs
|
||||
updates = [{"name": doc.name, "description": "Updated"} for doc in docs]
|
||||
response = self.post(
|
||||
self.resource(self.DOCTYPE, "bulk_update"),
|
||||
{"docs": updates, "sid": self.sid},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 202)
|
||||
self.assertIn("job_id", response.json["data"])
|
||||
finally:
|
||||
# Clean up
|
||||
for doc in docs:
|
||||
frappe.delete_doc_if_exists(self.DOCTYPE, doc.name)
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
|
||||
class TestDocTypeAPIV2(FrappeAPITestCase):
|
||||
version = "v2"
|
||||
|
||||
|
|
@ -314,7 +594,7 @@ def generate_admin_keys():
|
|||
from frappe.core.doctype.user.user import generate_keys
|
||||
|
||||
generate_keys("Administrator")
|
||||
frappe.db.commit()
|
||||
frappe.db.commit() # nosemgrep
|
||||
|
||||
|
||||
@whitelist_for_tests()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue