feat: frappe.job local for reading job info

This commit is contained in:
Ankush Menat 2023-11-04 14:02:13 +05:30
parent ecca32853e
commit 089985e53b
4 changed files with 16 additions and 12 deletions

View file

@ -146,6 +146,7 @@ qb = local("qb")
conf = local("conf")
form = form_dict = local("form_dict")
request = local("request")
job = local("job")
response = local("response")
session = local("session")
user = local("user")

View file

@ -104,12 +104,10 @@ class SMTPServer:
)
def _enqueue_connection_closure(self):
if frappe.request:
if frappe.request and hasattr(frappe.request, "after_response"):
frappe.request.after_response.add(self.quit)
else:
if not hasattr(frappe.local, "open_smtp_connections"):
frappe.local.open_smtp_connections = set()
frappe.local.open_smtp_connections.add(self)
elif frappe.job:
frappe.job.after_job.add(self.quit)
def is_session_active(self):
if self._session:
@ -129,8 +127,3 @@ class SMTPServer:
title=_("Invalid Credentials"),
exc=InvalidEmailCredentials,
)
def close_open_smtp_connections():
for conn in getattr(frappe.local, "open_smtp_connections", []):
conn.quit()

View file

@ -430,7 +430,6 @@ before_job = [
after_job = [
"frappe.monitor.stop",
"frappe.utils.file_lock.release_document_locks",
"frappe.email.smtp.close_open_smtp_connections",
"frappe.utils.telemetry.flush",
]

View file

@ -21,7 +21,7 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fi
import frappe
import frappe.monitor
from frappe import _
from frappe.utils import cint, cstr, get_bench_id
from frappe.utils import CallbackManager, cint, cstr, get_bench_id
from frappe.utils.commands import log
from frappe.utils.deprecations import deprecation_warning
from frappe.utils.redis_queue import RedisQueue
@ -185,6 +185,7 @@ def run_doc_method(doctype, name, doc_method, **kwargs):
def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, retry=0):
"""Executes job in a worker, performs commit/rollback and logs if there is any error"""
retval = None
if is_async:
frappe.connect(site)
if os.environ.get("CI"):
@ -199,6 +200,15 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
else:
method_name = cstr(method.__name__)
frappe.local.job = frappe._dict(
site=site,
method=method_name,
job_name=job_name,
kwargs=kwargs,
user=user,
after_job=CallbackManager(),
)
for before_job_task in frappe.get_hooks("before_job"):
frappe.call(before_job_task, method=method_name, kwargs=kwargs, transaction_type="job")
@ -239,6 +249,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
finally:
for after_job_task in frappe.get_hooks("after_job"):
frappe.call(after_job_task, method=method_name, kwargs=kwargs, result=retval)
frappe.local.job.after_job.run()
if is_async:
frappe.destroy()