2
0
mirror of https://github.com/frappe/frappe.git synced 2024-06-12 15:52:22 +00:00
frappe/frappe/deferred_insert.py
Akhil Narang 26ae0f3460
fix: ruff fixes
Signed-off-by: Akhil Narang <me@akhilnarang.dev>
2024-02-07 17:04:31 +05:30

60 lines
1.4 KiB
Python

import json
from typing import TYPE_CHECKING, Union
import redis
import frappe
from frappe.utils import cstr
if TYPE_CHECKING:
from frappe.model.document import Document
queue_prefix = "insert_queue_for_"
def deferred_insert(doctype: str, records: list[Union[dict, "Document"]] | str):
if isinstance(records, dict | list):
_records = json.dumps(records)
else:
_records = records
try:
frappe.cache.rpush(f"{queue_prefix}{doctype}", _records)
except redis.exceptions.ConnectionError:
for record in records:
insert_record(record, doctype)
def save_to_db():
queue_keys = frappe.cache.get_keys(queue_prefix)
for key in queue_keys:
record_count = 0
queue_key = get_key_name(key)
doctype = get_doctype_name(key)
while frappe.cache.llen(queue_key) > 0 and record_count <= 500:
records = frappe.cache.lpop(queue_key)
records = json.loads(records.decode("utf-8"))
if isinstance(records, dict):
record_count += 1
insert_record(records, doctype)
continue
for record in records:
record_count += 1
insert_record(record, doctype)
def insert_record(record: Union[dict, "Document"], doctype: str):
try:
record.update({"doctype": doctype})
frappe.get_doc(record).insert()
except Exception as e:
frappe.logger().error(f"Error while inserting deferred {doctype} record: {e}")
def get_key_name(key: str) -> str:
return cstr(key).split("|")[1]
def get_doctype_name(key: str) -> str:
return cstr(key).split(queue_prefix)[1]