mirror of
https://github.com/frappe/frappe.git
synced 2024-06-12 15:52:22 +00:00
26ae0f3460
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
60 lines
1.4 KiB
Python
60 lines
1.4 KiB
Python
import json
|
|
from typing import TYPE_CHECKING, Union
|
|
|
|
import redis
|
|
|
|
import frappe
|
|
from frappe.utils import cstr
|
|
|
|
if TYPE_CHECKING:
|
|
from frappe.model.document import Document
|
|
|
|
queue_prefix = "insert_queue_for_"
|
|
|
|
|
|
def deferred_insert(doctype: str, records: list[Union[dict, "Document"]] | str):
|
|
if isinstance(records, dict | list):
|
|
_records = json.dumps(records)
|
|
else:
|
|
_records = records
|
|
|
|
try:
|
|
frappe.cache.rpush(f"{queue_prefix}{doctype}", _records)
|
|
except redis.exceptions.ConnectionError:
|
|
for record in records:
|
|
insert_record(record, doctype)
|
|
|
|
|
|
def save_to_db():
|
|
queue_keys = frappe.cache.get_keys(queue_prefix)
|
|
for key in queue_keys:
|
|
record_count = 0
|
|
queue_key = get_key_name(key)
|
|
doctype = get_doctype_name(key)
|
|
while frappe.cache.llen(queue_key) > 0 and record_count <= 500:
|
|
records = frappe.cache.lpop(queue_key)
|
|
records = json.loads(records.decode("utf-8"))
|
|
if isinstance(records, dict):
|
|
record_count += 1
|
|
insert_record(records, doctype)
|
|
continue
|
|
for record in records:
|
|
record_count += 1
|
|
insert_record(record, doctype)
|
|
|
|
|
|
def insert_record(record: Union[dict, "Document"], doctype: str):
|
|
try:
|
|
record.update({"doctype": doctype})
|
|
frappe.get_doc(record).insert()
|
|
except Exception as e:
|
|
frappe.logger().error(f"Error while inserting deferred {doctype} record: {e}")
|
|
|
|
|
|
def get_key_name(key: str) -> str:
|
|
return cstr(key).split("|")[1]
|
|
|
|
|
|
def get_doctype_name(key: str) -> str:
|
|
return cstr(key).split(queue_prefix)[1]
|