Merge branch 'frappe:develop' into fix-test_never_render
This commit is contained in:
commit
4ccfd1e8fa
24 changed files with 314 additions and 154 deletions
|
|
@ -240,14 +240,14 @@ def start_worker_pool(queue, quiet=False, num_workers=2, burst=False):
|
|||
@click.option("--site", help="site name")
|
||||
@pass_context
|
||||
def ready_for_migration(context, site=None):
|
||||
from frappe.utils.doctor import get_pending_jobs
|
||||
from frappe.utils.doctor import any_job_pending
|
||||
|
||||
if not site:
|
||||
site = get_site(context)
|
||||
|
||||
try:
|
||||
frappe.init(site=site)
|
||||
pending_jobs = get_pending_jobs(site=site)
|
||||
pending_jobs = any_job_pending(site=site)
|
||||
|
||||
if pending_jobs:
|
||||
print(f"NOT READY for migration: site {site} has pending background jobs")
|
||||
|
|
|
|||
|
|
@ -33,13 +33,28 @@ class TestDocShare(FrappeTestCase):
|
|||
|
||||
def test_doc_permission(self):
|
||||
frappe.set_user(self.user)
|
||||
|
||||
self.assertFalse(self.event.has_permission())
|
||||
|
||||
frappe.set_user("Administrator")
|
||||
frappe.share.add("Event", self.event.name, self.user)
|
||||
|
||||
frappe.set_user(self.user)
|
||||
self.assertTrue(self.event.has_permission())
|
||||
# PERF: All share permission check should happen with maximum 1 query.
|
||||
with self.assertRowsRead(1):
|
||||
self.assertTrue(self.event.has_permission())
|
||||
|
||||
second_event = frappe.get_doc(
|
||||
{
|
||||
"doctype": "Event",
|
||||
"subject": "test share event 2",
|
||||
"starts_on": "2015-01-01 10:00:00",
|
||||
"event_type": "Private",
|
||||
}
|
||||
).insert()
|
||||
frappe.share.add("Event", second_event.name, self.user)
|
||||
with self.assertRowsRead(1):
|
||||
self.assertTrue(self.event.has_permission())
|
||||
|
||||
def test_share_permission(self):
|
||||
frappe.share.add("Event", self.event.name, self.user, write=1, share=1)
|
||||
|
|
|
|||
|
|
@ -63,23 +63,15 @@ class RQJob(Document):
|
|||
|
||||
order_desc = "desc" in args.get("order_by", "")
|
||||
|
||||
matched_job_ids = RQJob.get_matching_job_ids(args)
|
||||
matched_job_ids = RQJob.get_matching_job_ids(args)[start : start + page_length]
|
||||
|
||||
jobs = []
|
||||
for job_ids in create_batch(matched_job_ids, 100):
|
||||
jobs.extend(
|
||||
serialize_job(job)
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn())
|
||||
if job and for_current_site(job)
|
||||
)
|
||||
if len(jobs) > start + page_length:
|
||||
# we have fetched enough. This is inefficient but because of site filtering TINA
|
||||
break
|
||||
conn = get_redis_conn()
|
||||
jobs = [serialize_job(job) for job in Job.fetch_many(job_ids=matched_job_ids, connection=conn)]
|
||||
|
||||
return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length]
|
||||
return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)
|
||||
|
||||
@staticmethod
|
||||
def get_matching_job_ids(args):
|
||||
def get_matching_job_ids(args) -> list[str]:
|
||||
filters = make_filter_dict(args.get("filters"))
|
||||
|
||||
queues = _eval_filters(filters.get("queue"), QUEUES)
|
||||
|
|
@ -92,7 +84,7 @@ class RQJob(Document):
|
|||
for status in statuses:
|
||||
matched_job_ids.extend(fetch_job_ids(queue, status))
|
||||
|
||||
return matched_job_ids
|
||||
return filter_current_site_jobs(matched_job_ids)
|
||||
|
||||
@check_permissions
|
||||
def delete(self):
|
||||
|
|
@ -107,8 +99,7 @@ class RQJob(Document):
|
|||
|
||||
@staticmethod
|
||||
def get_count(args) -> int:
|
||||
# Can not be implemented efficiently due to site filtering hence ignored.
|
||||
return 0
|
||||
return len(RQJob.get_matching_job_ids(args))
|
||||
|
||||
# None of these methods apply to virtual job doctype, overriden for sanity.
|
||||
@staticmethod
|
||||
|
|
@ -155,6 +146,12 @@ def for_current_site(job: Job) -> bool:
|
|||
return job.kwargs.get("site") == frappe.local.site
|
||||
|
||||
|
||||
def filter_current_site_jobs(job_ids: list[str]) -> list[str]:
|
||||
site = frappe.local.site
|
||||
|
||||
return [j for j in job_ids if j.startswith(site)]
|
||||
|
||||
|
||||
def _eval_filters(filter, values: list[str]) -> list[str]:
|
||||
if filter:
|
||||
operator, operand = filter
|
||||
|
|
@ -186,10 +183,13 @@ def remove_failed_jobs():
|
|||
frappe.only_for("System Manager")
|
||||
for queue in get_queues():
|
||||
fail_registry = queue.failed_job_registry
|
||||
for job_ids in create_batch(fail_registry.get_job_ids(), 100):
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()):
|
||||
if job and for_current_site(job):
|
||||
fail_registry.remove(job, delete_job=True)
|
||||
failed_jobs = filter_current_site_jobs(fail_registry.get_job_ids())
|
||||
|
||||
# Delete in batches to avoid loading too many things in memory
|
||||
conn = get_redis_conn()
|
||||
for job_ids in create_batch(failed_jobs, 100):
|
||||
for job in Job.fetch_many(job_ids=job_ids, connection=conn):
|
||||
job and fail_registry.remove(job, delete_job=True)
|
||||
|
||||
|
||||
def get_all_queued_jobs():
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ frappe.listview_settings["RQ Job"] = {
|
|||
);
|
||||
|
||||
if (listview.list_view_settings) {
|
||||
listview.list_view_settings.disable_count = 1;
|
||||
listview.list_view_settings.disable_sidebar_stats = 1;
|
||||
}
|
||||
|
||||
|
|
@ -57,6 +56,6 @@ frappe.listview_settings["RQ Job"] = {
|
|||
}
|
||||
|
||||
listview.refresh();
|
||||
}, 5000);
|
||||
}, 15000);
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -108,13 +108,27 @@ class TestRQJob(FrappeTestCase):
|
|||
self.assertIn("quitting", cstr(stderr))
|
||||
|
||||
@timeout(20)
|
||||
def test_job_id_dedup(self):
|
||||
def test_job_id_manual_dedup(self):
|
||||
job_id = "test_dedup"
|
||||
job = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id)
|
||||
self.assertTrue(is_job_enqueued(job_id))
|
||||
self.check_status(job, "finished")
|
||||
self.assertFalse(is_job_enqueued(job_id))
|
||||
|
||||
@timeout(20)
|
||||
def test_auto_job_dedup(self):
|
||||
job_id = "test_dedup"
|
||||
job1 = frappe.enqueue(self.BG_JOB, sleep=2, job_id=job_id, deduplicate=True)
|
||||
job2 = frappe.enqueue(self.BG_JOB, sleep=5, job_id=job_id, deduplicate=True)
|
||||
self.assertIsNone(job2)
|
||||
self.check_status(job1, "finished") # wait
|
||||
|
||||
# Failed jobs last longer, subsequent job should still pass with same ID.
|
||||
job3 = frappe.enqueue(self.BG_JOB, fail=True, job_id=job_id, deduplicate=True)
|
||||
self.check_status(job3, "failed")
|
||||
job4 = frappe.enqueue(self.BG_JOB, sleep=1, job_id=job_id, deduplicate=True)
|
||||
self.check_status(job4, "finished")
|
||||
|
||||
@timeout(20)
|
||||
def test_enqueue_after_commit(self):
|
||||
job_id = frappe.generate_hash()
|
||||
|
|
|
|||
|
|
@ -132,7 +132,6 @@ def return_unsubscribed_page(email, doctype, name):
|
|||
def flush(from_test=False):
|
||||
"""flush email queue, every time: called from scheduler"""
|
||||
from frappe.email.doctype.email_queue.email_queue import send_mail
|
||||
from frappe.utils.background_jobs import get_jobs
|
||||
|
||||
# To avoid running jobs inside unit tests
|
||||
if frappe.are_emails_muted():
|
||||
|
|
@ -142,24 +141,16 @@ def flush(from_test=False):
|
|||
if cint(frappe.db.get_default("suspend_email_queue")) == 1:
|
||||
return
|
||||
|
||||
try:
|
||||
queued_jobs = set(get_jobs(site=frappe.local.site, key="job_name")[frappe.local.site])
|
||||
except Exception:
|
||||
queued_jobs = set()
|
||||
|
||||
for row in get_queue():
|
||||
try:
|
||||
job_name = f"email_queue_sendmail_{row.name}"
|
||||
if job_name not in queued_jobs:
|
||||
frappe.enqueue(
|
||||
method=send_mail,
|
||||
email_queue_name=row.name,
|
||||
now=from_test,
|
||||
job_name=job_name,
|
||||
queue="short",
|
||||
)
|
||||
else:
|
||||
frappe.logger().debug(f"Not queueing job {job_name} because it is in queue already")
|
||||
frappe.enqueue(
|
||||
method=send_mail,
|
||||
email_queue_name=row.name,
|
||||
now=from_test,
|
||||
job_id=f"email_queue_sendmail_{row.name}",
|
||||
queue="short",
|
||||
dedupicate=True,
|
||||
)
|
||||
except Exception:
|
||||
frappe.get_doc("Email Queue", row.name).log_error()
|
||||
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ class Webhook(Document):
|
|||
self.validate_request_url()
|
||||
self.validate_request_body()
|
||||
self.validate_repeating_fields()
|
||||
self.validate_secret()
|
||||
self.preview_document = None
|
||||
|
||||
def on_update(self):
|
||||
|
|
@ -74,6 +75,13 @@ class Webhook(Document):
|
|||
if len(webhook_data) != len(set(webhook_data)):
|
||||
frappe.throw(_("Same Field is entered more than once"))
|
||||
|
||||
def validate_secret(self):
|
||||
if self.enable_security:
|
||||
try:
|
||||
self.get_password("webhook_secret", False).encode("utf8")
|
||||
except Exception:
|
||||
frappe.throw(_("Invalid Webhook Secret"))
|
||||
|
||||
@frappe.whitelist()
|
||||
def generate_preview(self):
|
||||
# This function doesn't need to do anything specific as virtual fields
|
||||
|
|
@ -112,16 +120,21 @@ def get_context(doc):
|
|||
|
||||
|
||||
def enqueue_webhook(doc, webhook) -> None:
|
||||
webhook: Webhook = frappe.get_doc("Webhook", webhook.get("name"))
|
||||
headers = get_webhook_headers(doc, webhook)
|
||||
data = get_webhook_data(doc, webhook)
|
||||
try:
|
||||
webhook: Webhook = frappe.get_doc("Webhook", webhook.get("name"))
|
||||
headers = get_webhook_headers(doc, webhook)
|
||||
data = get_webhook_data(doc, webhook)
|
||||
|
||||
if webhook.is_dynamic_url:
|
||||
request_url = frappe.render_template(webhook.request_url, get_context(doc))
|
||||
else:
|
||||
request_url = webhook.request_url
|
||||
if webhook.is_dynamic_url:
|
||||
request_url = frappe.render_template(webhook.request_url, get_context(doc))
|
||||
else:
|
||||
request_url = webhook.request_url
|
||||
|
||||
except Exception as e:
|
||||
frappe.logger().debug({"enqueue_webhook_error": e})
|
||||
log_request(webhook.name, doc.name, request_url, headers, data)
|
||||
return
|
||||
|
||||
r = None
|
||||
for i in range(3):
|
||||
try:
|
||||
r = requests.request(
|
||||
|
|
|
|||
|
|
@ -118,17 +118,24 @@ def has_permission(
|
|||
|
||||
def false_if_not_shared():
|
||||
if ptype in ("read", "write", "share", "submit", "email", "print"):
|
||||
shared = frappe.share.get_shared(
|
||||
doctype, user, ["read" if ptype in ("email", "print") else ptype]
|
||||
)
|
||||
|
||||
rights = ["read" if ptype in ("email", "print") else ptype]
|
||||
|
||||
if doc:
|
||||
doc_name = get_doc_name(doc)
|
||||
if doc_name in shared:
|
||||
shared = frappe.share.get_shared(
|
||||
doctype,
|
||||
user,
|
||||
rights=rights,
|
||||
filters=[["share_name", "=", doc_name]],
|
||||
limit=1,
|
||||
)
|
||||
|
||||
if shared:
|
||||
if ptype in ("read", "write", "share", "submit") or meta.permissions[0].get(ptype):
|
||||
return True
|
||||
|
||||
elif shared:
|
||||
elif frappe.share.get_shared(doctype, user, rights=rights, limit=1):
|
||||
# if atleast one shared doc of that type, then return True
|
||||
# this is used in db_query to check if permission on DocType
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -174,6 +174,15 @@ frappe.ui.form.ControlAutocomplete = class ControlAutoComplete extends frappe.ui
|
|||
if (typeof options[0] === "string") {
|
||||
options = options.map((o) => ({ label: o, value: o }));
|
||||
}
|
||||
|
||||
options = options.map((o) => {
|
||||
if (typeof o !== "string") {
|
||||
o.label = o.label.toString();
|
||||
o.value = o.value.toString();
|
||||
}
|
||||
return o;
|
||||
});
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -711,7 +711,7 @@ frappe.views.ListView = class ListView extends frappe.views.BaseList {
|
|||
}
|
||||
|
||||
get_column_html(col, doc) {
|
||||
if (col.type === "Status") {
|
||||
if (col.type === "Status" || col.df?.options == "Workflow State") {
|
||||
return `
|
||||
<div class="list-row-col hidden-xs ellipsis">
|
||||
${this.get_indicator_html(doc)}
|
||||
|
|
|
|||
|
|
@ -202,14 +202,16 @@ frappe.dashboard_utils = {
|
|||
},
|
||||
|
||||
get_all_filters(doc) {
|
||||
let filters = JSON.parse(doc.filters_json || "null");
|
||||
let dynamic_filters = JSON.parse(doc.dynamic_filters_json || "null");
|
||||
let filters = doc.filters_json ? JSON.parse(doc.filters_json) : null;
|
||||
let dynamic_filters = doc.dynamic_filters_json
|
||||
? JSON.parse(doc.dynamic_filters_json)
|
||||
: null;
|
||||
|
||||
if (!dynamic_filters) {
|
||||
if (!dynamic_filters || !Object.keys(dynamic_filters).length) {
|
||||
return filters;
|
||||
}
|
||||
|
||||
if ($.isArray(dynamic_filters)) {
|
||||
if (Array.isArray(dynamic_filters)) {
|
||||
dynamic_filters.forEach((f) => {
|
||||
try {
|
||||
f[3] = eval(f[3]);
|
||||
|
|
|
|||
|
|
@ -100,11 +100,11 @@
|
|||
|
||||
.group-by-button {
|
||||
margin: 5px;
|
||||
max-width: 125px;
|
||||
}
|
||||
|
||||
.group-by-button.btn-primary-light {
|
||||
color: var(--text-on-blue);
|
||||
outline: 1px solid var(--bg-dark-blue);
|
||||
}
|
||||
|
||||
.group-by-icon.active {
|
||||
|
|
|
|||
|
|
@ -100,10 +100,10 @@ def emit_via_redis(event, message, room):
|
|||
:param event: Event name, like `task_progress` etc.
|
||||
:param message: JSON message object. For async must contain `task_id`
|
||||
:param room: name of the room"""
|
||||
from frappe.utils.background_jobs import get_redis_conn
|
||||
from frappe.utils.background_jobs import get_redis_connection_without_auth
|
||||
|
||||
with suppress(redis.exceptions.ConnectionError):
|
||||
r = get_redis_conn()
|
||||
r = get_redis_connection_without_auth()
|
||||
r.publish("events", frappe.as_json({"event": event, "message": message, "room": room}))
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -18,10 +18,9 @@ import frappe.translate
|
|||
import frappe.utils
|
||||
from frappe import _
|
||||
from frappe.cache_manager import clear_user_cache
|
||||
from frappe.query_builder import DocType, Order
|
||||
from frappe.query_builder.functions import Now
|
||||
from frappe.query_builder.utils import PseudoColumn
|
||||
from frappe.query_builder import Order
|
||||
from frappe.utils import cint, cstr, get_assets_json
|
||||
from frappe.utils.data import add_to_date
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
|
|
@ -62,7 +61,7 @@ def get_sessions_to_clear(user=None, keep_current=False):
|
|||
simultaneous_sessions = frappe.db.get_value("User", user, "simultaneous_sessions") or 1
|
||||
offset = simultaneous_sessions - 1
|
||||
|
||||
session = DocType("Sessions")
|
||||
session = frappe.qb.DocType("Sessions")
|
||||
session_id = frappe.qb.from_(session).where(session.user == user)
|
||||
if keep_current:
|
||||
session_id = session_id.where(session.sid != frappe.session.sid)
|
||||
|
|
@ -88,7 +87,7 @@ def delete_session(sid=None, user=None, reason="Session Expired"):
|
|||
frappe.cache.hdel("session", sid)
|
||||
frappe.cache.hdel("last_db_session_update", sid)
|
||||
if sid and not user:
|
||||
table = DocType("Sessions")
|
||||
table = frappe.qb.DocType("Sessions")
|
||||
user_details = (
|
||||
frappe.qb.from_(table).where(table.sid == sid).select(table.user).run(as_dict=True)
|
||||
)
|
||||
|
|
@ -112,17 +111,12 @@ def clear_all_sessions(reason=None):
|
|||
def get_expired_sessions():
|
||||
"""Returns list of expired sessions"""
|
||||
|
||||
sessions = DocType("Sessions")
|
||||
|
||||
return frappe.db.get_values(
|
||||
sessions,
|
||||
filters=(
|
||||
PseudoColumn(f"({Now()} - {sessions.lastupdate.get_sql()})") > get_expiry_period_for_query()
|
||||
),
|
||||
fieldname="sid",
|
||||
order_by=None,
|
||||
pluck=True,
|
||||
)
|
||||
sessions = frappe.qb.DocType("Sessions")
|
||||
return (
|
||||
frappe.qb.from_(sessions)
|
||||
.select(sessions.sid)
|
||||
.where(sessions.lastupdate < get_expired_threshold())
|
||||
).run(pluck=True)
|
||||
|
||||
|
||||
def clear_expired_sessions():
|
||||
|
|
@ -232,7 +226,7 @@ class Session:
|
|||
sid = frappe.generate_hash()
|
||||
|
||||
self.data.user = self.user
|
||||
self.data.sid = sid
|
||||
self.sid = self.data.sid = sid
|
||||
self.data.data.user = self.user
|
||||
self.data.data.session_ip = frappe.local.request_ip
|
||||
if self.user != "Guest":
|
||||
|
|
@ -268,14 +262,17 @@ class Session:
|
|||
frappe.db.commit()
|
||||
|
||||
def insert_session_record(self):
|
||||
frappe.db.sql(
|
||||
"""insert into `tabSessions`
|
||||
(`sessiondata`, `user`, `lastupdate`, `sid`, `status`)
|
||||
values (%s , %s, NOW(), %s, 'Active')""",
|
||||
(str(self.data["data"]), self.data["user"], self.data["sid"]),
|
||||
)
|
||||
|
||||
# also add to memcache
|
||||
Sessions = frappe.qb.DocType("Sessions")
|
||||
now = frappe.utils.now()
|
||||
|
||||
(
|
||||
frappe.qb.into(Sessions)
|
||||
.columns(
|
||||
Sessions.sessiondata, Sessions.user, Sessions.lastupdate, Sessions.sid, Sessions.status
|
||||
)
|
||||
.insert((str(self.data["data"]), self.data["user"], now, self.data["sid"], "Active"))
|
||||
).run()
|
||||
frappe.cache.hset("session", self.data.sid, self.data)
|
||||
|
||||
def resume(self):
|
||||
|
|
@ -338,20 +335,18 @@ class Session:
|
|||
return data and data.data
|
||||
|
||||
def get_session_data_from_db(self):
|
||||
sessions = DocType("Sessions")
|
||||
rec = frappe.db.get_values(
|
||||
sessions,
|
||||
filters=(sessions.sid == self.sid)
|
||||
& (
|
||||
PseudoColumn(f"({Now()} - {sessions.lastupdate.get_sql()})") < get_expiry_period_for_query()
|
||||
),
|
||||
fieldname=["user", "sessiondata"],
|
||||
order_by=None,
|
||||
)
|
||||
sessions = frappe.qb.DocType("Sessions")
|
||||
|
||||
if rec:
|
||||
data = frappe._dict(frappe.safe_eval(rec and rec[0][1] or "{}"))
|
||||
data.user = rec[0][0]
|
||||
record = (
|
||||
frappe.qb.from_(sessions)
|
||||
.select(sessions.user, sessions.sessiondata)
|
||||
.where(sessions.sid == self.sid)
|
||||
.where(sessions.lastupdate > get_expired_threshold())
|
||||
).run()
|
||||
|
||||
if record:
|
||||
data = frappe._dict(frappe.safe_eval(record and record[0][1] or "{}"))
|
||||
data.user = record[0][0]
|
||||
else:
|
||||
self._delete_session()
|
||||
data = None
|
||||
|
|
@ -373,6 +368,8 @@ class Session:
|
|||
|
||||
now = frappe.utils.now()
|
||||
|
||||
Sessions = frappe.qb.DocType("Sessions")
|
||||
|
||||
self.data["data"]["last_updated"] = now
|
||||
self.data["data"]["lang"] = str(frappe.lang)
|
||||
|
||||
|
|
@ -384,17 +381,14 @@ class Session:
|
|||
updated_in_db = False
|
||||
if (force or (time_diff is None) or (time_diff > 600)) and not frappe.flags.read_only:
|
||||
# update sessions table
|
||||
frappe.db.sql(
|
||||
"""update `tabSessions` set sessiondata=%s,
|
||||
lastupdate=NOW() where sid=%s""",
|
||||
(str(self.data["data"]), self.data["sid"]),
|
||||
)
|
||||
(
|
||||
frappe.qb.update(Sessions)
|
||||
.where(Sessions.sid == self.data["sid"])
|
||||
.set(Sessions.sessiondata, str(self.data["data"]))
|
||||
.set(Sessions.lastupdate, now)
|
||||
).run()
|
||||
|
||||
# update last active in user table
|
||||
frappe.db.sql(
|
||||
"""update `tabUser` set last_active=%(now)s where name=%(name)s""",
|
||||
{"now": now, "name": frappe.session.user},
|
||||
)
|
||||
frappe.db.set_value("User", frappe.session.user, "last_active", now, update_modified=False)
|
||||
|
||||
frappe.db.commit()
|
||||
frappe.cache.hset("last_db_session_update", self.sid, now)
|
||||
|
|
@ -421,6 +415,15 @@ def get_expiry_in_seconds(expiry=None):
|
|||
return (cint(parts[0]) * 3600) + (cint(parts[1]) * 60) + cint(parts[2])
|
||||
|
||||
|
||||
def get_expired_threshold():
|
||||
"""Get cutoff time before which all sessions are considered expired."""
|
||||
|
||||
now = frappe.utils.now()
|
||||
expiry_in_seconds = get_expiry_in_seconds()
|
||||
|
||||
return add_to_date(now, seconds=-expiry_in_seconds, as_string=True)
|
||||
|
||||
|
||||
def get_expiry_period():
|
||||
exp_sec = frappe.defaults.get_global_default("session_expiry") or "06:00:00"
|
||||
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ def get_users(doctype, name):
|
|||
)
|
||||
|
||||
|
||||
def get_shared(doctype, user=None, rights=None):
|
||||
def get_shared(doctype, user=None, rights=None, *, filters=None, limit=None):
|
||||
"""Get list of shared document names for given user and DocType.
|
||||
|
||||
:param doctype: DocType of which shared names are queried.
|
||||
|
|
@ -154,14 +154,22 @@ def get_shared(doctype, user=None, rights=None):
|
|||
if not rights:
|
||||
rights = ["read"]
|
||||
|
||||
filters = [[right, "=", 1] for right in rights]
|
||||
filters += [["share_doctype", "=", doctype]]
|
||||
share_filters = [[right, "=", 1] for right in rights]
|
||||
share_filters += [["share_doctype", "=", doctype]]
|
||||
if filters:
|
||||
share_filters += filters
|
||||
|
||||
or_filters = [["user", "=", user]]
|
||||
if user != "Guest":
|
||||
or_filters += [["everyone", "=", 1]]
|
||||
|
||||
shared_docs = frappe.get_all(
|
||||
"DocShare", fields=["share_name"], filters=filters, or_filters=or_filters, order_by=None
|
||||
"DocShare",
|
||||
fields=["share_name"],
|
||||
filters=share_filters,
|
||||
or_filters=or_filters,
|
||||
order_by=None,
|
||||
limit_page_length=limit,
|
||||
)
|
||||
|
||||
return [doc.share_name for doc in shared_docs]
|
||||
|
|
|
|||
|
|
@ -1,14 +1,18 @@
|
|||
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
|
||||
# License: MIT. See LICENSE
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
import requests
|
||||
|
||||
import frappe
|
||||
import frappe.utils
|
||||
from frappe.auth import LoginAttemptTracker
|
||||
from frappe.frappeclient import AuthError, FrappeClient
|
||||
from frappe.sessions import Session, get_expired_sessions, get_expiry_in_seconds
|
||||
from frappe.tests.test_api import FrappeAPITestCase
|
||||
from frappe.tests.utils import FrappeTestCase
|
||||
from frappe.utils import get_site_url, now
|
||||
from frappe.utils.data import add_to_date
|
||||
from frappe.www.login import _generate_temporary_login_link
|
||||
|
||||
|
||||
|
|
@ -26,9 +30,7 @@ class TestAuth(FrappeTestCase):
|
|||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.HOST_NAME = frappe.get_site_config().host_name or frappe.utils.get_site_url(
|
||||
frappe.local.site
|
||||
)
|
||||
cls.HOST_NAME = frappe.get_site_config().host_name or get_site_url(frappe.local.site)
|
||||
cls.test_user_email = "test_auth@test.com"
|
||||
cls.test_user_name = "test_auth_user"
|
||||
cls.test_user_mobile = "+911234567890"
|
||||
|
|
@ -197,3 +199,27 @@ class TestLoginAttemptTracker(FrappeTestCase):
|
|||
|
||||
tracker.add_failure_attempt()
|
||||
self.assertTrue(tracker.is_user_allowed())
|
||||
|
||||
|
||||
class TestSessionExpirty(FrappeAPITestCase):
|
||||
def test_session_expires(self):
|
||||
sid = self.sid # triggers login for test case login
|
||||
s: Session = frappe.local.session_obj
|
||||
|
||||
expiry_in = get_expiry_in_seconds()
|
||||
session_created = now()
|
||||
|
||||
# Try with 1% increments of times, it should always work
|
||||
for step in range(0, 100, 1):
|
||||
seconds_elapsed = expiry_in * step / 100
|
||||
|
||||
time_now = add_to_date(session_created, seconds=seconds_elapsed, as_string=True)
|
||||
with patch("frappe.utils.now", return_value=time_now):
|
||||
data = s.get_session_data_from_db()
|
||||
self.assertEqual(data.user, "Administrator")
|
||||
|
||||
# 1% higher should immediately expire
|
||||
time_now = add_to_date(session_created, seconds=expiry_in * 1.01, as_string=True)
|
||||
with patch("frappe.utils.now", return_value=time_now):
|
||||
self.assertIn(sid, get_expired_sessions())
|
||||
self.assertFalse(s.get_session_data_from_db())
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from frappe.tests.utils import FrappeTestCase
|
|||
from frappe.utils.background_jobs import (
|
||||
RQ_JOB_FAILURE_TTL,
|
||||
RQ_RESULTS_TTL,
|
||||
create_job_id,
|
||||
execute_job,
|
||||
generate_qname,
|
||||
get_redis_conn,
|
||||
|
|
@ -54,11 +55,12 @@ class TestBackgroundJobs(FrappeTestCase):
|
|||
|
||||
def test_enqueue_call(self):
|
||||
with patch.object(Queue, "enqueue_call") as mock_enqueue_call:
|
||||
frappe.enqueue(
|
||||
job = frappe.enqueue(
|
||||
"frappe.handler.ping",
|
||||
queue="short",
|
||||
timeout=300,
|
||||
kwargs={"site": frappe.local.site},
|
||||
job_id="test",
|
||||
)
|
||||
|
||||
mock_enqueue_call.assert_called_once_with(
|
||||
|
|
@ -78,7 +80,7 @@ class TestBackgroundJobs(FrappeTestCase):
|
|||
at_front=False,
|
||||
failure_ttl=RQ_JOB_FAILURE_TTL,
|
||||
result_ttl=RQ_RESULTS_TTL,
|
||||
job_id=None,
|
||||
job_id=create_job_id("test"),
|
||||
)
|
||||
|
||||
def test_job_hooks(self):
|
||||
|
|
|
|||
|
|
@ -20,9 +20,11 @@ from unittest.mock import patch
|
|||
import click
|
||||
from click import Command
|
||||
from click.testing import CliRunner, Result
|
||||
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
|
||||
|
||||
# imports - module imports
|
||||
import frappe
|
||||
import frappe.commands.scheduler
|
||||
import frappe.commands.site
|
||||
import frappe.commands.utils
|
||||
import frappe.recorder
|
||||
|
|
@ -760,6 +762,19 @@ class TestBenchBuild(BaseTestCommands):
|
|||
)
|
||||
|
||||
|
||||
class TestSchedulerUtils(BaseTestCommands):
|
||||
# Retry just in case there are stuck queued jobs
|
||||
@retry(
|
||||
retry=retry_if_exception_type(AssertionError),
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_fixed(3),
|
||||
reraise=True,
|
||||
)
|
||||
def test_ready_for_migrate(self):
|
||||
with cli(frappe.commands.scheduler.ready_for_migration) as result:
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
|
||||
|
||||
class TestCommandUtils(FrappeTestCase):
|
||||
def test_bench_helper(self):
|
||||
from frappe.utils.bench_helper import get_app_groups
|
||||
|
|
|
|||
|
|
@ -140,27 +140,6 @@ class TestDB(FrappeTestCase):
|
|||
frappe.db.get_value("DocType", "DocField", order_by="creation desc, modified asc, name", run=0),
|
||||
)
|
||||
|
||||
def test_get_value_limits(self):
|
||||
# check both dict and list style filters
|
||||
filters = [{"enabled": 1}, [["enabled", "=", 1]]]
|
||||
for filter in filters:
|
||||
self.assertEqual(1, len(frappe.db.get_values("User", filters=filter, limit=1)))
|
||||
# count of last touched rows as per DB-API 2.0 https://peps.python.org/pep-0249/#rowcount
|
||||
self.assertGreaterEqual(1, cint(frappe.db._cursor.rowcount))
|
||||
self.assertEqual(2, len(frappe.db.get_values("User", filters=filter, limit=2)))
|
||||
self.assertGreaterEqual(2, cint(frappe.db._cursor.rowcount))
|
||||
|
||||
# without limits length == count
|
||||
self.assertEqual(
|
||||
len(frappe.db.get_values("User", filters=filter)), frappe.db.count("User", filter)
|
||||
)
|
||||
|
||||
frappe.db.get_value("User", filters=filter)
|
||||
self.assertGreaterEqual(1, cint(frappe.db._cursor.rowcount))
|
||||
|
||||
frappe.db.exists("User", filter)
|
||||
self.assertGreaterEqual(1, cint(frappe.db._cursor.rowcount))
|
||||
|
||||
def test_escape(self):
|
||||
frappe.db.escape("香港濟生堂製藥有限公司 - IT".encode())
|
||||
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ from frappe.model.base_document import get_controller
|
|||
from frappe.query_builder.utils import db_type_is
|
||||
from frappe.tests.test_query_builder import run_only_if
|
||||
from frappe.tests.utils import FrappeTestCase
|
||||
from frappe.utils import cint
|
||||
from frappe.website.path_resolver import PathResolver
|
||||
|
||||
|
||||
|
|
@ -70,6 +71,25 @@ class TestPerformance(FrappeTestCase):
|
|||
with self.assertQueryCount(0):
|
||||
get_controller("User")
|
||||
|
||||
def test_get_value_limits(self):
|
||||
# check both dict and list style filters
|
||||
filters = [{"enabled": 1}, [["enabled", "=", 1]]]
|
||||
for filter in filters:
|
||||
with self.assertRowsRead(1):
|
||||
self.assertEqual(1, len(frappe.db.get_values("User", filters=filter, limit=1)))
|
||||
with self.assertRowsRead(2):
|
||||
self.assertEqual(2, len(frappe.db.get_values("User", filters=filter, limit=2)))
|
||||
|
||||
self.assertEqual(
|
||||
len(frappe.db.get_values("User", filters=filter)), frappe.db.count("User", filter)
|
||||
)
|
||||
|
||||
with self.assertRowsRead(1):
|
||||
frappe.db.get_value("User", filters=filter)
|
||||
|
||||
with self.assertRowsRead(1):
|
||||
frappe.db.exists("User", filter)
|
||||
|
||||
def test_db_value_cache(self):
|
||||
"""Link validation if repeated should just use db.value_cache, hence no extra queries"""
|
||||
doc = frappe.get_last_doc("User")
|
||||
|
|
|
|||
|
|
@ -92,6 +92,26 @@ class FrappeTestCase(unittest.TestCase):
|
|||
finally:
|
||||
frappe.db.sql = orig_sql
|
||||
|
||||
@contextmanager
|
||||
def assertRowsRead(self, count):
|
||||
rows_read = 0
|
||||
|
||||
def _sql_with_count(*args, **kwargs):
|
||||
nonlocal rows_read
|
||||
|
||||
ret = orig_sql(*args, **kwargs)
|
||||
# count of last touched rows as per DB-API 2.0 https://peps.python.org/pep-0249/#rowcount
|
||||
rows_read += cint(frappe.db._cursor.rowcount)
|
||||
return ret
|
||||
|
||||
try:
|
||||
orig_sql = frappe.db.sql
|
||||
frappe.db.sql = _sql_with_count
|
||||
yield
|
||||
self.assertLessEqual(rows_read, count, msg="Queries read more rows than expected")
|
||||
finally:
|
||||
frappe.db.sql = orig_sql
|
||||
|
||||
|
||||
class MockedRequestTestCase(FrappeTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ def enqueue(
|
|||
on_failure: Callable = None,
|
||||
at_front: bool = False,
|
||||
job_id: str = None,
|
||||
deduplicate=False,
|
||||
**kwargs,
|
||||
) -> Job | Any:
|
||||
"""
|
||||
|
|
@ -79,14 +80,28 @@ def enqueue(
|
|||
:param job_name: [DEPRECATED] can be used to name an enqueue call, which can be used to prevent duplicate calls
|
||||
:param now: if now=True, the method is executed via frappe.call
|
||||
:param kwargs: keyword arguments to be passed to the method
|
||||
:param deduplicate: do not re-queue job if it's already queued, requires job_id.
|
||||
:param job_id: Assigning unique job id, which can be checked using `is_job_enqueued`
|
||||
"""
|
||||
# To handle older implementations
|
||||
is_async = kwargs.pop("async", is_async)
|
||||
|
||||
if job_id:
|
||||
# namespace job ids to sites
|
||||
job_id = create_job_id(job_id)
|
||||
if deduplicate:
|
||||
if not job_id:
|
||||
frappe.throw(_("`job_id` paramater is required for deduplication."))
|
||||
job = get_job(job_id)
|
||||
if job and job.get_status() in (JobStatus.QUEUED, JobStatus.STARTED):
|
||||
frappe.logger().debug(f"Not queueing job {job.id} because it is in queue already")
|
||||
return
|
||||
elif job:
|
||||
# delete job to avoid argument issues related to job args
|
||||
# https://github.com/rq/rq/issues/793
|
||||
job.delete()
|
||||
|
||||
# If job exists and is completed then delete it before re-queue
|
||||
|
||||
# namespace job ids to sites
|
||||
job_id = create_job_id(job_id)
|
||||
|
||||
if job_name:
|
||||
deprecation_warning("Using enqueue with `job_name` is deprecated, use `job_id` instead.")
|
||||
|
|
@ -394,8 +409,8 @@ def validate_queue(queue, default_queue_list=None):
|
|||
|
||||
|
||||
@retry(
|
||||
retry=retry_if_exception_type(BusyLoadingError) | retry_if_exception_type(ConnectionError),
|
||||
stop=stop_after_attempt(10),
|
||||
retry=retry_if_exception_type((BusyLoadingError, ConnectionError)),
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
reraise=True,
|
||||
)
|
||||
|
|
@ -423,9 +438,7 @@ def get_redis_conn(username=None, password=None):
|
|||
|
||||
try:
|
||||
if not cred:
|
||||
if not _redis_queue_conn:
|
||||
_redis_queue_conn = RedisQueue.get_connection()
|
||||
return _redis_queue_conn
|
||||
return get_redis_connection_without_auth()
|
||||
else:
|
||||
return RedisQueue.get_connection(**cred)
|
||||
except (redis.exceptions.AuthenticationError, redis.exceptions.ResponseError):
|
||||
|
|
@ -440,6 +453,14 @@ def get_redis_conn(username=None, password=None):
|
|||
raise
|
||||
|
||||
|
||||
def get_redis_connection_without_auth():
|
||||
global _redis_queue_conn
|
||||
|
||||
if not _redis_queue_conn:
|
||||
_redis_queue_conn = RedisQueue.get_connection()
|
||||
return _redis_queue_conn
|
||||
|
||||
|
||||
def get_queues() -> list[Queue]:
|
||||
"""Get all the queues linked to the current bench."""
|
||||
queues = Queue.all(connection=get_redis_conn())
|
||||
|
|
@ -475,6 +496,9 @@ def test_job(s):
|
|||
|
||||
def create_job_id(job_id: str) -> str:
|
||||
"""Generate unique job id for deduplication"""
|
||||
|
||||
if not job_id:
|
||||
job_id = str(uuid4())
|
||||
return f"{frappe.local.site}::{job_id}"
|
||||
|
||||
|
||||
|
|
@ -484,9 +508,13 @@ def is_job_enqueued(job_id: str) -> bool:
|
|||
|
||||
def get_job_status(job_id: str) -> JobStatus | None:
|
||||
"""Get RQ job status, returns None if job is not found."""
|
||||
job = get_job(job_id)
|
||||
if job:
|
||||
return job.get_status()
|
||||
|
||||
|
||||
def get_job(job_id: str) -> Job:
|
||||
try:
|
||||
job = Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
return Job.fetch(create_job_id(job_id), connection=get_redis_conn())
|
||||
except NoSuchJobError:
|
||||
return None
|
||||
|
||||
return job.get_status()
|
||||
|
|
|
|||
|
|
@ -79,6 +79,15 @@ def get_pending_jobs(site=None):
|
|||
return jobs_per_queue
|
||||
|
||||
|
||||
def any_job_pending(site: str) -> bool:
|
||||
for queue in get_queue_list():
|
||||
q = get_queue(queue)
|
||||
for job_id in q.get_job_ids():
|
||||
if job_id.startswith(site):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def check_number_of_workers():
|
||||
return len(get_workers())
|
||||
|
||||
|
|
|
|||
|
|
@ -631,7 +631,7 @@ def get_link_options(web_form_name, doctype, allow_read_on_all_link_options=Fals
|
|||
if title_field and show_title_field_in_link:
|
||||
return json.dumps(link_options, default=str)
|
||||
else:
|
||||
return "\n".join([doc.value for doc in link_options])
|
||||
return "\n".join([str(doc.value) for doc in link_options])
|
||||
|
||||
else:
|
||||
raise frappe.PermissionError(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue