import json from typing import TYPE_CHECKING, Dict, List, Union import redis import frappe from frappe.utils import cstr if TYPE_CHECKING: from frappe.model.document import Document queue_prefix = "insert_queue_for_" def deferred_insert(doctype: str, records: Union[List[Union[Dict, "Document"]], str]): if isinstance(records, (dict, list)): _records = json.dumps(records) else: _records = records try: frappe.cache().rpush(f"{queue_prefix}{doctype}", _records) except redis.exceptions.ConnectionError: for record in records: insert_record(record, doctype) def save_to_db(): queue_keys = frappe.cache().get_keys(queue_prefix) for key in queue_keys: record_count = 0 queue_key = get_key_name(key) doctype = get_doctype_name(key) while frappe.cache().llen(queue_key) > 0 and record_count <= 500: records = frappe.cache().lpop(queue_key) records = json.loads(records.decode("utf-8")) if isinstance(records, dict): record_count += 1 insert_record(records, doctype) continue for record in records: record_count += 1 insert_record(record, doctype) def insert_record(record: Union[Dict, "Document"], doctype: str): try: record.update({"doctype": doctype}) frappe.get_doc(record).insert() except Exception as e: frappe.logger().error(f"Error while inserting deferred {doctype} record: {e}") def get_key_name(key: str) -> str: return cstr(key).split("|")[1] def get_doctype_name(key: str) -> str: return cstr(key).split(queue_prefix)[1]