refactor: deferred_insert

* Fallback to insert in realtime if redis conn fails
* Allow List and str inputs for deferred insert API
* Set doctype value unconditionally in insert_record
* Use frappe.logger instead of print
* Add type hints
This commit is contained in:
Gavin D'souza 2022-04-05 18:32:23 +05:30
parent 5b7e7efb57
commit e887208b27

View file

@ -1,12 +1,25 @@
import json
from typing import Dict, List, Union, TYPE_CHECKING
import frappe
import redis
from frappe.utils import cstr
if TYPE_CHECKING:
from frappe.model.document import Document
queue_prefix = 'insert_queue_for_'
def deferred_insert(doctype, records):
frappe.cache().rpush(queue_prefix + doctype, records)
def deferred_insert(doctype: str, records: Union[List[Union[Dict, "Document"]], str]):
if isinstance(records, (dict, list)):
_records = json.dumps(records)
else:
_records = records
try:
frappe.cache().rpush(f"{queue_prefix}{doctype}", _records)
except redis.exceptions.ConnectionError:
for record in records:
insert_record(record, doctype)
def save_to_db():
queue_keys = frappe.cache().get_keys(queue_prefix)
@ -27,17 +40,15 @@ def save_to_db():
frappe.db.commit()
def insert_record(record, doctype):
if not record.get('doctype'):
record['doctype'] = doctype
def insert_record(record: Union[Dict, "Document"], doctype: str):
setattr(record, "doctype", doctype)
try:
doc = frappe.get_doc(record)
doc.insert()
frappe.get_doc(record).insert()
except Exception as e:
print(e, doctype)
frappe.logger().error(f"Error while inserting deferred {doctype} record: {e}")
def get_key_name(key):
def get_key_name(key: str) -> str:
return cstr(key).split('|')[1]
def get_doctype_name(key):
def get_doctype_name(key: str) -> str:
return cstr(key).split(queue_prefix)[1]