refactor: remove event streaming

This commit is contained in:
phot0n 2022-08-23 11:14:53 +05:30
parent 8950c4170b
commit 84ef8a48dd
45 changed files with 0 additions and 2703 deletions

View file

@ -1,73 +0,0 @@
{
"actions": [],
"creation": "2019-09-27 12:46:50.165135",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"local_fieldname",
"mapping_type",
"mapping",
"remote_value_filters",
"column_break_5",
"remote_fieldname",
"default_value"
],
"fields": [
{
"fieldname": "remote_fieldname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Remote Fieldname"
},
{
"fieldname": "local_fieldname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Local Fieldname",
"reqd": 1
},
{
"fieldname": "column_break_5",
"fieldtype": "Column Break"
},
{
"fieldname": "default_value",
"fieldtype": "Data",
"label": "Default Value"
},
{
"fieldname": "mapping_type",
"fieldtype": "Select",
"label": "Mapping Type",
"options": "\nChild Table\nDocument"
},
{
"depends_on": "eval:doc.mapping_type;",
"fieldname": "mapping",
"fieldtype": "Link",
"label": "Mapping",
"options": "Document Type Mapping"
},
{
"depends_on": "eval:doc.mapping_type==\"Document\";",
"fieldname": "remote_value_filters",
"fieldtype": "Code",
"label": "Remote Value Filters",
"mandatory_depends_on": "eval:doc.mapping_type===\"Document\";",
"options": "JSON"
}
],
"istable": 1,
"links": [],
"modified": "2020-03-19 13:56:36.223799",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Document Type Field Mapping",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class DocumentTypeFieldMapping(Document):
pass

View file

@ -1,37 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Document Type Mapping", {
local_doctype: function (frm) {
if (frm.doc.local_doctype) {
frappe.model.clear_table(frm.doc, "field_mapping");
let fields = frm.events.get_fields(frm);
$.each(fields, function (i, data) {
let row = frappe.model.add_child(
frm.doc,
"Document Type Field Mapping",
"field_mapping"
);
row.local_fieldname = data;
});
refresh_field("field_mapping");
}
},
get_fields: function (frm) {
let filtered_fields = [];
frappe.model.with_doctype(frm.doc.local_doctype, () => {
frappe.get_meta(frm.doc.local_doctype).fields.map((field) => {
if (
field.fieldname !== "remote_docname" &&
field.fieldname !== "remote_site_name" &&
frappe.model.is_value_type(field) &&
!field.hidden
) {
filtered_fields.push(field.fieldname);
}
});
});
return filtered_fields;
},
});

View file

@ -1,71 +0,0 @@
{
"autoname": "field:mapping_name",
"creation": "2019-09-27 12:45:56.529124",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"mapping_name",
"local_doctype",
"remote_doctype",
"section_break_3",
"field_mapping"
],
"fields": [
{
"fieldname": "local_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Local Document Type",
"options": "DocType",
"reqd": 1
},
{
"fieldname": "remote_doctype",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Remote Document Type",
"reqd": 1
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"fieldname": "field_mapping",
"fieldtype": "Table",
"label": "Field Mapping",
"options": "Document Type Field Mapping"
},
{
"fieldname": "mapping_name",
"fieldtype": "Data",
"label": "Mapping Name",
"reqd": 1,
"unique": 1
}
],
"modified": "2019-10-09 08:36:04.621397",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Document Type Mapping",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,181 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import frappe
from frappe import _
from frappe.model import child_table_fields, default_fields
from frappe.model.document import Document
class DocumentTypeMapping(Document):
def validate(self):
self.validate_inner_mapping()
def validate_inner_mapping(self):
meta = frappe.get_meta(self.local_doctype)
for field_map in self.field_mapping:
if field_map.local_fieldname not in (default_fields + child_table_fields):
field = meta.get_field(field_map.local_fieldname)
if not field:
frappe.throw(_("Row #{0}: Invalid Local Fieldname").format(field_map.idx))
fieldtype = field.get("fieldtype")
if fieldtype in ["Link", "Dynamic Link", "Table"]:
if not field_map.mapping and not field_map.default_value:
msg = _(
"Row #{0}: Please set Mapping or Default Value for the field {1} since its a dependency field"
).format(field_map.idx, frappe.bold(field_map.local_fieldname))
frappe.throw(msg, title="Inner Mapping Missing")
if field_map.mapping_type == "Document" and not field_map.remote_value_filters:
msg = _(
"Row #{0}: Please set remote value filters for the field {1} to fetch the unique remote dependency document"
).format(field_map.idx, frappe.bold(field_map.remote_fieldname))
frappe.throw(msg, title="Remote Value Filters Missing")
def get_mapping(self, doc, producer_site, update_type):
remote_fields = []
# list of tuples (local_fieldname, dependent_doc)
dependencies = []
for mapping in self.field_mapping:
if doc.get(mapping.remote_fieldname):
if mapping.mapping_type == "Document":
if not mapping.default_value:
dependency = self.get_mapped_dependency(mapping, producer_site, doc)
if dependency:
dependencies.append((mapping.local_fieldname, dependency))
else:
doc[mapping.local_fieldname] = mapping.default_value
if mapping.mapping_type == "Child Table" and update_type != "Update":
doc[mapping.local_fieldname] = get_mapped_child_table_docs(
mapping.mapping, doc[mapping.remote_fieldname], producer_site
)
else:
# copy value into local fieldname key and remove remote fieldname key
doc[mapping.local_fieldname] = doc[mapping.remote_fieldname]
if mapping.local_fieldname != mapping.remote_fieldname:
remote_fields.append(mapping.remote_fieldname)
if not doc.get(mapping.remote_fieldname) and mapping.default_value and update_type != "Update":
doc[mapping.local_fieldname] = mapping.default_value
# remove the remote fieldnames
for field in remote_fields:
doc.pop(field, None)
if update_type != "Update":
doc["doctype"] = self.local_doctype
mapping = {"doc": frappe.as_json(doc)}
if len(dependencies):
mapping["dependencies"] = dependencies
return mapping
def get_mapped_update(self, update, producer_site):
update_diff = frappe._dict(json.loads(update.data))
mapping = update_diff
dependencies = []
if update_diff.changed:
doc_map = self.get_mapping(update_diff.changed, producer_site, "Update")
mapped_doc = doc_map.get("doc")
mapping.changed = json.loads(mapped_doc)
if doc_map.get("dependencies"):
dependencies += doc_map.get("dependencies")
if update_diff.removed:
mapping = self.map_rows_removed(update_diff, mapping)
if update_diff.added:
mapping = self.map_rows(update_diff, mapping, producer_site, operation="added")
if update_diff.row_changed:
mapping = self.map_rows(update_diff, mapping, producer_site, operation="row_changed")
update = {"doc": frappe.as_json(mapping)}
if len(dependencies):
update["dependencies"] = dependencies
return update
def get_mapped_dependency(self, mapping, producer_site, doc):
inner_mapping = frappe.get_doc("Document Type Mapping", mapping.mapping)
filters = json.loads(mapping.remote_value_filters)
for key, value in filters.items():
if value.startswith("eval:"):
val = frappe.safe_eval(value[5:], None, dict(doc=doc))
filters[key] = val
if doc.get(value):
filters[key] = doc.get(value)
matching_docs = producer_site.get_doc(inner_mapping.remote_doctype, filters=filters)
if len(matching_docs):
remote_docname = matching_docs[0].get("name")
remote_doc = producer_site.get_doc(inner_mapping.remote_doctype, remote_docname)
doc = inner_mapping.get_mapping(remote_doc, producer_site, "Insert").get("doc")
return doc
return
def map_rows_removed(self, update_diff, mapping):
removed = []
mapping["removed"] = update_diff.removed
for key, value in update_diff.removed.copy().items():
local_table_name = frappe.db.get_value(
"Document Type Field Mapping",
{"remote_fieldname": key, "parent": self.name},
"local_fieldname",
)
mapping.removed[local_table_name] = value
if local_table_name != key:
removed.append(key)
# remove the remote fieldnames
for field in removed:
mapping.removed.pop(field, None)
return mapping
def map_rows(self, update_diff, mapping, producer_site, operation):
remote_fields = []
for tablename, entries in update_diff.get(operation).copy().items():
local_table_name = frappe.db.get_value(
"Document Type Field Mapping", {"remote_fieldname": tablename}, "local_fieldname"
)
table_map = frappe.db.get_value(
"Document Type Field Mapping",
{"local_fieldname": local_table_name, "parent": self.name},
"mapping",
)
table_map = frappe.get_doc("Document Type Mapping", table_map)
docs = []
for entry in entries:
mapped_doc = table_map.get_mapping(entry, producer_site, "Update").get("doc")
docs.append(json.loads(mapped_doc))
mapping.get(operation)[local_table_name] = docs
if local_table_name != tablename:
remote_fields.append(tablename)
# remove the remote fieldnames
for field in remote_fields:
mapping.get(operation).pop(field, None)
return mapping
def get_mapped_child_table_docs(child_map, table_entries, producer_site):
"""Get mapping for child doctypes"""
child_map = frappe.get_doc("Document Type Mapping", child_map)
mapped_entries = []
remote_fields = []
for child_doc in table_entries:
for mapping in child_map.field_mapping:
if child_doc.get(mapping.remote_fieldname):
child_doc[mapping.local_fieldname] = child_doc[mapping.remote_fieldname]
if mapping.local_fieldname != mapping.remote_fieldname:
child_doc.pop(mapping.remote_fieldname, None)
mapped_entries.append(child_doc)
# remove the remote fieldnames
for field in remote_fields:
child_doc.pop(field, None)
child_doc["doctype"] = child_map.local_doctype
return mapped_entries

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestDocumentTypeMapping(FrappeTestCase):
pass

View file

@ -1,17 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Consumer", {
refresh: function (frm) {
// formatter for subscribed doctype approval status
frm.set_indicator_formatter("status", function (doc) {
let indicator = "orange";
if (doc.status == "Approved") {
indicator = "green";
} else if (doc.status == "Rejected") {
indicator = "red";
}
return indicator;
});
},
});

View file

@ -1,97 +0,0 @@
{
"actions": [],
"autoname": "field:callback_url",
"creation": "2019-08-26 17:45:15.479530",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"consumer_doctypes",
"callback_url",
"section_break_3",
"api_key",
"api_secret",
"column_break_6",
"user",
"incoming_change"
],
"fields": [
{
"fieldname": "callback_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Callback URL",
"read_only": 1,
"reqd": 1,
"unique": 1
},
{
"fieldname": "api_key",
"fieldtype": "Data",
"label": "API Key",
"reqd": 1
},
{
"fieldname": "api_secret",
"fieldtype": "Password",
"label": "API Secret",
"reqd": 1
},
{
"fieldname": "user",
"fieldtype": "Link",
"label": "Event Subscriber",
"options": "User",
"read_only": 1,
"reqd": 1
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"fieldname": "column_break_6",
"fieldtype": "Column Break"
},
{
"default": "0",
"fieldname": "incoming_change",
"fieldtype": "Check",
"hidden": 1,
"label": "Incoming Change",
"read_only": 1
},
{
"fieldname": "consumer_doctypes",
"fieldtype": "Table",
"label": "Event Consumer Document Types",
"options": "Event Consumer Document Type",
"reqd": 1
}
],
"in_create": 1,
"links": [],
"modified": "2020-09-08 16:42:39.828085",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Consumer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,216 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import os
import requests
import frappe
from frappe import _
from frappe.frappeclient import FrappeClient
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
from frappe.utils.data import get_url
class EventConsumer(Document):
def validate(self):
# approve subscribed doctypes for tests
# frappe.flags.in_test won't work here as tests are running on the consumer site
if os.environ.get("CI"):
for entry in self.consumer_doctypes:
entry.status = "Approved"
def on_update(self):
if not self.incoming_change:
doc_before_save = self.get_doc_before_save()
if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
return
self.update_consumer_status()
else:
frappe.db.set_value(self.doctype, self.name, "incoming_change", 0)
frappe.cache().delete_value("event_consumer_document_type_map")
def on_trash(self):
for i in frappe.get_all("Event Update Log Consumer", {"consumer": self.name}):
frappe.delete_doc("Event Update Log Consumer", i.name)
frappe.cache().delete_value("event_consumer_document_type_map")
def update_consumer_status(self):
consumer_site = get_consumer_site(self.callback_url)
event_producer = consumer_site.get_doc("Event Producer", get_url())
event_producer = frappe._dict(event_producer)
config = event_producer.producer_doctypes
event_producer.producer_doctypes = []
for entry in config:
if entry.get("has_mapping"):
ref_doctype = consumer_site.get_value(
"Document Type Mapping", "remote_doctype", entry.get("mapping")
).get("remote_doctype")
else:
ref_doctype = entry.get("ref_doctype")
entry["status"] = frappe.db.get_value(
"Event Consumer Document Type", {"parent": self.name, "ref_doctype": ref_doctype}, "status"
)
event_producer.producer_doctypes = config
# when producer doc is updated it updates the consumer doc
# set flag to avoid deadlock
event_producer.incoming_change = True
consumer_site.update(event_producer)
def get_consumer_status(self):
response = requests.get(self.callback_url)
if response.status_code != 200:
return "offline"
return "online"
@frappe.whitelist()
def register_consumer(data):
"""create an event consumer document for registering a consumer"""
data = json.loads(data)
# to ensure that consumer is created only once
if frappe.db.exists("Event Consumer", data["event_consumer"]):
return None
user = data["user"]
if not frappe.db.exists("User", user):
frappe.throw(_("User {0} not found on the producer site").format(user))
if "System Manager" not in frappe.get_roles(user):
frappe.throw(_("Event Subscriber has to be a System Manager."))
consumer = frappe.new_doc("Event Consumer")
consumer.callback_url = data["event_consumer"]
consumer.user = data["user"]
consumer.api_key = data["api_key"]
consumer.api_secret = data["api_secret"]
consumer.incoming_change = True
consumer_doctypes = json.loads(data["consumer_doctypes"])
for entry in consumer_doctypes:
consumer.append(
"consumer_doctypes",
{"ref_doctype": entry.get("doctype"), "status": "Pending", "condition": entry.get("condition")},
)
consumer.insert()
# consumer's 'last_update' field should point to the latest update
# in producer's update log when subscribing
# so that, updates after subscribing are consumed and not the old ones.
last_update = str(get_last_update())
return json.dumps({"last_update": last_update})
def get_consumer_site(consumer_url):
"""create a FrappeClient object for event consumer site"""
consumer_doc = frappe.get_doc("Event Consumer", consumer_url)
consumer_site = FrappeClient(
url=consumer_url,
api_key=consumer_doc.api_key,
api_secret=consumer_doc.get_password("api_secret"),
)
return consumer_site
def get_last_update():
"""get the creation timestamp of last update consumed"""
updates = frappe.get_list(
"Event Update Log", "creation", ignore_permissions=True, limit=1, order_by="creation desc"
)
if updates:
return updates[0].creation
return frappe.utils.now_datetime()
@frappe.whitelist()
def notify_event_consumers(doctype):
"""get all event consumers and set flag for notification status"""
event_consumers = frappe.get_all(
"Event Consumer Document Type", ["parent"], {"ref_doctype": doctype, "status": "Approved"}
)
for entry in event_consumers:
consumer = frappe.get_doc("Event Consumer", entry.parent)
consumer.flags.notified = False
notify(consumer)
@frappe.whitelist()
def notify(consumer):
"""notify individual event consumers about a new update"""
consumer_status = consumer.get_consumer_status()
if consumer_status == "online":
try:
client = get_consumer_site(consumer.callback_url)
client.post_request(
{
"cmd": "frappe.event_streaming.doctype.event_producer.event_producer.new_event_notification",
"producer_url": get_url(),
}
)
consumer.flags.notified = True
except Exception:
consumer.flags.notified = False
else:
consumer.flags.notified = False
# enqueue another job if the site was not notified
if not consumer.flags.notified:
enqueued_method = "frappe.event_streaming.doctype.event_consumer.event_consumer.notify"
jobs = get_jobs()
if not jobs or enqueued_method not in jobs[frappe.local.site] and not consumer.flags.notifed:
frappe.enqueue(
enqueued_method, queue="long", enqueue_after_commit=True, **{"consumer": consumer}
)
def has_consumer_access(consumer, update_log):
"""Checks if consumer has completely satisfied all the conditions on the doc"""
if isinstance(consumer, str):
consumer = frappe.get_doc("Event Consumer", consumer)
if not frappe.db.exists(update_log.ref_doctype, update_log.docname):
# Delete Log
# Check if the last Update Log of this document was read by this consumer
last_update_log = frappe.get_all(
"Event Update Log",
filters={
"ref_doctype": update_log.ref_doctype,
"docname": update_log.docname,
"creation": ["<", update_log.creation],
},
order_by="creation desc",
limit_page_length=1,
)
if not len(last_update_log):
return False
last_update_log = frappe.get_doc("Event Update Log", last_update_log[0].name)
return len([x for x in last_update_log.consumers if x.consumer == consumer.name])
doc = frappe.get_doc(update_log.ref_doctype, update_log.docname)
try:
for dt_entry in consumer.consumer_doctypes:
if dt_entry.ref_doctype != update_log.ref_doctype:
continue
if not dt_entry.condition:
return True
condition: str = dt_entry.condition
if condition.startswith("cmd:"):
cmd = condition.split("cmd:")[1].strip()
args = {"consumer": consumer, "doc": doc, "update_log": update_log}
return frappe.call(cmd, **args)
else:
return frappe.safe_eval(condition, frappe._dict(doc=doc))
except Exception as e:
consumer.log_error("has_consumer_access error")
return False

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventConsumer(FrappeTestCase):
pass

View file

@ -1,61 +0,0 @@
{
"actions": [],
"creation": "2019-10-03 21:10:54.754651",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"ref_doctype",
"status",
"unsubscribed",
"condition"
],
"fields": [
{
"columns": 4,
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Document Type",
"options": "DocType",
"read_only": 1,
"reqd": 1
},
{
"columns": 4,
"default": "Pending",
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Approval Status",
"options": "Pending\nApproved\nRejected"
},
{
"columns": 2,
"default": "0",
"fieldname": "unsubscribed",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Unsubscribed",
"read_only": 1
},
{
"fieldname": "condition",
"fieldtype": "Code",
"label": "Condition",
"read_only": 1
}
],
"istable": 1,
"links": [],
"modified": "2020-11-07 09:26:49.894294",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Consumer Document Type",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventConsumerDocumentType(Document):
pass

View file

@ -1,25 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Producer", {
refresh: function (frm) {
frm.set_query("ref_doctype", "producer_doctypes", function () {
return {
filters: {
issingle: 0,
istable: 0,
},
};
});
frm.set_indicator_formatter("status", function (doc) {
let indicator = "orange";
if (doc.status == "Approved") {
indicator = "green";
} else if (doc.status == "Rejected") {
indicator = "red";
}
return indicator;
});
},
});

View file

@ -1,96 +0,0 @@
{
"actions": [],
"autoname": "field:producer_url",
"creation": "2019-08-26 19:17:24.919196",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"producer_url",
"producer_doctypes",
"section_break_3",
"api_key",
"api_secret",
"column_break_6",
"user",
"incoming_change"
],
"fields": [
{
"fieldname": "producer_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Producer URL",
"reqd": 1,
"unique": 1
},
{
"description": "API Key of the user(Event Subscriber) on the producer site",
"fieldname": "api_key",
"fieldtype": "Data",
"label": "API Key",
"reqd": 1
},
{
"description": "API Secret of the user(Event Subscriber) on the producer site",
"fieldname": "api_secret",
"fieldtype": "Password",
"label": "API Secret",
"reqd": 1
},
{
"fieldname": "user",
"fieldtype": "Link",
"label": "Event Subscriber",
"options": "User",
"reqd": 1,
"set_only_once": 1
},
{
"fieldname": "column_break_6",
"fieldtype": "Column Break"
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"default": "0",
"fieldname": "incoming_change",
"fieldtype": "Check",
"hidden": 1,
"label": "Incoming Change"
},
{
"fieldname": "producer_doctypes",
"fieldtype": "Table",
"label": "Event Producer Document Types",
"options": "Event Producer Document Type",
"reqd": 1
}
],
"links": [],
"modified": "2020-10-26 13:00:15.361316",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,569 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import time
import requests
import frappe
from frappe import _
from frappe.custom.doctype.custom_field.custom_field import create_custom_field
from frappe.frappeclient import FrappeClient
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
from frappe.utils.data import get_link_to_form, get_url
from frappe.utils.password import get_decrypted_password
class EventProducer(Document):
def before_insert(self):
self.check_url()
self.validate_event_subscriber()
self.incoming_change = True
self.create_event_consumer()
self.create_custom_fields()
def validate(self):
self.validate_event_subscriber()
if frappe.flags.in_test:
for entry in self.producer_doctypes:
entry.status = "Approved"
def validate_event_subscriber(self):
if not frappe.db.get_value("User", self.user, "api_key"):
frappe.throw(
_("Please generate keys for the Event Subscriber User {0} first.").format(
frappe.bold(get_link_to_form("User", self.user))
)
)
def on_update(self):
if not self.incoming_change:
if frappe.db.exists("Event Producer", self.name):
if not self.api_key or not self.api_secret:
frappe.throw(_("Please set API Key and Secret on the producer and consumer sites first."))
else:
doc_before_save = self.get_doc_before_save()
if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
return
self.update_event_consumer()
self.create_custom_fields()
else:
# when producer doc is updated it updates the consumer doc, set flag to avoid deadlock
self.db_set("incoming_change", 0)
self.reload()
def on_trash(self):
last_update = frappe.db.get_value("Event Producer Last Update", dict(event_producer=self.name))
if last_update:
frappe.delete_doc("Event Producer Last Update", last_update)
def check_url(self):
valid_url_schemes = ("http", "https")
frappe.utils.validate_url(self.producer_url, throw=True, valid_schemes=valid_url_schemes)
# remove '/' from the end of the url like http://test_site.com/
# to prevent mismatch in get_url() results
if self.producer_url.endswith("/"):
self.producer_url = self.producer_url[:-1]
def create_event_consumer(self):
"""register event consumer on the producer site"""
if self.is_producer_online():
producer_site = FrappeClient(
url=self.producer_url, api_key=self.api_key, api_secret=self.get_password("api_secret")
)
response = producer_site.post_api(
"frappe.event_streaming.doctype.event_consumer.event_consumer.register_consumer",
params={"data": json.dumps(self.get_request_data())},
)
if response:
response = json.loads(response)
self.set_last_update(response["last_update"])
else:
frappe.throw(
_(
"Failed to create an Event Consumer or an Event Consumer for the current site is already registered."
)
)
def set_last_update(self, last_update):
last_update_doc_name = frappe.db.get_value(
"Event Producer Last Update", dict(event_producer=self.name)
)
if not last_update_doc_name:
frappe.get_doc(
dict(
doctype="Event Producer Last Update",
event_producer=self.producer_url,
last_update=last_update,
)
).insert(ignore_permissions=True)
else:
frappe.db.set_value(
"Event Producer Last Update", last_update_doc_name, "last_update", last_update
)
def get_last_update(self):
return frappe.db.get_value(
"Event Producer Last Update", dict(event_producer=self.name), "last_update"
)
def get_request_data(self):
consumer_doctypes = []
for entry in self.producer_doctypes:
if entry.has_mapping:
# if mapping, subscribe to remote doctype on consumer's site
dt = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
else:
dt = entry.ref_doctype
consumer_doctypes.append({"doctype": dt, "condition": entry.condition})
user_key = frappe.db.get_value("User", self.user, "api_key")
user_secret = get_decrypted_password("User", self.user, "api_secret")
return {
"event_consumer": get_url(),
"consumer_doctypes": json.dumps(consumer_doctypes),
"user": self.user,
"api_key": user_key,
"api_secret": user_secret,
}
def create_custom_fields(self):
"""create custom field to store remote docname and remote site url"""
for entry in self.producer_doctypes:
if not entry.use_same_name:
if not frappe.db.exists(
"Custom Field", {"fieldname": "remote_docname", "dt": entry.ref_doctype}
):
df = dict(
fieldname="remote_docname",
label="Remote Document Name",
fieldtype="Data",
read_only=1,
print_hide=1,
)
create_custom_field(entry.ref_doctype, df)
if not frappe.db.exists(
"Custom Field", {"fieldname": "remote_site_name", "dt": entry.ref_doctype}
):
df = dict(
fieldname="remote_site_name",
label="Remote Site",
fieldtype="Data",
read_only=1,
print_hide=1,
)
create_custom_field(entry.ref_doctype, df)
def update_event_consumer(self):
if self.is_producer_online():
producer_site = get_producer_site(self.producer_url)
event_consumer = producer_site.get_doc("Event Consumer", get_url())
event_consumer = frappe._dict(event_consumer)
if event_consumer:
config = event_consumer.consumer_doctypes
event_consumer.consumer_doctypes = []
for entry in self.producer_doctypes:
if entry.has_mapping:
# if mapping, subscribe to remote doctype on consumer's site
ref_doctype = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
else:
ref_doctype = entry.ref_doctype
event_consumer.consumer_doctypes.append(
{
"ref_doctype": ref_doctype,
"status": get_approval_status(config, ref_doctype),
"unsubscribed": entry.unsubscribe,
"condition": entry.condition,
}
)
event_consumer.user = self.user
event_consumer.incoming_change = True
producer_site.update(event_consumer)
def is_producer_online(self):
"""check connection status for the Event Producer site"""
retry = 3
while retry > 0:
res = requests.get(self.producer_url)
if res.status_code == 200:
return True
retry -= 1
time.sleep(5)
frappe.throw(_("Failed to connect to the Event Producer site. Retry after some time."))
def get_producer_site(producer_url):
"""create a FrappeClient object for event producer site"""
producer_doc = frappe.get_doc("Event Producer", producer_url)
producer_site = FrappeClient(
url=producer_url,
api_key=producer_doc.api_key,
api_secret=producer_doc.get_password("api_secret"),
)
return producer_site
def get_approval_status(config, ref_doctype):
"""check the approval status for consumption"""
for entry in config:
if entry.get("ref_doctype") == ref_doctype:
return entry.get("status")
return "Pending"
@frappe.whitelist()
def pull_producer_data():
"""Fetch data from producer node."""
response = requests.get(get_url())
if response.status_code == 200:
for event_producer in frappe.get_all("Event Producer"):
pull_from_node(event_producer.name)
return "success"
return None
@frappe.whitelist()
def pull_from_node(event_producer):
"""pull all updates after the last update timestamp from event producer site"""
event_producer = frappe.get_doc("Event Producer", event_producer)
producer_site = get_producer_site(event_producer.producer_url)
last_update = event_producer.get_last_update()
(doctypes, mapping_config, naming_config) = get_config(event_producer.producer_doctypes)
updates = get_updates(producer_site, last_update, doctypes)
for update in updates:
update.use_same_name = naming_config.get(update.ref_doctype)
mapping = mapping_config.get(update.ref_doctype)
if mapping:
update.mapping = mapping
update = get_mapped_update(update, producer_site)
if not update.update_type == "Delete":
update.data = json.loads(update.data)
sync(update, producer_site, event_producer)
def get_config(event_config):
"""get the doctype mapping and naming configurations for consumption"""
doctypes, mapping_config, naming_config = [], {}, {}
for entry in event_config:
if entry.status == "Approved":
if entry.has_mapping:
(mapped_doctype, mapping) = frappe.db.get_value(
"Document Type Mapping", entry.mapping, ["remote_doctype", "name"]
)
mapping_config[mapped_doctype] = mapping
naming_config[mapped_doctype] = entry.use_same_name
doctypes.append(mapped_doctype)
else:
naming_config[entry.ref_doctype] = entry.use_same_name
doctypes.append(entry.ref_doctype)
return (doctypes, mapping_config, naming_config)
def sync(update, producer_site, event_producer, in_retry=False):
"""Sync the individual update"""
try:
if update.update_type == "Create":
set_insert(update, producer_site, event_producer.name)
if update.update_type == "Update":
set_update(update, producer_site)
if update.update_type == "Delete":
set_delete(update)
if in_retry:
return "Synced"
log_event_sync(update, event_producer.name, "Synced")
except Exception:
if in_retry:
if frappe.flags.in_test:
print(frappe.get_traceback())
return "Failed"
log_event_sync(update, event_producer.name, "Failed", frappe.get_traceback())
event_producer.set_last_update(update.creation)
frappe.db.commit()
def set_insert(update, producer_site, event_producer):
"""Sync insert type update"""
if frappe.db.get_value(update.ref_doctype, update.docname):
# doc already created
return
doc = frappe.get_doc(update.data)
if update.mapping:
if update.get("dependencies"):
dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
for fieldname, value in dependencies_created.items():
doc.update({fieldname: value})
else:
sync_dependencies(doc, producer_site)
if update.use_same_name:
doc.insert(set_name=update.docname, set_child_names=False)
else:
# if event consumer is not saving documents with the same name as the producer
# store the remote docname in a custom field for future updates
doc.remote_docname = update.docname
doc.remote_site_name = event_producer
doc.insert(set_child_names=False)
def set_update(update, producer_site):
"""Sync update type update"""
local_doc = get_local_doc(update)
if local_doc:
data = frappe._dict(update.data)
if data.changed:
local_doc.update(data.changed)
if data.removed:
local_doc = update_row_removed(local_doc, data.removed)
if data.row_changed:
update_row_changed(local_doc, data.row_changed)
if data.added:
local_doc = update_row_added(local_doc, data.added)
if update.mapping:
if update.get("dependencies"):
dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
for fieldname, value in dependencies_created.items():
local_doc.update({fieldname: value})
else:
sync_dependencies(local_doc, producer_site)
local_doc.save()
local_doc.db_update_all()
def update_row_removed(local_doc, removed):
"""Sync child table row deletion type update"""
for tablename, rownames in removed.items():
table = local_doc.get_table_field_doctype(tablename)
for row in rownames:
table_rows = local_doc.get(tablename)
child_table_row = get_child_table_row(table_rows, row)
table_rows.remove(child_table_row)
local_doc.set(tablename, table_rows)
return local_doc
def get_child_table_row(table_rows, row):
for entry in table_rows:
if entry.get("name") == row:
return entry
def update_row_changed(local_doc, changed):
"""Sync child table row updation type update"""
for tablename, rows in changed.items():
old = local_doc.get(tablename)
for doc in old:
for row in rows:
if row["name"] == doc.get("name"):
doc.update(row)
def update_row_added(local_doc, added):
"""Sync child table row addition type update"""
for tablename, rows in added.items():
local_doc.extend(tablename, rows)
for child in rows:
child_doc = frappe.get_doc(child)
child_doc.parent = local_doc.name
child_doc.parenttype = local_doc.doctype
child_doc.insert(set_name=child_doc.name)
return local_doc
def set_delete(update):
"""Sync delete type update"""
local_doc = get_local_doc(update)
if local_doc:
local_doc.delete()
def get_updates(producer_site, last_update, doctypes):
"""Get all updates generated after the last update timestamp"""
docs = producer_site.post_request(
{
"cmd": "frappe.event_streaming.doctype.event_update_log.event_update_log.get_update_logs_for_consumer",
"event_consumer": get_url(),
"doctypes": frappe.as_json(doctypes),
"last_update": last_update,
}
)
return [frappe._dict(d) for d in (docs or [])]
def get_local_doc(update):
"""Get the local document if created with a different name"""
try:
if not update.use_same_name:
return frappe.get_doc(update.ref_doctype, {"remote_docname": update.docname})
return frappe.get_doc(update.ref_doctype, update.docname)
except frappe.DoesNotExistError:
return None
def sync_dependencies(document, producer_site):
"""
dependencies is a dictionary to store all the docs
having dependencies and their sync status,
which is shared among all nested functions.
"""
dependencies = {document: True}
def check_doc_has_dependencies(doc, producer_site):
"""Sync child table link fields first,
then sync link fields,
then dynamic links"""
meta = frappe.get_meta(doc.doctype)
table_fields = meta.get_table_fields()
link_fields = meta.get_link_fields()
dl_fields = meta.get_dynamic_link_fields()
if table_fields:
sync_child_table_dependencies(doc, table_fields, producer_site)
if link_fields:
sync_link_dependencies(doc, link_fields, producer_site)
if dl_fields:
sync_dynamic_link_dependencies(doc, dl_fields, producer_site)
def sync_child_table_dependencies(doc, table_fields, producer_site):
for df in table_fields:
child_table = doc.get(df.fieldname)
for entry in child_table:
child_doc = producer_site.get_doc(entry.doctype, entry.name)
if child_doc:
child_doc = frappe._dict(child_doc)
set_dependencies(child_doc, frappe.get_meta(entry.doctype).get_link_fields(), producer_site)
def sync_link_dependencies(doc, link_fields, producer_site):
set_dependencies(doc, link_fields, producer_site)
def sync_dynamic_link_dependencies(doc, dl_fields, producer_site):
for df in dl_fields:
docname = doc.get(df.fieldname)
linked_doctype = doc.get(df.options)
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = producer_site.get_doc(linked_doctype, docname)
frappe.get_doc(master_doc).insert(set_name=docname)
def set_dependencies(doc, link_fields, producer_site):
for df in link_fields:
docname = doc.get(df.fieldname)
linked_doctype = df.get_link_doctype()
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = producer_site.get_doc(linked_doctype, docname)
try:
master_doc = frappe.get_doc(master_doc)
master_doc.insert(set_name=docname)
frappe.db.commit()
# for dependency inside a dependency
except Exception:
dependencies[master_doc] = True
def check_dependency_fulfilled(linked_doctype, docname):
return frappe.db.exists(linked_doctype, docname)
while dependencies[document]:
# find the first non synced dependency
for item in reversed(list(dependencies.keys())):
if dependencies[item]:
dependency = item
break
check_doc_has_dependencies(dependency, producer_site)
# mark synced for nested dependency
if dependency != document:
dependencies[dependency] = False
dependency.insert()
# no more dependencies left to be synced, the main doc is ready to be synced
# end the dependency loop
if not any(list(dependencies.values())[1:]):
dependencies[document] = False
def sync_mapped_dependencies(dependencies, producer_site):
dependencies_created = {}
for entry in dependencies:
doc = frappe._dict(json.loads(entry[1]))
docname = frappe.db.exists(doc.doctype, doc.name)
if not docname:
doc = frappe.get_doc(doc).insert(set_child_names=False)
dependencies_created[entry[0]] = doc.name
else:
dependencies_created[entry[0]] = docname
return dependencies_created
def log_event_sync(update, event_producer, sync_status, error=None):
"""Log event update received with the sync_status as Synced or Failed"""
doc = frappe.new_doc("Event Sync Log")
doc.update_type = update.update_type
doc.ref_doctype = update.ref_doctype
doc.status = sync_status
doc.event_producer = event_producer
doc.producer_doc = update.docname
doc.data = frappe.as_json(update.data)
doc.use_same_name = update.use_same_name
doc.mapping = update.mapping if update.mapping else None
if update.use_same_name:
doc.docname = update.docname
else:
doc.docname = frappe.db.get_value(update.ref_doctype, {"remote_docname": update.docname}, "name")
if error:
doc.error = error
doc.insert()
def get_mapped_update(update, producer_site):
"""get the new update document with mapped fields"""
mapping = frappe.get_doc("Document Type Mapping", update.mapping)
if update.update_type == "Create":
doc = frappe._dict(json.loads(update.data))
mapped_update = mapping.get_mapping(doc, producer_site, update.update_type)
update.data = mapped_update.get("doc")
update.dependencies = mapped_update.get("dependencies", None)
elif update.update_type == "Update":
mapped_update = mapping.get_mapped_update(update, producer_site)
update.data = mapped_update.get("doc")
update.dependencies = mapped_update.get("dependencies", None)
update["ref_doctype"] = mapping.local_doctype
return update
@frappe.whitelist()
def new_event_notification(producer_url):
"""Pull data from producer when notified"""
enqueued_method = "frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node"
jobs = get_jobs()
if not jobs or enqueued_method not in jobs[frappe.local.site]:
frappe.enqueue(enqueued_method, queue="default", **{"event_producer": producer_url})
@frappe.whitelist()
def resync(update):
"""Retry syncing update if failed"""
update = frappe._dict(json.loads(update))
producer_site = get_producer_site(update.event_producer)
event_producer = frappe.get_doc("Event Producer", update.event_producer)
if update.mapping:
update = get_mapped_update(update, producer_site)
update.data = json.loads(update.data)
return sync(update, producer_site, event_producer, in_retry=True)

View file

@ -1,438 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
import json
import frappe
from frappe.core.doctype.user.user import generate_keys
from frappe.event_streaming.doctype.event_producer.event_producer import pull_from_node
from frappe.frappeclient import FrappeClient
from frappe.query_builder.utils import db_type_is
from frappe.tests.test_query_builder import run_only_if
from frappe.tests.utils import FrappeTestCase
producer_url = "http://test_site_producer:8000"
class TestEventProducer(FrappeTestCase):
def setUp(self):
create_event_producer(producer_url)
def tearDown(self):
unsubscribe_doctypes(producer_url)
def test_insert(self):
producer = get_remote_site()
producer_doc = insert_into_producer(producer, "test creation 1 sync")
self.pull_producer_data()
self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))
def test_update(self):
producer = get_remote_site()
producer_doc = insert_into_producer(producer, "test update 1")
producer_doc["description"] = "test update 2"
producer_doc = producer.update(producer_doc)
self.pull_producer_data()
local_doc = frappe.get_doc(producer_doc.doctype, producer_doc.name)
self.assertEqual(local_doc.description, producer_doc.description)
def test_delete(self):
producer = get_remote_site()
producer_doc = insert_into_producer(producer, "test delete sync")
self.pull_producer_data()
self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))
producer.delete("ToDo", producer_doc.name)
self.pull_producer_data()
self.assertFalse(frappe.db.exists("ToDo", producer_doc.name))
@run_only_if(db_type_is.MARIADB)
def test_multiple_doctypes_sync(self):
# TODO: This test is extremely flaky with Postgres. Rewrite this!
producer = get_remote_site()
# insert todo and note in producer
producer_todo = insert_into_producer(producer, "test multiple doc sync")
producer_note1 = frappe._dict(doctype="Note", title="test multiple doc sync 1")
delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
frappe.db.delete("Note", {"title": producer_note1["title"]})
producer_note1 = producer.insert(producer_note1)
producer_note2 = frappe._dict(doctype="Note", title="test multiple doc sync 2")
delete_on_remote_if_exists(producer, "Note", {"title": producer_note2["title"]})
frappe.db.delete("Note", {"title": producer_note2["title"]})
producer_note2 = producer.insert(producer_note2)
# update in producer
producer_todo["description"] = "test multiple doc update sync"
producer_todo = producer.update(producer_todo)
producer_note1["content"] = "testing update sync"
producer_note1 = producer.update(producer_note1)
producer.delete("Note", producer_note2.name)
self.pull_producer_data()
# check inserted
self.assertTrue(frappe.db.exists("ToDo", producer_todo.name))
# check update
local_todo = frappe.get_doc("ToDo", producer_todo.name)
self.assertEqual(local_todo.description, producer_todo.description)
local_note1 = frappe.get_doc("Note", producer_note1.name)
self.assertEqual(local_note1.content, producer_note1.content)
# check delete
self.assertFalse(frappe.db.exists("Note", producer_note2.name))
def test_child_table_sync_with_dependencies(self):
producer = get_remote_site()
producer_user = frappe._dict(
doctype="User",
email="test_user@sync.com",
send_welcome_email=0,
first_name="Test Sync User",
enabled=1,
roles=[{"role": "System Manager"}],
)
delete_on_remote_if_exists(producer, "User", {"email": producer_user.email})
frappe.db.delete("User", {"email": producer_user.email})
producer_user = producer.insert(producer_user)
producer_note = frappe._dict(
doctype="Note", title="test child table dependency sync", seen_by=[{"user": producer_user.name}]
)
delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
frappe.db.delete("Note", {"title": producer_note.title})
producer_note = producer.insert(producer_note)
self.pull_producer_data()
self.assertTrue(frappe.db.exists("User", producer_user.name))
if self.assertTrue(frappe.db.exists("Note", producer_note.name)):
local_note = frappe.get_doc("Note", producer_note.name)
self.assertEqual(len(local_note.seen_by), 1)
def test_dynamic_link_dependencies_synced(self):
producer = get_remote_site()
# unsubscribe for Note to check whether dependency is fulfilled
event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
event_producer.producer_doctypes = []
event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
event_producer.save()
producer_link_doc = frappe._dict(doctype="Note", title="Test Dynamic Link 1")
delete_on_remote_if_exists(producer, "Note", {"title": producer_link_doc.title})
frappe.db.delete("Note", {"title": producer_link_doc.title})
producer_link_doc = producer.insert(producer_link_doc)
producer_doc = frappe._dict(
doctype="ToDo",
description="Test Dynamic Link 2",
assigned_by="Administrator",
reference_type="Note",
reference_name=producer_link_doc.name,
)
producer_doc = producer.insert(producer_doc)
self.pull_producer_data()
# check dynamic link dependency created
self.assertTrue(frappe.db.exists("Note", producer_link_doc.name))
self.assertEqual(
producer_link_doc.name, frappe.db.get_value("ToDo", producer_doc.name, "reference_name")
)
reset_configuration(producer_url)
def test_naming_configuration(self):
# test with use_same_name = 0
producer = get_remote_site()
event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
event_producer.producer_doctypes = []
event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 0})
event_producer.save()
producer_doc = insert_into_producer(producer, "test different name sync")
self.pull_producer_data()
self.assertTrue(
frappe.db.exists(
"ToDo", {"remote_docname": producer_doc.name, "remote_site_name": producer_url}
)
)
reset_configuration(producer_url)
def test_conditional_events(self):
producer = get_remote_site()
# Add Condition
event_producer = frappe.get_doc("Event Producer", producer_url)
note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
note_producer_entry.condition = "doc.public == 1"
event_producer.save()
# Make test doc
producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync")
delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
producer_note1 = producer.insert(producer_note1)
# Make Update
producer_note1["content"] = "Test Conditional Sync Content"
producer_note1 = producer.update(producer_note1)
self.pull_producer_data()
# Check if synced here
self.assertFalse(frappe.db.exists("Note", producer_note1.name))
# Lets satisfy the condition
producer_note1["public"] = 1
producer_note1 = producer.update(producer_note1)
self.pull_producer_data()
# it should sync now
self.assertTrue(frappe.db.exists("Note", producer_note1.name))
local_note = frappe.get_doc("Note", producer_note1.name)
self.assertEqual(local_note.content, producer_note1.content)
reset_configuration(producer_url)
def test_conditional_events_with_cmd(self):
producer = get_remote_site()
# Add Condition
event_producer = frappe.get_doc("Event Producer", producer_url)
note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
note_producer_entry.condition = (
"cmd: frappe.event_streaming.doctype.event_producer.test_event_producer.can_sync_note"
)
event_producer.save()
# Make test doc
producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync cmd")
delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
producer_note1 = producer.insert(producer_note1)
# Make Update
producer_note1["content"] = "Test Conditional Sync Content"
producer_note1 = producer.update(producer_note1)
self.pull_producer_data()
# Check if synced here
self.assertFalse(frappe.db.exists("Note", producer_note1.name))
# Lets satisfy the condition
producer_note1["public"] = 1
producer_note1 = producer.update(producer_note1)
self.pull_producer_data()
# it should sync now
self.assertTrue(frappe.db.exists("Note", producer_note1.name))
local_note = frappe.get_doc("Note", producer_note1.name)
self.assertEqual(local_note.content, producer_note1.content)
reset_configuration(producer_url)
def test_update_log(self):
producer = get_remote_site()
producer_doc = insert_into_producer(producer, "test update log")
update_log_doc = producer.get_value(
"Event Update Log", "docname", {"docname": producer_doc.get("name")}
)
self.assertEqual(update_log_doc.get("docname"), producer_doc.get("name"))
def test_event_sync_log(self):
producer = get_remote_site()
producer_doc = insert_into_producer(producer, "test event sync log")
self.pull_producer_data()
self.assertTrue(frappe.db.exists("Event Sync Log", {"docname": producer_doc.name}))
def pull_producer_data(self):
pull_from_node(producer_url)
def test_mapping(self):
producer = get_remote_site()
event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
event_producer.producer_doctypes = []
mapping = [{"local_fieldname": "description", "remote_fieldname": "content"}]
event_producer.append(
"producer_doctypes",
{
"ref_doctype": "ToDo",
"use_same_name": 1,
"has_mapping": 1,
"mapping": get_mapping("ToDo to Note", "ToDo", "Note", mapping),
},
)
event_producer.save()
producer_note = frappe._dict(doctype="Note", title="Test Mapping", content="Test Mapping")
delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
producer_note = producer.insert(producer_note)
self.pull_producer_data()
# check inserted
self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))
# update in producer
producer_note["content"] = "test mapped doc update sync"
producer_note = producer.update(producer_note)
self.pull_producer_data()
# check updated
self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note["content"]}))
producer.delete("Note", producer_note.name)
self.pull_producer_data()
# check delete
self.assertFalse(frappe.db.exists("ToDo", {"description": producer_note.content}))
reset_configuration(producer_url)
def test_inner_mapping(self):
producer = get_remote_site()
setup_event_producer_for_inner_mapping()
producer_note = frappe._dict(
doctype="Note", title="Inner Mapping Tester", content="Test Inner Mapping"
)
delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
producer_note = producer.insert(producer_note)
self.pull_producer_data()
# check dependency inserted
self.assertTrue(frappe.db.exists("Role", {"role_name": producer_note.title}))
# check doc inserted
self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))
reset_configuration(producer_url)
def can_sync_note(consumer, doc, update_log):
return doc.public == 1
def setup_event_producer_for_inner_mapping():
event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
event_producer.producer_doctypes = []
inner_mapping = [{"local_fieldname": "role_name", "remote_fieldname": "title"}]
inner_map = get_mapping("Role to Note Dependency Creation", "Role", "Note", inner_mapping)
mapping = [
{
"local_fieldname": "description",
"remote_fieldname": "content",
},
{
"local_fieldname": "role",
"remote_fieldname": "title",
"mapping_type": "Document",
"mapping": inner_map,
"remote_value_filters": json.dumps({"title": "title"}),
},
]
event_producer.append(
"producer_doctypes",
{
"ref_doctype": "ToDo",
"use_same_name": 1,
"has_mapping": 1,
"mapping": get_mapping("ToDo to Note Mapping", "ToDo", "Note", mapping),
},
)
event_producer.save()
return event_producer
def insert_into_producer(producer, description):
# create and insert todo on remote site
todo = dict(doctype="ToDo", description=description, assigned_by="Administrator")
return producer.insert(todo)
def delete_on_remote_if_exists(producer, doctype, filters):
remote_doc = producer.get_value(doctype, "name", filters)
if remote_doc:
producer.delete(doctype, remote_doc.get("name"))
def get_mapping(mapping_name, local, remote, field_map):
name = frappe.db.exists("Document Type Mapping", mapping_name)
if name:
doc = frappe.get_doc("Document Type Mapping", name)
else:
doc = frappe.new_doc("Document Type Mapping")
doc.mapping_name = mapping_name
doc.local_doctype = local
doc.remote_doctype = remote
for entry in field_map:
doc.append("field_mapping", entry)
doc.save()
return doc.name
def create_event_producer(producer_url):
if frappe.db.exists("Event Producer", producer_url):
event_producer = frappe.get_doc("Event Producer", producer_url)
for entry in event_producer.producer_doctypes:
entry.unsubscribe = 0
event_producer.save()
return
generate_keys("Administrator")
producer_site = connect()
response = producer_site.post_api(
"frappe.core.doctype.user.user.generate_keys", params={"user": "Administrator"}
)
api_secret = response.get("api_secret")
response = producer_site.get_value("User", "api_key", {"name": "Administrator"})
api_key = response.get("api_key")
event_producer = frappe.new_doc("Event Producer")
event_producer.producer_doctypes = []
event_producer.producer_url = producer_url
event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1})
event_producer.user = "Administrator"
event_producer.api_key = api_key
event_producer.api_secret = api_secret
event_producer.save()
def reset_configuration(producer_url):
event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
event_producer.producer_doctypes = []
event_producer.conditions = []
event_producer.producer_url = producer_url
event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1})
event_producer.user = "Administrator"
event_producer.save()
def get_remote_site():
producer_doc = frappe.get_doc("Event Producer", producer_url)
producer_site = FrappeClient(
url=producer_doc.producer_url, username="Administrator", password="admin", verify=False
)
return producer_site
def unsubscribe_doctypes(producer_url):
event_producer = frappe.get_doc("Event Producer", producer_url)
for entry in event_producer.producer_doctypes:
entry.unsubscribe = 1
event_producer.save()
def connect():
def _connect():
return FrappeClient(url=producer_url, username="Administrator", password="admin", verify=False)
try:
return _connect()
except Exception:
return _connect()

View file

@ -1,86 +0,0 @@
{
"actions": [],
"creation": "2019-10-03 21:08:25.890352",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"ref_doctype",
"status",
"use_same_name",
"unsubscribe",
"has_mapping",
"mapping",
"condition"
],
"fields": [
{
"columns": 3,
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Document Type",
"options": "DocType",
"reqd": 1,
"set_only_once": 1
},
{
"default": "0",
"description": "If the document has different field names on the Producer and Consumer's end check this and set up the Mapping",
"fieldname": "has_mapping",
"fieldtype": "Check",
"label": "Has Mapping"
},
{
"depends_on": "eval: doc.has_mapping",
"fieldname": "mapping",
"fieldtype": "Link",
"label": "Mapping",
"options": "Document Type Mapping"
},
{
"columns": 2,
"default": "0",
"description": "If this is checked the documents will have the same name as they have on the Event Producer's site",
"fieldname": "use_same_name",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Use Same Name"
},
{
"columns": 3,
"default": "Pending",
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Approval Status",
"options": "Pending\nApproved\nRejected",
"read_only": 1
},
{
"columns": 2,
"default": "0",
"fieldname": "unsubscribe",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Unsubscribe"
},
{
"fieldname": "condition",
"fieldtype": "Code",
"label": "Condition"
}
],
"istable": 1,
"links": [],
"modified": "2020-11-07 09:26:58.463868",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer Document Type",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventProducerDocumentType(Document):
pass

View file

@ -1,7 +0,0 @@
// Copyright (c) 2020, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Producer Last Update", {
// refresh: function(frm) {
// }
});

View file

@ -1,53 +0,0 @@
{
"actions": [],
"autoname": "field:event_producer",
"creation": "2020-10-26 12:53:11.940177",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"event_producer",
"last_update"
],
"fields": [
{
"fieldname": "event_producer",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Event Producer",
"reqd": 1,
"unique": 1
},
{
"fieldname": "last_update",
"fieldtype": "Data",
"label": "Last Update"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2020-10-26 13:22:27.056599",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer Last Update",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"read_only": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventProducerLastUpdate(Document):
pass

View file

@ -1,8 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventProducerLastUpdate(FrappeTestCase):
pass

View file

@ -1,24 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Sync Log", {
refresh: function (frm) {
if (frm.doc.status == "Failed") {
frm.add_custom_button(__("Resync"), function () {
frappe.call({
method: "frappe.event_streaming.doctype.event_producer.event_producer.resync",
args: {
update: frm.doc,
},
callback: function (r) {
if (r.message) {
frappe.msgprint(r.message);
frm.set_value("status", r.message);
frm.save();
}
},
});
});
}
},
});

View file

@ -1,137 +0,0 @@
{
"creation": "2019-09-24 22:22:05.845089",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"update_type",
"ref_doctype",
"docname",
"column_break_4",
"status",
"event_producer",
"producer_doc",
"event_configurations_section",
"use_same_name",
"column_break_9",
"mapping",
"section_break_8",
"data",
"error"
],
"fields": [
{
"fieldname": "update_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Update Type",
"options": "Create\nUpdate\nDelete",
"read_only": 1
},
{
"fieldname": "ref_doctype",
"fieldtype": "Link",
"label": "Doctype",
"options": "DocType",
"read_only": 1
},
{
"fieldname": "docname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Document Name",
"options": "ref_doctype",
"read_only": 1
},
{
"fieldname": "column_break_4",
"fieldtype": "Column Break"
},
{
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Status",
"options": "\nSynced\nFailed",
"read_only": 1
},
{
"fieldname": "event_producer",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Event Producer",
"options": "Event Producer",
"read_only": 1
},
{
"fieldname": "section_break_8",
"fieldtype": "Section Break",
"label": "Data"
},
{
"fieldname": "data",
"fieldtype": "Code",
"label": "Data",
"read_only": 1
},
{
"fieldname": "producer_doc",
"fieldtype": "Data",
"label": "Producer Document Name",
"read_only": 1
},
{
"depends_on": "eval:doc.status=='Failed'",
"fieldname": "error",
"fieldtype": "Code",
"label": "Error",
"read_only": 1
},
{
"fieldname": "event_configurations_section",
"fieldtype": "Section Break",
"label": "Event Configurations"
},
{
"default": "0",
"fieldname": "use_same_name",
"fieldtype": "Data",
"label": "Use Same Name",
"read_only": 1
},
{
"fieldname": "column_break_9",
"fieldtype": "Column Break"
},
{
"fieldname": "mapping",
"fieldtype": "Data",
"label": "Mapping",
"read_only": 1
}
],
"in_create": 1,
"modified": "2019-10-07 13:22:10.401479",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Sync Log",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventSyncLog(Document):
pass

View file

@ -1,9 +0,0 @@
frappe.listview_settings["Event Sync Log"] = {
get_indicator: function (doc) {
var colors = {
Failed: "red",
Synced: "green",
};
return [__(doc.status), colors[doc.status], "status,=," + doc.status];
},
};

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventSyncLog(FrappeTestCase):
pass

View file

@ -1,7 +0,0 @@
// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Update Log", {
// refresh: function(frm) {
// }
});

View file

@ -1,77 +0,0 @@
{
"actions": [],
"creation": "2019-07-30 15:31:26.352527",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"update_type",
"ref_doctype",
"docname",
"data",
"consumers"
],
"fields": [
{
"fieldname": "update_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Update Type",
"options": "Create\nUpdate\nDelete",
"read_only": 1
},
{
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "DocType",
"options": "DocType",
"read_only": 1
},
{
"fieldname": "docname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Document Name",
"read_only": 1
},
{
"fieldname": "data",
"fieldtype": "Code",
"label": "Data",
"read_only": 1
},
{
"fieldname": "consumers",
"fieldtype": "Table MultiSelect",
"label": "Consumers",
"options": "Event Update Log Consumer",
"read_only": 1
}
],
"in_create": 1,
"links": [],
"modified": "2020-09-04 07:31:52.599804",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Update Log",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,296 +0,0 @@
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
# License: MIT. See LICENSE
import frappe
from frappe.model import no_value_fields, table_fields
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
class EventUpdateLog(Document):
def after_insert(self):
"""Send update notification updates to event consumers
whenever update log is generated"""
enqueued_method = (
"frappe.event_streaming.doctype.event_consumer.event_consumer.notify_event_consumers"
)
jobs = get_jobs()
if not jobs or enqueued_method not in jobs[frappe.local.site]:
frappe.enqueue(
enqueued_method, doctype=self.ref_doctype, queue="long", enqueue_after_commit=True
)
def notify_consumers(doc, event):
"""called via hooks"""
# make event update log for doctypes having event consumers
if frappe.flags.in_install or frappe.flags.in_migrate:
return
consumers = check_doctype_has_consumers(doc.doctype)
if consumers:
if event == "after_insert":
doc.flags.event_update_log = make_event_update_log(doc, update_type="Create")
elif event == "on_trash":
make_event_update_log(doc, update_type="Delete")
else:
# on_update
# called after saving
if not doc.flags.event_update_log: # if not already inserted
diff = get_update(doc.get_doc_before_save(), doc)
if diff:
doc.diff = diff
make_event_update_log(doc, update_type="Update")
def check_doctype_has_consumers(doctype):
"""Check if doctype has event consumers for event streaming"""
return frappe.cache_manager.get_doctype_map(
"Event Consumer Document Type",
doctype,
dict(ref_doctype=doctype, status="Approved", unsubscribed=0),
)
def get_update(old, new, for_child=False):
"""
Get document objects with updates only
If there is a change, then returns a dict like:
{
"changed" : {fieldname1: new_value1, fieldname2: new_value2, },
"added" : {table_fieldname1: [{row_dict1}, {row_dict2}], },
"removed" : {table_fieldname1: [row_name1, row_name2], },
"row_changed" : {table_fieldname1:
{
child_fieldname1: new_val,
child_fieldname2: new_val
},
},
}
"""
if not new:
return None
out = frappe._dict(changed={}, added={}, removed={}, row_changed={})
for df in new.meta.fields:
if df.fieldtype in no_value_fields and df.fieldtype not in table_fields:
continue
old_value, new_value = old.get(df.fieldname), new.get(df.fieldname)
if df.fieldtype in table_fields:
old_row_by_name, new_row_by_name = make_maps(old_value, new_value)
out = check_for_additions(out, df, new_value, old_row_by_name)
out = check_for_deletions(out, df, old_value, new_row_by_name)
elif old_value != new_value:
out.changed[df.fieldname] = new_value
out = check_docstatus(out, old, new, for_child)
if any((out.changed, out.added, out.removed, out.row_changed)):
return out
return None
def make_event_update_log(doc, update_type):
"""Save update info for doctypes that have event consumers"""
if update_type != "Delete":
# diff for update type, doc for create type
data = frappe.as_json(doc) if not doc.get("diff") else frappe.as_json(doc.diff)
else:
data = None
return frappe.get_doc(
{
"doctype": "Event Update Log",
"update_type": update_type,
"ref_doctype": doc.doctype,
"docname": doc.name,
"data": data,
}
).insert(ignore_permissions=True)
def make_maps(old_value, new_value):
"""make maps"""
old_row_by_name, new_row_by_name = {}, {}
for d in old_value:
old_row_by_name[d.name] = d
for d in new_value:
new_row_by_name[d.name] = d
return old_row_by_name, new_row_by_name
def check_for_additions(out, df, new_value, old_row_by_name):
"""check rows for additions, changes"""
for _i, d in enumerate(new_value):
if d.name in old_row_by_name:
diff = get_update(old_row_by_name[d.name], d, for_child=True)
if diff and diff.changed:
if not out.row_changed.get(df.fieldname):
out.row_changed[df.fieldname] = []
diff.changed["name"] = d.name
out.row_changed[df.fieldname].append(diff.changed)
else:
if not out.added.get(df.fieldname):
out.added[df.fieldname] = []
out.added[df.fieldname].append(d.as_dict())
return out
def check_for_deletions(out, df, old_value, new_row_by_name):
"""check for deletions"""
for d in old_value:
if d.name not in new_row_by_name:
if not out.removed.get(df.fieldname):
out.removed[df.fieldname] = []
out.removed[df.fieldname].append(d.name)
return out
def check_docstatus(out, old, new, for_child):
"""docstatus changes"""
if not for_child and old.docstatus != new.docstatus:
out.changed["docstatus"] = new.docstatus
return out
def is_consumer_uptodate(update_log, consumer):
"""
Checks if Consumer has read all the UpdateLogs before the specified update_log
:param update_log: The UpdateLog Doc in context
:param consumer: The EventConsumer doc
"""
if update_log.update_type == "Create":
# consumer is obviously up to date
return True
prev_logs = frappe.get_all(
"Event Update Log",
filters={
"ref_doctype": update_log.ref_doctype,
"docname": update_log.docname,
"creation": ["<", update_log.creation],
},
order_by="creation desc",
limit_page_length=1,
)
if not len(prev_logs):
return False
prev_log_consumers = frappe.get_all(
"Event Update Log Consumer",
fields=["consumer"],
filters={
"parent": prev_logs[0].name,
"parenttype": "Event Update Log",
"consumer": consumer.name,
},
)
return len(prev_log_consumers) > 0
def mark_consumer_read(update_log_name, consumer_name):
"""
This function appends the Consumer to the list of Consumers that has 'read' an Update Log
"""
update_log = frappe.get_doc("Event Update Log", update_log_name)
if len([x for x in update_log.consumers if x.consumer == consumer_name]):
return
frappe.get_doc(
frappe._dict(
doctype="Event Update Log Consumer",
consumer=consumer_name,
parent=update_log_name,
parenttype="Event Update Log",
parentfield="consumers",
)
).insert(ignore_permissions=True)
def get_unread_update_logs(consumer_name, dt, dn):
"""
Get old logs unread by the consumer on a particular document
"""
already_consumed = [
x[0]
for x in frappe.db.sql(
"""
SELECT
update_log.name
FROM `tabEvent Update Log` update_log
JOIN `tabEvent Update Log Consumer` consumer ON consumer.parent = %(log_name)s
WHERE
consumer.consumer = %(consumer)s
AND update_log.ref_doctype = %(dt)s
AND update_log.docname = %(dn)s
""",
{
"consumer": consumer_name,
"dt": dt,
"dn": dn,
"log_name": "update_log.name"
if frappe.conf.db_type == "mariadb"
else "CAST(update_log.name AS VARCHAR)",
},
as_dict=0,
)
]
logs = frappe.get_all(
"Event Update Log",
fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
filters={"ref_doctype": dt, "docname": dn, "name": ["not in", already_consumed]},
order_by="creation",
)
return logs
@frappe.whitelist()
def get_update_logs_for_consumer(event_consumer, doctypes, last_update):
"""
Fetches all the UpdateLogs for the consumer
It will inject old un-consumed Update Logs if a doc was just found to be accessible to the Consumer
"""
if isinstance(doctypes, str):
doctypes = frappe.parse_json(doctypes)
from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access
consumer = frappe.get_doc("Event Consumer", event_consumer)
docs = frappe.get_list(
doctype="Event Update Log",
filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
order_by="creation desc",
)
result = []
to_update_history = []
for d in docs:
if (d.ref_doctype, d.docname) in to_update_history:
# will be notified by background jobs
continue
if not has_consumer_access(consumer=consumer, update_log=d):
continue
if not is_consumer_uptodate(d, consumer):
to_update_history.append((d.ref_doctype, d.docname))
# get_unread_update_logs will have the current log
old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname)
if old_logs:
old_logs.reverse()
result.extend(old_logs)
else:
result.append(d)
for d in result:
mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name)
result.reverse()
return result

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventUpdateLog(FrappeTestCase):
pass

View file

@ -1,32 +0,0 @@
{
"actions": [],
"creation": "2020-06-30 10:54:53.301787",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"consumer"
],
"fields": [
{
"fieldname": "consumer",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Consumer",
"options": "Event Consumer",
"reqd": 1
}
],
"istable": 1,
"links": [],
"modified": "2020-06-30 10:54:53.301787",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Update Log Consumer",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventUpdateLogConsumer(Document):
pass