diff --git a/frappe/event_streaming/__init__.py b/frappe/event_streaming/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/__init__.py b/frappe/event_streaming/doctype/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/__init__.py b/frappe/event_streaming/doctype/document_type_field_mapping/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json b/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json deleted file mode 100644 index bba0a98237..0000000000 --- a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "actions": [], - "creation": "2019-09-27 12:46:50.165135", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "local_fieldname", - "mapping_type", - "mapping", - "remote_value_filters", - "column_break_5", - "remote_fieldname", - "default_value" - ], - "fields": [ - { - "fieldname": "remote_fieldname", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Remote Fieldname" - }, - { - "fieldname": "local_fieldname", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Local Fieldname", - "reqd": 1 - }, - { - "fieldname": "column_break_5", - "fieldtype": "Column Break" - }, - { - "fieldname": "default_value", - "fieldtype": "Data", - "label": "Default Value" - }, - { - "fieldname": "mapping_type", - "fieldtype": "Select", - "label": "Mapping Type", - "options": "\nChild Table\nDocument" - }, - { - "depends_on": "eval:doc.mapping_type;", - "fieldname": "mapping", - "fieldtype": "Link", - "label": "Mapping", - "options": "Document Type Mapping" - }, - { - "depends_on": "eval:doc.mapping_type==\"Document\";", - "fieldname": "remote_value_filters", - "fieldtype": "Code", - "label": "Remote Value Filters", - "mandatory_depends_on": "eval:doc.mapping_type===\"Document\";", - "options": "JSON" - } - ], - "istable": 1, - "links": [], - "modified": "2020-03-19 13:56:36.223799", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Document Type Field Mapping", - "owner": "Administrator", - "permissions": [], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py b/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py deleted file mode 100644 index 96d9e0fcb3..0000000000 --- a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class DocumentTypeFieldMapping(Document): - pass diff --git a/frappe/event_streaming/doctype/document_type_mapping/__init__.py b/frappe/event_streaming/doctype/document_type_mapping/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js deleted file mode 100644 index ad9ab0f51d..0000000000 --- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright (c) 2019, Frappe Technologies and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Document Type Mapping", { - local_doctype: function (frm) { - if (frm.doc.local_doctype) { - frappe.model.clear_table(frm.doc, "field_mapping"); - let fields = frm.events.get_fields(frm); - $.each(fields, function (i, data) { - let row = frappe.model.add_child( - frm.doc, - "Document Type Field Mapping", - "field_mapping" - ); - row.local_fieldname = data; - }); - refresh_field("field_mapping"); - } - }, - - get_fields: function (frm) { - let filtered_fields = []; - frappe.model.with_doctype(frm.doc.local_doctype, () => { - frappe.get_meta(frm.doc.local_doctype).fields.map((field) => { - if ( - field.fieldname !== "remote_docname" && - field.fieldname !== "remote_site_name" && - frappe.model.is_value_type(field) && - !field.hidden - ) { - filtered_fields.push(field.fieldname); - } - }); - }); - return filtered_fields; - }, -}); diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json deleted file mode 100644 index 6a59cf3b70..0000000000 --- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "autoname": "field:mapping_name", - "creation": "2019-09-27 12:45:56.529124", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "mapping_name", - "local_doctype", - "remote_doctype", - "section_break_3", - "field_mapping" - ], - "fields": [ - { - "fieldname": "local_doctype", - "fieldtype": "Link", - "in_list_view": 1, - "label": "Local Document Type", - "options": "DocType", - "reqd": 1 - }, - { - "fieldname": "remote_doctype", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Remote Document Type", - "reqd": 1 - }, - { - "fieldname": "section_break_3", - "fieldtype": "Section Break" - }, - { - "fieldname": "field_mapping", - "fieldtype": "Table", - "label": "Field Mapping", - "options": "Document Type Field Mapping" - }, - { - "fieldname": "mapping_name", - "fieldtype": "Data", - "label": "Mapping Name", - "reqd": 1, - "unique": 1 - } - ], - "modified": "2019-10-09 08:36:04.621397", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Document Type Mapping", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py deleted file mode 100644 index 04b5015296..0000000000 --- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE -import json - -import frappe -from frappe import _ -from frappe.model import child_table_fields, default_fields -from frappe.model.document import Document - - -class DocumentTypeMapping(Document): - def validate(self): - self.validate_inner_mapping() - - def validate_inner_mapping(self): - meta = frappe.get_meta(self.local_doctype) - for field_map in self.field_mapping: - if field_map.local_fieldname not in (default_fields + child_table_fields): - field = meta.get_field(field_map.local_fieldname) - if not field: - frappe.throw(_("Row #{0}: Invalid Local Fieldname").format(field_map.idx)) - - fieldtype = field.get("fieldtype") - if fieldtype in ["Link", "Dynamic Link", "Table"]: - if not field_map.mapping and not field_map.default_value: - msg = _( - "Row #{0}: Please set Mapping or Default Value for the field {1} since its a dependency field" - ).format(field_map.idx, frappe.bold(field_map.local_fieldname)) - frappe.throw(msg, title="Inner Mapping Missing") - - if field_map.mapping_type == "Document" and not field_map.remote_value_filters: - msg = _( - "Row #{0}: Please set remote value filters for the field {1} to fetch the unique remote dependency document" - ).format(field_map.idx, frappe.bold(field_map.remote_fieldname)) - frappe.throw(msg, title="Remote Value Filters Missing") - - def get_mapping(self, doc, producer_site, update_type): - remote_fields = [] - # list of tuples (local_fieldname, dependent_doc) - dependencies = [] - - for mapping in self.field_mapping: - if doc.get(mapping.remote_fieldname): - if mapping.mapping_type == "Document": - if not mapping.default_value: - dependency = self.get_mapped_dependency(mapping, producer_site, doc) - if dependency: - dependencies.append((mapping.local_fieldname, dependency)) - else: - doc[mapping.local_fieldname] = mapping.default_value - - if mapping.mapping_type == "Child Table" and update_type != "Update": - doc[mapping.local_fieldname] = get_mapped_child_table_docs( - mapping.mapping, doc[mapping.remote_fieldname], producer_site - ) - else: - # copy value into local fieldname key and remove remote fieldname key - doc[mapping.local_fieldname] = doc[mapping.remote_fieldname] - - if mapping.local_fieldname != mapping.remote_fieldname: - remote_fields.append(mapping.remote_fieldname) - - if not doc.get(mapping.remote_fieldname) and mapping.default_value and update_type != "Update": - doc[mapping.local_fieldname] = mapping.default_value - - # remove the remote fieldnames - for field in remote_fields: - doc.pop(field, None) - - if update_type != "Update": - doc["doctype"] = self.local_doctype - - mapping = {"doc": frappe.as_json(doc)} - if len(dependencies): - mapping["dependencies"] = dependencies - return mapping - - def get_mapped_update(self, update, producer_site): - update_diff = frappe._dict(json.loads(update.data)) - mapping = update_diff - dependencies = [] - if update_diff.changed: - doc_map = self.get_mapping(update_diff.changed, producer_site, "Update") - mapped_doc = doc_map.get("doc") - mapping.changed = json.loads(mapped_doc) - if doc_map.get("dependencies"): - dependencies += doc_map.get("dependencies") - - if update_diff.removed: - mapping = self.map_rows_removed(update_diff, mapping) - if update_diff.added: - mapping = self.map_rows(update_diff, mapping, producer_site, operation="added") - if update_diff.row_changed: - mapping = self.map_rows(update_diff, mapping, producer_site, operation="row_changed") - - update = {"doc": frappe.as_json(mapping)} - if len(dependencies): - update["dependencies"] = dependencies - return update - - def get_mapped_dependency(self, mapping, producer_site, doc): - inner_mapping = frappe.get_doc("Document Type Mapping", mapping.mapping) - filters = json.loads(mapping.remote_value_filters) - for key, value in filters.items(): - if value.startswith("eval:"): - val = frappe.safe_eval(value[5:], None, dict(doc=doc)) - filters[key] = val - if doc.get(value): - filters[key] = doc.get(value) - matching_docs = producer_site.get_doc(inner_mapping.remote_doctype, filters=filters) - if len(matching_docs): - remote_docname = matching_docs[0].get("name") - remote_doc = producer_site.get_doc(inner_mapping.remote_doctype, remote_docname) - doc = inner_mapping.get_mapping(remote_doc, producer_site, "Insert").get("doc") - return doc - return - - def map_rows_removed(self, update_diff, mapping): - removed = [] - mapping["removed"] = update_diff.removed - for key, value in update_diff.removed.copy().items(): - local_table_name = frappe.db.get_value( - "Document Type Field Mapping", - {"remote_fieldname": key, "parent": self.name}, - "local_fieldname", - ) - mapping.removed[local_table_name] = value - if local_table_name != key: - removed.append(key) - - # remove the remote fieldnames - for field in removed: - mapping.removed.pop(field, None) - return mapping - - def map_rows(self, update_diff, mapping, producer_site, operation): - remote_fields = [] - for tablename, entries in update_diff.get(operation).copy().items(): - local_table_name = frappe.db.get_value( - "Document Type Field Mapping", {"remote_fieldname": tablename}, "local_fieldname" - ) - table_map = frappe.db.get_value( - "Document Type Field Mapping", - {"local_fieldname": local_table_name, "parent": self.name}, - "mapping", - ) - table_map = frappe.get_doc("Document Type Mapping", table_map) - docs = [] - for entry in entries: - mapped_doc = table_map.get_mapping(entry, producer_site, "Update").get("doc") - docs.append(json.loads(mapped_doc)) - mapping.get(operation)[local_table_name] = docs - if local_table_name != tablename: - remote_fields.append(tablename) - - # remove the remote fieldnames - for field in remote_fields: - mapping.get(operation).pop(field, None) - - return mapping - - -def get_mapped_child_table_docs(child_map, table_entries, producer_site): - """Get mapping for child doctypes""" - child_map = frappe.get_doc("Document Type Mapping", child_map) - mapped_entries = [] - remote_fields = [] - for child_doc in table_entries: - for mapping in child_map.field_mapping: - if child_doc.get(mapping.remote_fieldname): - child_doc[mapping.local_fieldname] = child_doc[mapping.remote_fieldname] - if mapping.local_fieldname != mapping.remote_fieldname: - child_doc.pop(mapping.remote_fieldname, None) - mapped_entries.append(child_doc) - - # remove the remote fieldnames - for field in remote_fields: - child_doc.pop(field, None) - - child_doc["doctype"] = child_map.local_doctype - return mapped_entries diff --git a/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py b/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py deleted file mode 100644 index defaa8b9c6..0000000000 --- a/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and Contributors -# License: MIT. See LICENSE -# import frappe -from frappe.tests.utils import FrappeTestCase - - -class TestDocumentTypeMapping(FrappeTestCase): - pass diff --git a/frappe/event_streaming/doctype/event_consumer/__init__.py b/frappe/event_streaming/doctype/event_consumer/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.js b/frappe/event_streaming/doctype/event_consumer/event_consumer.js deleted file mode 100644 index 2bcf96f9f3..0000000000 --- a/frappe/event_streaming/doctype/event_consumer/event_consumer.js +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright (c) 2019, Frappe Technologies and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Event Consumer", { - refresh: function (frm) { - // formatter for subscribed doctype approval status - frm.set_indicator_formatter("status", function (doc) { - let indicator = "orange"; - if (doc.status == "Approved") { - indicator = "green"; - } else if (doc.status == "Rejected") { - indicator = "red"; - } - return indicator; - }); - }, -}); diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.json b/frappe/event_streaming/doctype/event_consumer/event_consumer.json deleted file mode 100644 index 42b47ce949..0000000000 --- a/frappe/event_streaming/doctype/event_consumer/event_consumer.json +++ /dev/null @@ -1,97 +0,0 @@ -{ - "actions": [], - "autoname": "field:callback_url", - "creation": "2019-08-26 17:45:15.479530", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "consumer_doctypes", - "callback_url", - "section_break_3", - "api_key", - "api_secret", - "column_break_6", - "user", - "incoming_change" - ], - "fields": [ - { - "fieldname": "callback_url", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Callback URL", - "read_only": 1, - "reqd": 1, - "unique": 1 - }, - { - "fieldname": "api_key", - "fieldtype": "Data", - "label": "API Key", - "reqd": 1 - }, - { - "fieldname": "api_secret", - "fieldtype": "Password", - "label": "API Secret", - "reqd": 1 - }, - { - "fieldname": "user", - "fieldtype": "Link", - "label": "Event Subscriber", - "options": "User", - "read_only": 1, - "reqd": 1 - }, - { - "fieldname": "section_break_3", - "fieldtype": "Section Break" - }, - { - "fieldname": "column_break_6", - "fieldtype": "Column Break" - }, - { - "default": "0", - "fieldname": "incoming_change", - "fieldtype": "Check", - "hidden": 1, - "label": "Incoming Change", - "read_only": 1 - }, - { - "fieldname": "consumer_doctypes", - "fieldtype": "Table", - "label": "Event Consumer Document Types", - "options": "Event Consumer Document Type", - "reqd": 1 - } - ], - "in_create": 1, - "links": [], - "modified": "2020-09-08 16:42:39.828085", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Consumer", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.py b/frappe/event_streaming/doctype/event_consumer/event_consumer.py deleted file mode 100644 index a2ae6f6651..0000000000 --- a/frappe/event_streaming/doctype/event_consumer/event_consumer.py +++ /dev/null @@ -1,216 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -import json -import os - -import requests - -import frappe -from frappe import _ -from frappe.frappeclient import FrappeClient -from frappe.model.document import Document -from frappe.utils.background_jobs import get_jobs -from frappe.utils.data import get_url - - -class EventConsumer(Document): - def validate(self): - # approve subscribed doctypes for tests - # frappe.flags.in_test won't work here as tests are running on the consumer site - if os.environ.get("CI"): - for entry in self.consumer_doctypes: - entry.status = "Approved" - - def on_update(self): - if not self.incoming_change: - doc_before_save = self.get_doc_before_save() - if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret: - return - - self.update_consumer_status() - else: - frappe.db.set_value(self.doctype, self.name, "incoming_change", 0) - - frappe.cache().delete_value("event_consumer_document_type_map") - - def on_trash(self): - for i in frappe.get_all("Event Update Log Consumer", {"consumer": self.name}): - frappe.delete_doc("Event Update Log Consumer", i.name) - frappe.cache().delete_value("event_consumer_document_type_map") - - def update_consumer_status(self): - consumer_site = get_consumer_site(self.callback_url) - event_producer = consumer_site.get_doc("Event Producer", get_url()) - event_producer = frappe._dict(event_producer) - config = event_producer.producer_doctypes - event_producer.producer_doctypes = [] - for entry in config: - if entry.get("has_mapping"): - ref_doctype = consumer_site.get_value( - "Document Type Mapping", "remote_doctype", entry.get("mapping") - ).get("remote_doctype") - else: - ref_doctype = entry.get("ref_doctype") - - entry["status"] = frappe.db.get_value( - "Event Consumer Document Type", {"parent": self.name, "ref_doctype": ref_doctype}, "status" - ) - - event_producer.producer_doctypes = config - # when producer doc is updated it updates the consumer doc - # set flag to avoid deadlock - event_producer.incoming_change = True - consumer_site.update(event_producer) - - def get_consumer_status(self): - response = requests.get(self.callback_url) - if response.status_code != 200: - return "offline" - return "online" - - -@frappe.whitelist() -def register_consumer(data): - """create an event consumer document for registering a consumer""" - data = json.loads(data) - # to ensure that consumer is created only once - if frappe.db.exists("Event Consumer", data["event_consumer"]): - return None - - user = data["user"] - if not frappe.db.exists("User", user): - frappe.throw(_("User {0} not found on the producer site").format(user)) - - if "System Manager" not in frappe.get_roles(user): - frappe.throw(_("Event Subscriber has to be a System Manager.")) - - consumer = frappe.new_doc("Event Consumer") - consumer.callback_url = data["event_consumer"] - consumer.user = data["user"] - consumer.api_key = data["api_key"] - consumer.api_secret = data["api_secret"] - consumer.incoming_change = True - consumer_doctypes = json.loads(data["consumer_doctypes"]) - - for entry in consumer_doctypes: - consumer.append( - "consumer_doctypes", - {"ref_doctype": entry.get("doctype"), "status": "Pending", "condition": entry.get("condition")}, - ) - - consumer.insert() - - # consumer's 'last_update' field should point to the latest update - # in producer's update log when subscribing - # so that, updates after subscribing are consumed and not the old ones. - last_update = str(get_last_update()) - return json.dumps({"last_update": last_update}) - - -def get_consumer_site(consumer_url): - """create a FrappeClient object for event consumer site""" - consumer_doc = frappe.get_doc("Event Consumer", consumer_url) - consumer_site = FrappeClient( - url=consumer_url, - api_key=consumer_doc.api_key, - api_secret=consumer_doc.get_password("api_secret"), - ) - return consumer_site - - -def get_last_update(): - """get the creation timestamp of last update consumed""" - updates = frappe.get_list( - "Event Update Log", "creation", ignore_permissions=True, limit=1, order_by="creation desc" - ) - if updates: - return updates[0].creation - return frappe.utils.now_datetime() - - -@frappe.whitelist() -def notify_event_consumers(doctype): - """get all event consumers and set flag for notification status""" - event_consumers = frappe.get_all( - "Event Consumer Document Type", ["parent"], {"ref_doctype": doctype, "status": "Approved"} - ) - for entry in event_consumers: - consumer = frappe.get_doc("Event Consumer", entry.parent) - consumer.flags.notified = False - notify(consumer) - - -@frappe.whitelist() -def notify(consumer): - """notify individual event consumers about a new update""" - consumer_status = consumer.get_consumer_status() - if consumer_status == "online": - try: - client = get_consumer_site(consumer.callback_url) - client.post_request( - { - "cmd": "frappe.event_streaming.doctype.event_producer.event_producer.new_event_notification", - "producer_url": get_url(), - } - ) - consumer.flags.notified = True - except Exception: - consumer.flags.notified = False - else: - consumer.flags.notified = False - - # enqueue another job if the site was not notified - if not consumer.flags.notified: - enqueued_method = "frappe.event_streaming.doctype.event_consumer.event_consumer.notify" - jobs = get_jobs() - if not jobs or enqueued_method not in jobs[frappe.local.site] and not consumer.flags.notifed: - frappe.enqueue( - enqueued_method, queue="long", enqueue_after_commit=True, **{"consumer": consumer} - ) - - -def has_consumer_access(consumer, update_log): - """Checks if consumer has completely satisfied all the conditions on the doc""" - - if isinstance(consumer, str): - consumer = frappe.get_doc("Event Consumer", consumer) - - if not frappe.db.exists(update_log.ref_doctype, update_log.docname): - # Delete Log - # Check if the last Update Log of this document was read by this consumer - last_update_log = frappe.get_all( - "Event Update Log", - filters={ - "ref_doctype": update_log.ref_doctype, - "docname": update_log.docname, - "creation": ["<", update_log.creation], - }, - order_by="creation desc", - limit_page_length=1, - ) - if not len(last_update_log): - return False - - last_update_log = frappe.get_doc("Event Update Log", last_update_log[0].name) - return len([x for x in last_update_log.consumers if x.consumer == consumer.name]) - - doc = frappe.get_doc(update_log.ref_doctype, update_log.docname) - try: - for dt_entry in consumer.consumer_doctypes: - if dt_entry.ref_doctype != update_log.ref_doctype: - continue - - if not dt_entry.condition: - return True - - condition: str = dt_entry.condition - if condition.startswith("cmd:"): - cmd = condition.split("cmd:")[1].strip() - args = {"consumer": consumer, "doc": doc, "update_log": update_log} - return frappe.call(cmd, **args) - else: - return frappe.safe_eval(condition, frappe._dict(doc=doc)) - except Exception as e: - consumer.log_error("has_consumer_access error") - return False diff --git a/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py b/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py deleted file mode 100644 index 54bf718f17..0000000000 --- a/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and Contributors -# License: MIT. See LICENSE -# import frappe -from frappe.tests.utils import FrappeTestCase - - -class TestEventConsumer(FrappeTestCase): - pass diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/__init__.py b/frappe/event_streaming/doctype/event_consumer_document_type/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json b/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json deleted file mode 100644 index c243334a09..0000000000 --- a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "actions": [], - "creation": "2019-10-03 21:10:54.754651", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "ref_doctype", - "status", - "unsubscribed", - "condition" - ], - "fields": [ - { - "columns": 4, - "fieldname": "ref_doctype", - "fieldtype": "Link", - "in_list_view": 1, - "label": "Document Type", - "options": "DocType", - "read_only": 1, - "reqd": 1 - }, - { - "columns": 4, - "default": "Pending", - "fieldname": "status", - "fieldtype": "Select", - "in_list_view": 1, - "label": "Approval Status", - "options": "Pending\nApproved\nRejected" - }, - { - "columns": 2, - "default": "0", - "fieldname": "unsubscribed", - "fieldtype": "Check", - "in_list_view": 1, - "label": "Unsubscribed", - "read_only": 1 - }, - { - "fieldname": "condition", - "fieldtype": "Code", - "label": "Condition", - "read_only": 1 - } - ], - "istable": 1, - "links": [], - "modified": "2020-11-07 09:26:49.894294", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Consumer Document Type", - "owner": "Administrator", - "permissions": [], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py b/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py deleted file mode 100644 index 1ed15c5a75..0000000000 --- a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class EventConsumerDocumentType(Document): - pass diff --git a/frappe/event_streaming/doctype/event_producer/__init__.py b/frappe/event_streaming/doctype/event_producer/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.js b/frappe/event_streaming/doctype/event_producer/event_producer.js deleted file mode 100644 index 23ca482433..0000000000 --- a/frappe/event_streaming/doctype/event_producer/event_producer.js +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2019, Frappe Technologies and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Event Producer", { - refresh: function (frm) { - frm.set_query("ref_doctype", "producer_doctypes", function () { - return { - filters: { - issingle: 0, - istable: 0, - }, - }; - }); - - frm.set_indicator_formatter("status", function (doc) { - let indicator = "orange"; - if (doc.status == "Approved") { - indicator = "green"; - } else if (doc.status == "Rejected") { - indicator = "red"; - } - return indicator; - }); - }, -}); diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.json b/frappe/event_streaming/doctype/event_producer/event_producer.json deleted file mode 100644 index d868f6c123..0000000000 --- a/frappe/event_streaming/doctype/event_producer/event_producer.json +++ /dev/null @@ -1,96 +0,0 @@ -{ - "actions": [], - "autoname": "field:producer_url", - "creation": "2019-08-26 19:17:24.919196", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "producer_url", - "producer_doctypes", - "section_break_3", - "api_key", - "api_secret", - "column_break_6", - "user", - "incoming_change" - ], - "fields": [ - { - "fieldname": "producer_url", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Producer URL", - "reqd": 1, - "unique": 1 - }, - { - "description": "API Key of the user(Event Subscriber) on the producer site", - "fieldname": "api_key", - "fieldtype": "Data", - "label": "API Key", - "reqd": 1 - }, - { - "description": "API Secret of the user(Event Subscriber) on the producer site", - "fieldname": "api_secret", - "fieldtype": "Password", - "label": "API Secret", - "reqd": 1 - }, - { - "fieldname": "user", - "fieldtype": "Link", - "label": "Event Subscriber", - "options": "User", - "reqd": 1, - "set_only_once": 1 - }, - { - "fieldname": "column_break_6", - "fieldtype": "Column Break" - }, - { - "fieldname": "section_break_3", - "fieldtype": "Section Break" - }, - { - "default": "0", - "fieldname": "incoming_change", - "fieldtype": "Check", - "hidden": 1, - "label": "Incoming Change" - }, - { - "fieldname": "producer_doctypes", - "fieldtype": "Table", - "label": "Event Producer Document Types", - "options": "Event Producer Document Type", - "reqd": 1 - } - ], - "links": [], - "modified": "2020-10-26 13:00:15.361316", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Producer", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.py b/frappe/event_streaming/doctype/event_producer/event_producer.py deleted file mode 100644 index f91c8a4fd4..0000000000 --- a/frappe/event_streaming/doctype/event_producer/event_producer.py +++ /dev/null @@ -1,569 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -import json -import time - -import requests - -import frappe -from frappe import _ -from frappe.custom.doctype.custom_field.custom_field import create_custom_field -from frappe.frappeclient import FrappeClient -from frappe.model.document import Document -from frappe.utils.background_jobs import get_jobs -from frappe.utils.data import get_link_to_form, get_url -from frappe.utils.password import get_decrypted_password - - -class EventProducer(Document): - def before_insert(self): - self.check_url() - self.validate_event_subscriber() - self.incoming_change = True - self.create_event_consumer() - self.create_custom_fields() - - def validate(self): - self.validate_event_subscriber() - if frappe.flags.in_test: - for entry in self.producer_doctypes: - entry.status = "Approved" - - def validate_event_subscriber(self): - if not frappe.db.get_value("User", self.user, "api_key"): - frappe.throw( - _("Please generate keys for the Event Subscriber User {0} first.").format( - frappe.bold(get_link_to_form("User", self.user)) - ) - ) - - def on_update(self): - if not self.incoming_change: - if frappe.db.exists("Event Producer", self.name): - if not self.api_key or not self.api_secret: - frappe.throw(_("Please set API Key and Secret on the producer and consumer sites first.")) - else: - doc_before_save = self.get_doc_before_save() - if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret: - return - - self.update_event_consumer() - self.create_custom_fields() - else: - # when producer doc is updated it updates the consumer doc, set flag to avoid deadlock - self.db_set("incoming_change", 0) - self.reload() - - def on_trash(self): - last_update = frappe.db.get_value("Event Producer Last Update", dict(event_producer=self.name)) - if last_update: - frappe.delete_doc("Event Producer Last Update", last_update) - - def check_url(self): - valid_url_schemes = ("http", "https") - frappe.utils.validate_url(self.producer_url, throw=True, valid_schemes=valid_url_schemes) - - # remove '/' from the end of the url like http://test_site.com/ - # to prevent mismatch in get_url() results - if self.producer_url.endswith("/"): - self.producer_url = self.producer_url[:-1] - - def create_event_consumer(self): - """register event consumer on the producer site""" - if self.is_producer_online(): - producer_site = FrappeClient( - url=self.producer_url, api_key=self.api_key, api_secret=self.get_password("api_secret") - ) - - response = producer_site.post_api( - "frappe.event_streaming.doctype.event_consumer.event_consumer.register_consumer", - params={"data": json.dumps(self.get_request_data())}, - ) - if response: - response = json.loads(response) - self.set_last_update(response["last_update"]) - else: - frappe.throw( - _( - "Failed to create an Event Consumer or an Event Consumer for the current site is already registered." - ) - ) - - def set_last_update(self, last_update): - last_update_doc_name = frappe.db.get_value( - "Event Producer Last Update", dict(event_producer=self.name) - ) - if not last_update_doc_name: - frappe.get_doc( - dict( - doctype="Event Producer Last Update", - event_producer=self.producer_url, - last_update=last_update, - ) - ).insert(ignore_permissions=True) - else: - frappe.db.set_value( - "Event Producer Last Update", last_update_doc_name, "last_update", last_update - ) - - def get_last_update(self): - return frappe.db.get_value( - "Event Producer Last Update", dict(event_producer=self.name), "last_update" - ) - - def get_request_data(self): - consumer_doctypes = [] - for entry in self.producer_doctypes: - if entry.has_mapping: - # if mapping, subscribe to remote doctype on consumer's site - dt = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype") - else: - dt = entry.ref_doctype - consumer_doctypes.append({"doctype": dt, "condition": entry.condition}) - - user_key = frappe.db.get_value("User", self.user, "api_key") - user_secret = get_decrypted_password("User", self.user, "api_secret") - return { - "event_consumer": get_url(), - "consumer_doctypes": json.dumps(consumer_doctypes), - "user": self.user, - "api_key": user_key, - "api_secret": user_secret, - } - - def create_custom_fields(self): - """create custom field to store remote docname and remote site url""" - for entry in self.producer_doctypes: - if not entry.use_same_name: - if not frappe.db.exists( - "Custom Field", {"fieldname": "remote_docname", "dt": entry.ref_doctype} - ): - df = dict( - fieldname="remote_docname", - label="Remote Document Name", - fieldtype="Data", - read_only=1, - print_hide=1, - ) - create_custom_field(entry.ref_doctype, df) - if not frappe.db.exists( - "Custom Field", {"fieldname": "remote_site_name", "dt": entry.ref_doctype} - ): - df = dict( - fieldname="remote_site_name", - label="Remote Site", - fieldtype="Data", - read_only=1, - print_hide=1, - ) - create_custom_field(entry.ref_doctype, df) - - def update_event_consumer(self): - if self.is_producer_online(): - producer_site = get_producer_site(self.producer_url) - event_consumer = producer_site.get_doc("Event Consumer", get_url()) - event_consumer = frappe._dict(event_consumer) - if event_consumer: - config = event_consumer.consumer_doctypes - event_consumer.consumer_doctypes = [] - for entry in self.producer_doctypes: - if entry.has_mapping: - # if mapping, subscribe to remote doctype on consumer's site - ref_doctype = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype") - else: - ref_doctype = entry.ref_doctype - - event_consumer.consumer_doctypes.append( - { - "ref_doctype": ref_doctype, - "status": get_approval_status(config, ref_doctype), - "unsubscribed": entry.unsubscribe, - "condition": entry.condition, - } - ) - event_consumer.user = self.user - event_consumer.incoming_change = True - producer_site.update(event_consumer) - - def is_producer_online(self): - """check connection status for the Event Producer site""" - retry = 3 - while retry > 0: - res = requests.get(self.producer_url) - if res.status_code == 200: - return True - retry -= 1 - time.sleep(5) - frappe.throw(_("Failed to connect to the Event Producer site. Retry after some time.")) - - -def get_producer_site(producer_url): - """create a FrappeClient object for event producer site""" - producer_doc = frappe.get_doc("Event Producer", producer_url) - producer_site = FrappeClient( - url=producer_url, - api_key=producer_doc.api_key, - api_secret=producer_doc.get_password("api_secret"), - ) - return producer_site - - -def get_approval_status(config, ref_doctype): - """check the approval status for consumption""" - for entry in config: - if entry.get("ref_doctype") == ref_doctype: - return entry.get("status") - return "Pending" - - -@frappe.whitelist() -def pull_producer_data(): - """Fetch data from producer node.""" - response = requests.get(get_url()) - if response.status_code == 200: - for event_producer in frappe.get_all("Event Producer"): - pull_from_node(event_producer.name) - return "success" - return None - - -@frappe.whitelist() -def pull_from_node(event_producer): - """pull all updates after the last update timestamp from event producer site""" - event_producer = frappe.get_doc("Event Producer", event_producer) - producer_site = get_producer_site(event_producer.producer_url) - last_update = event_producer.get_last_update() - - (doctypes, mapping_config, naming_config) = get_config(event_producer.producer_doctypes) - - updates = get_updates(producer_site, last_update, doctypes) - - for update in updates: - update.use_same_name = naming_config.get(update.ref_doctype) - mapping = mapping_config.get(update.ref_doctype) - if mapping: - update.mapping = mapping - update = get_mapped_update(update, producer_site) - if not update.update_type == "Delete": - update.data = json.loads(update.data) - - sync(update, producer_site, event_producer) - - -def get_config(event_config): - """get the doctype mapping and naming configurations for consumption""" - doctypes, mapping_config, naming_config = [], {}, {} - - for entry in event_config: - if entry.status == "Approved": - if entry.has_mapping: - (mapped_doctype, mapping) = frappe.db.get_value( - "Document Type Mapping", entry.mapping, ["remote_doctype", "name"] - ) - mapping_config[mapped_doctype] = mapping - naming_config[mapped_doctype] = entry.use_same_name - doctypes.append(mapped_doctype) - else: - naming_config[entry.ref_doctype] = entry.use_same_name - doctypes.append(entry.ref_doctype) - return (doctypes, mapping_config, naming_config) - - -def sync(update, producer_site, event_producer, in_retry=False): - """Sync the individual update""" - try: - if update.update_type == "Create": - set_insert(update, producer_site, event_producer.name) - if update.update_type == "Update": - set_update(update, producer_site) - if update.update_type == "Delete": - set_delete(update) - if in_retry: - return "Synced" - log_event_sync(update, event_producer.name, "Synced") - - except Exception: - if in_retry: - if frappe.flags.in_test: - print(frappe.get_traceback()) - return "Failed" - log_event_sync(update, event_producer.name, "Failed", frappe.get_traceback()) - - event_producer.set_last_update(update.creation) - frappe.db.commit() - - -def set_insert(update, producer_site, event_producer): - """Sync insert type update""" - if frappe.db.get_value(update.ref_doctype, update.docname): - # doc already created - return - doc = frappe.get_doc(update.data) - - if update.mapping: - if update.get("dependencies"): - dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site) - for fieldname, value in dependencies_created.items(): - doc.update({fieldname: value}) - else: - sync_dependencies(doc, producer_site) - - if update.use_same_name: - doc.insert(set_name=update.docname, set_child_names=False) - else: - # if event consumer is not saving documents with the same name as the producer - # store the remote docname in a custom field for future updates - doc.remote_docname = update.docname - doc.remote_site_name = event_producer - doc.insert(set_child_names=False) - - -def set_update(update, producer_site): - """Sync update type update""" - local_doc = get_local_doc(update) - if local_doc: - data = frappe._dict(update.data) - - if data.changed: - local_doc.update(data.changed) - if data.removed: - local_doc = update_row_removed(local_doc, data.removed) - if data.row_changed: - update_row_changed(local_doc, data.row_changed) - if data.added: - local_doc = update_row_added(local_doc, data.added) - - if update.mapping: - if update.get("dependencies"): - dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site) - for fieldname, value in dependencies_created.items(): - local_doc.update({fieldname: value}) - else: - sync_dependencies(local_doc, producer_site) - - local_doc.save() - local_doc.db_update_all() - - -def update_row_removed(local_doc, removed): - """Sync child table row deletion type update""" - for tablename, rownames in removed.items(): - table = local_doc.get_table_field_doctype(tablename) - for row in rownames: - table_rows = local_doc.get(tablename) - child_table_row = get_child_table_row(table_rows, row) - table_rows.remove(child_table_row) - local_doc.set(tablename, table_rows) - return local_doc - - -def get_child_table_row(table_rows, row): - for entry in table_rows: - if entry.get("name") == row: - return entry - - -def update_row_changed(local_doc, changed): - """Sync child table row updation type update""" - for tablename, rows in changed.items(): - old = local_doc.get(tablename) - for doc in old: - for row in rows: - if row["name"] == doc.get("name"): - doc.update(row) - - -def update_row_added(local_doc, added): - """Sync child table row addition type update""" - for tablename, rows in added.items(): - local_doc.extend(tablename, rows) - for child in rows: - child_doc = frappe.get_doc(child) - child_doc.parent = local_doc.name - child_doc.parenttype = local_doc.doctype - child_doc.insert(set_name=child_doc.name) - return local_doc - - -def set_delete(update): - """Sync delete type update""" - local_doc = get_local_doc(update) - if local_doc: - local_doc.delete() - - -def get_updates(producer_site, last_update, doctypes): - """Get all updates generated after the last update timestamp""" - docs = producer_site.post_request( - { - "cmd": "frappe.event_streaming.doctype.event_update_log.event_update_log.get_update_logs_for_consumer", - "event_consumer": get_url(), - "doctypes": frappe.as_json(doctypes), - "last_update": last_update, - } - ) - return [frappe._dict(d) for d in (docs or [])] - - -def get_local_doc(update): - """Get the local document if created with a different name""" - try: - if not update.use_same_name: - return frappe.get_doc(update.ref_doctype, {"remote_docname": update.docname}) - return frappe.get_doc(update.ref_doctype, update.docname) - except frappe.DoesNotExistError: - return None - - -def sync_dependencies(document, producer_site): - """ - dependencies is a dictionary to store all the docs - having dependencies and their sync status, - which is shared among all nested functions. - """ - dependencies = {document: True} - - def check_doc_has_dependencies(doc, producer_site): - """Sync child table link fields first, - then sync link fields, - then dynamic links""" - meta = frappe.get_meta(doc.doctype) - table_fields = meta.get_table_fields() - link_fields = meta.get_link_fields() - dl_fields = meta.get_dynamic_link_fields() - if table_fields: - sync_child_table_dependencies(doc, table_fields, producer_site) - if link_fields: - sync_link_dependencies(doc, link_fields, producer_site) - if dl_fields: - sync_dynamic_link_dependencies(doc, dl_fields, producer_site) - - def sync_child_table_dependencies(doc, table_fields, producer_site): - for df in table_fields: - child_table = doc.get(df.fieldname) - for entry in child_table: - child_doc = producer_site.get_doc(entry.doctype, entry.name) - if child_doc: - child_doc = frappe._dict(child_doc) - set_dependencies(child_doc, frappe.get_meta(entry.doctype).get_link_fields(), producer_site) - - def sync_link_dependencies(doc, link_fields, producer_site): - set_dependencies(doc, link_fields, producer_site) - - def sync_dynamic_link_dependencies(doc, dl_fields, producer_site): - for df in dl_fields: - docname = doc.get(df.fieldname) - linked_doctype = doc.get(df.options) - if docname and not check_dependency_fulfilled(linked_doctype, docname): - master_doc = producer_site.get_doc(linked_doctype, docname) - frappe.get_doc(master_doc).insert(set_name=docname) - - def set_dependencies(doc, link_fields, producer_site): - for df in link_fields: - docname = doc.get(df.fieldname) - linked_doctype = df.get_link_doctype() - if docname and not check_dependency_fulfilled(linked_doctype, docname): - master_doc = producer_site.get_doc(linked_doctype, docname) - try: - master_doc = frappe.get_doc(master_doc) - master_doc.insert(set_name=docname) - frappe.db.commit() - - # for dependency inside a dependency - except Exception: - dependencies[master_doc] = True - - def check_dependency_fulfilled(linked_doctype, docname): - return frappe.db.exists(linked_doctype, docname) - - while dependencies[document]: - # find the first non synced dependency - for item in reversed(list(dependencies.keys())): - if dependencies[item]: - dependency = item - break - - check_doc_has_dependencies(dependency, producer_site) - - # mark synced for nested dependency - if dependency != document: - dependencies[dependency] = False - dependency.insert() - - # no more dependencies left to be synced, the main doc is ready to be synced - # end the dependency loop - if not any(list(dependencies.values())[1:]): - dependencies[document] = False - - -def sync_mapped_dependencies(dependencies, producer_site): - dependencies_created = {} - for entry in dependencies: - doc = frappe._dict(json.loads(entry[1])) - docname = frappe.db.exists(doc.doctype, doc.name) - if not docname: - doc = frappe.get_doc(doc).insert(set_child_names=False) - dependencies_created[entry[0]] = doc.name - else: - dependencies_created[entry[0]] = docname - - return dependencies_created - - -def log_event_sync(update, event_producer, sync_status, error=None): - """Log event update received with the sync_status as Synced or Failed""" - doc = frappe.new_doc("Event Sync Log") - doc.update_type = update.update_type - doc.ref_doctype = update.ref_doctype - doc.status = sync_status - doc.event_producer = event_producer - doc.producer_doc = update.docname - doc.data = frappe.as_json(update.data) - doc.use_same_name = update.use_same_name - doc.mapping = update.mapping if update.mapping else None - if update.use_same_name: - doc.docname = update.docname - else: - doc.docname = frappe.db.get_value(update.ref_doctype, {"remote_docname": update.docname}, "name") - if error: - doc.error = error - doc.insert() - - -def get_mapped_update(update, producer_site): - """get the new update document with mapped fields""" - mapping = frappe.get_doc("Document Type Mapping", update.mapping) - if update.update_type == "Create": - doc = frappe._dict(json.loads(update.data)) - mapped_update = mapping.get_mapping(doc, producer_site, update.update_type) - update.data = mapped_update.get("doc") - update.dependencies = mapped_update.get("dependencies", None) - elif update.update_type == "Update": - mapped_update = mapping.get_mapped_update(update, producer_site) - update.data = mapped_update.get("doc") - update.dependencies = mapped_update.get("dependencies", None) - - update["ref_doctype"] = mapping.local_doctype - return update - - -@frappe.whitelist() -def new_event_notification(producer_url): - """Pull data from producer when notified""" - enqueued_method = "frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node" - jobs = get_jobs() - if not jobs or enqueued_method not in jobs[frappe.local.site]: - frappe.enqueue(enqueued_method, queue="default", **{"event_producer": producer_url}) - - -@frappe.whitelist() -def resync(update): - """Retry syncing update if failed""" - update = frappe._dict(json.loads(update)) - producer_site = get_producer_site(update.event_producer) - event_producer = frappe.get_doc("Event Producer", update.event_producer) - if update.mapping: - update = get_mapped_update(update, producer_site) - update.data = json.loads(update.data) - return sync(update, producer_site, event_producer, in_retry=True) diff --git a/frappe/event_streaming/doctype/event_producer/test_event_producer.py b/frappe/event_streaming/doctype/event_producer/test_event_producer.py deleted file mode 100644 index 70e7483f92..0000000000 --- a/frappe/event_streaming/doctype/event_producer/test_event_producer.py +++ /dev/null @@ -1,438 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and Contributors -# License: MIT. See LICENSE -import json - -import frappe -from frappe.core.doctype.user.user import generate_keys -from frappe.event_streaming.doctype.event_producer.event_producer import pull_from_node -from frappe.frappeclient import FrappeClient -from frappe.query_builder.utils import db_type_is -from frappe.tests.test_query_builder import run_only_if -from frappe.tests.utils import FrappeTestCase - -producer_url = "http://test_site_producer:8000" - - -class TestEventProducer(FrappeTestCase): - def setUp(self): - create_event_producer(producer_url) - - def tearDown(self): - unsubscribe_doctypes(producer_url) - - def test_insert(self): - producer = get_remote_site() - producer_doc = insert_into_producer(producer, "test creation 1 sync") - self.pull_producer_data() - self.assertTrue(frappe.db.exists("ToDo", producer_doc.name)) - - def test_update(self): - producer = get_remote_site() - producer_doc = insert_into_producer(producer, "test update 1") - producer_doc["description"] = "test update 2" - producer_doc = producer.update(producer_doc) - self.pull_producer_data() - local_doc = frappe.get_doc(producer_doc.doctype, producer_doc.name) - self.assertEqual(local_doc.description, producer_doc.description) - - def test_delete(self): - producer = get_remote_site() - producer_doc = insert_into_producer(producer, "test delete sync") - self.pull_producer_data() - self.assertTrue(frappe.db.exists("ToDo", producer_doc.name)) - producer.delete("ToDo", producer_doc.name) - self.pull_producer_data() - self.assertFalse(frappe.db.exists("ToDo", producer_doc.name)) - - @run_only_if(db_type_is.MARIADB) - def test_multiple_doctypes_sync(self): - # TODO: This test is extremely flaky with Postgres. Rewrite this! - producer = get_remote_site() - - # insert todo and note in producer - producer_todo = insert_into_producer(producer, "test multiple doc sync") - producer_note1 = frappe._dict(doctype="Note", title="test multiple doc sync 1") - delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]}) - frappe.db.delete("Note", {"title": producer_note1["title"]}) - producer_note1 = producer.insert(producer_note1) - producer_note2 = frappe._dict(doctype="Note", title="test multiple doc sync 2") - delete_on_remote_if_exists(producer, "Note", {"title": producer_note2["title"]}) - frappe.db.delete("Note", {"title": producer_note2["title"]}) - producer_note2 = producer.insert(producer_note2) - - # update in producer - producer_todo["description"] = "test multiple doc update sync" - producer_todo = producer.update(producer_todo) - producer_note1["content"] = "testing update sync" - producer_note1 = producer.update(producer_note1) - - producer.delete("Note", producer_note2.name) - - self.pull_producer_data() - - # check inserted - self.assertTrue(frappe.db.exists("ToDo", producer_todo.name)) - - # check update - local_todo = frappe.get_doc("ToDo", producer_todo.name) - self.assertEqual(local_todo.description, producer_todo.description) - local_note1 = frappe.get_doc("Note", producer_note1.name) - self.assertEqual(local_note1.content, producer_note1.content) - - # check delete - self.assertFalse(frappe.db.exists("Note", producer_note2.name)) - - def test_child_table_sync_with_dependencies(self): - producer = get_remote_site() - producer_user = frappe._dict( - doctype="User", - email="test_user@sync.com", - send_welcome_email=0, - first_name="Test Sync User", - enabled=1, - roles=[{"role": "System Manager"}], - ) - delete_on_remote_if_exists(producer, "User", {"email": producer_user.email}) - frappe.db.delete("User", {"email": producer_user.email}) - producer_user = producer.insert(producer_user) - - producer_note = frappe._dict( - doctype="Note", title="test child table dependency sync", seen_by=[{"user": producer_user.name}] - ) - delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title}) - frappe.db.delete("Note", {"title": producer_note.title}) - producer_note = producer.insert(producer_note) - - self.pull_producer_data() - self.assertTrue(frappe.db.exists("User", producer_user.name)) - if self.assertTrue(frappe.db.exists("Note", producer_note.name)): - local_note = frappe.get_doc("Note", producer_note.name) - self.assertEqual(len(local_note.seen_by), 1) - - def test_dynamic_link_dependencies_synced(self): - producer = get_remote_site() - # unsubscribe for Note to check whether dependency is fulfilled - event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True) - event_producer.producer_doctypes = [] - event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1}) - event_producer.save() - - producer_link_doc = frappe._dict(doctype="Note", title="Test Dynamic Link 1") - - delete_on_remote_if_exists(producer, "Note", {"title": producer_link_doc.title}) - frappe.db.delete("Note", {"title": producer_link_doc.title}) - producer_link_doc = producer.insert(producer_link_doc) - producer_doc = frappe._dict( - doctype="ToDo", - description="Test Dynamic Link 2", - assigned_by="Administrator", - reference_type="Note", - reference_name=producer_link_doc.name, - ) - producer_doc = producer.insert(producer_doc) - - self.pull_producer_data() - - # check dynamic link dependency created - self.assertTrue(frappe.db.exists("Note", producer_link_doc.name)) - self.assertEqual( - producer_link_doc.name, frappe.db.get_value("ToDo", producer_doc.name, "reference_name") - ) - - reset_configuration(producer_url) - - def test_naming_configuration(self): - # test with use_same_name = 0 - producer = get_remote_site() - event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True) - event_producer.producer_doctypes = [] - event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 0}) - event_producer.save() - - producer_doc = insert_into_producer(producer, "test different name sync") - self.pull_producer_data() - self.assertTrue( - frappe.db.exists( - "ToDo", {"remote_docname": producer_doc.name, "remote_site_name": producer_url} - ) - ) - - reset_configuration(producer_url) - - def test_conditional_events(self): - producer = get_remote_site() - - # Add Condition - event_producer = frappe.get_doc("Event Producer", producer_url) - note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0] - note_producer_entry.condition = "doc.public == 1" - event_producer.save() - - # Make test doc - producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync") - delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]}) - producer_note1 = producer.insert(producer_note1) - - # Make Update - producer_note1["content"] = "Test Conditional Sync Content" - producer_note1 = producer.update(producer_note1) - - self.pull_producer_data() - - # Check if synced here - self.assertFalse(frappe.db.exists("Note", producer_note1.name)) - - # Lets satisfy the condition - producer_note1["public"] = 1 - producer_note1 = producer.update(producer_note1) - - self.pull_producer_data() - - # it should sync now - self.assertTrue(frappe.db.exists("Note", producer_note1.name)) - local_note = frappe.get_doc("Note", producer_note1.name) - self.assertEqual(local_note.content, producer_note1.content) - - reset_configuration(producer_url) - - def test_conditional_events_with_cmd(self): - producer = get_remote_site() - - # Add Condition - event_producer = frappe.get_doc("Event Producer", producer_url) - note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0] - note_producer_entry.condition = ( - "cmd: frappe.event_streaming.doctype.event_producer.test_event_producer.can_sync_note" - ) - event_producer.save() - - # Make test doc - producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync cmd") - delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]}) - producer_note1 = producer.insert(producer_note1) - - # Make Update - producer_note1["content"] = "Test Conditional Sync Content" - producer_note1 = producer.update(producer_note1) - - self.pull_producer_data() - - # Check if synced here - self.assertFalse(frappe.db.exists("Note", producer_note1.name)) - - # Lets satisfy the condition - producer_note1["public"] = 1 - producer_note1 = producer.update(producer_note1) - - self.pull_producer_data() - - # it should sync now - self.assertTrue(frappe.db.exists("Note", producer_note1.name)) - local_note = frappe.get_doc("Note", producer_note1.name) - self.assertEqual(local_note.content, producer_note1.content) - - reset_configuration(producer_url) - - def test_update_log(self): - producer = get_remote_site() - producer_doc = insert_into_producer(producer, "test update log") - update_log_doc = producer.get_value( - "Event Update Log", "docname", {"docname": producer_doc.get("name")} - ) - self.assertEqual(update_log_doc.get("docname"), producer_doc.get("name")) - - def test_event_sync_log(self): - producer = get_remote_site() - producer_doc = insert_into_producer(producer, "test event sync log") - self.pull_producer_data() - self.assertTrue(frappe.db.exists("Event Sync Log", {"docname": producer_doc.name})) - - def pull_producer_data(self): - pull_from_node(producer_url) - - def test_mapping(self): - producer = get_remote_site() - event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True) - event_producer.producer_doctypes = [] - mapping = [{"local_fieldname": "description", "remote_fieldname": "content"}] - event_producer.append( - "producer_doctypes", - { - "ref_doctype": "ToDo", - "use_same_name": 1, - "has_mapping": 1, - "mapping": get_mapping("ToDo to Note", "ToDo", "Note", mapping), - }, - ) - event_producer.save() - - producer_note = frappe._dict(doctype="Note", title="Test Mapping", content="Test Mapping") - delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title}) - producer_note = producer.insert(producer_note) - self.pull_producer_data() - # check inserted - self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content})) - - # update in producer - producer_note["content"] = "test mapped doc update sync" - producer_note = producer.update(producer_note) - self.pull_producer_data() - - # check updated - self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note["content"]})) - - producer.delete("Note", producer_note.name) - self.pull_producer_data() - # check delete - self.assertFalse(frappe.db.exists("ToDo", {"description": producer_note.content})) - - reset_configuration(producer_url) - - def test_inner_mapping(self): - producer = get_remote_site() - - setup_event_producer_for_inner_mapping() - producer_note = frappe._dict( - doctype="Note", title="Inner Mapping Tester", content="Test Inner Mapping" - ) - delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title}) - producer_note = producer.insert(producer_note) - self.pull_producer_data() - - # check dependency inserted - self.assertTrue(frappe.db.exists("Role", {"role_name": producer_note.title})) - # check doc inserted - self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content})) - - reset_configuration(producer_url) - - -def can_sync_note(consumer, doc, update_log): - return doc.public == 1 - - -def setup_event_producer_for_inner_mapping(): - event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True) - event_producer.producer_doctypes = [] - inner_mapping = [{"local_fieldname": "role_name", "remote_fieldname": "title"}] - inner_map = get_mapping("Role to Note Dependency Creation", "Role", "Note", inner_mapping) - mapping = [ - { - "local_fieldname": "description", - "remote_fieldname": "content", - }, - { - "local_fieldname": "role", - "remote_fieldname": "title", - "mapping_type": "Document", - "mapping": inner_map, - "remote_value_filters": json.dumps({"title": "title"}), - }, - ] - event_producer.append( - "producer_doctypes", - { - "ref_doctype": "ToDo", - "use_same_name": 1, - "has_mapping": 1, - "mapping": get_mapping("ToDo to Note Mapping", "ToDo", "Note", mapping), - }, - ) - event_producer.save() - return event_producer - - -def insert_into_producer(producer, description): - # create and insert todo on remote site - todo = dict(doctype="ToDo", description=description, assigned_by="Administrator") - return producer.insert(todo) - - -def delete_on_remote_if_exists(producer, doctype, filters): - remote_doc = producer.get_value(doctype, "name", filters) - if remote_doc: - producer.delete(doctype, remote_doc.get("name")) - - -def get_mapping(mapping_name, local, remote, field_map): - name = frappe.db.exists("Document Type Mapping", mapping_name) - if name: - doc = frappe.get_doc("Document Type Mapping", name) - else: - doc = frappe.new_doc("Document Type Mapping") - - doc.mapping_name = mapping_name - doc.local_doctype = local - doc.remote_doctype = remote - for entry in field_map: - doc.append("field_mapping", entry) - doc.save() - return doc.name - - -def create_event_producer(producer_url): - if frappe.db.exists("Event Producer", producer_url): - event_producer = frappe.get_doc("Event Producer", producer_url) - for entry in event_producer.producer_doctypes: - entry.unsubscribe = 0 - event_producer.save() - return - - generate_keys("Administrator") - - producer_site = connect() - - response = producer_site.post_api( - "frappe.core.doctype.user.user.generate_keys", params={"user": "Administrator"} - ) - - api_secret = response.get("api_secret") - - response = producer_site.get_value("User", "api_key", {"name": "Administrator"}) - api_key = response.get("api_key") - - event_producer = frappe.new_doc("Event Producer") - event_producer.producer_doctypes = [] - event_producer.producer_url = producer_url - event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1}) - event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1}) - event_producer.user = "Administrator" - event_producer.api_key = api_key - event_producer.api_secret = api_secret - event_producer.save() - - -def reset_configuration(producer_url): - event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True) - event_producer.producer_doctypes = [] - event_producer.conditions = [] - event_producer.producer_url = producer_url - event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1}) - event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1}) - event_producer.user = "Administrator" - event_producer.save() - - -def get_remote_site(): - producer_doc = frappe.get_doc("Event Producer", producer_url) - producer_site = FrappeClient( - url=producer_doc.producer_url, username="Administrator", password="admin", verify=False - ) - return producer_site - - -def unsubscribe_doctypes(producer_url): - event_producer = frappe.get_doc("Event Producer", producer_url) - for entry in event_producer.producer_doctypes: - entry.unsubscribe = 1 - event_producer.save() - - -def connect(): - def _connect(): - return FrappeClient(url=producer_url, username="Administrator", password="admin", verify=False) - - try: - return _connect() - except Exception: - return _connect() diff --git a/frappe/event_streaming/doctype/event_producer_document_type/__init__.py b/frappe/event_streaming/doctype/event_producer_document_type/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json b/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json deleted file mode 100644 index 17fd51d12d..0000000000 --- a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json +++ /dev/null @@ -1,86 +0,0 @@ -{ - "actions": [], - "creation": "2019-10-03 21:08:25.890352", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "ref_doctype", - "status", - "use_same_name", - "unsubscribe", - "has_mapping", - "mapping", - "condition" - ], - "fields": [ - { - "columns": 3, - "fieldname": "ref_doctype", - "fieldtype": "Link", - "in_list_view": 1, - "label": "Document Type", - "options": "DocType", - "reqd": 1, - "set_only_once": 1 - }, - { - "default": "0", - "description": "If the document has different field names on the Producer and Consumer's end check this and set up the Mapping", - "fieldname": "has_mapping", - "fieldtype": "Check", - "label": "Has Mapping" - }, - { - "depends_on": "eval: doc.has_mapping", - "fieldname": "mapping", - "fieldtype": "Link", - "label": "Mapping", - "options": "Document Type Mapping" - }, - { - "columns": 2, - "default": "0", - "description": "If this is checked the documents will have the same name as they have on the Event Producer's site", - "fieldname": "use_same_name", - "fieldtype": "Check", - "in_list_view": 1, - "label": "Use Same Name" - }, - { - "columns": 3, - "default": "Pending", - "fieldname": "status", - "fieldtype": "Select", - "in_list_view": 1, - "label": "Approval Status", - "options": "Pending\nApproved\nRejected", - "read_only": 1 - }, - { - "columns": 2, - "default": "0", - "fieldname": "unsubscribe", - "fieldtype": "Check", - "in_list_view": 1, - "label": "Unsubscribe" - }, - { - "fieldname": "condition", - "fieldtype": "Code", - "label": "Condition" - } - ], - "istable": 1, - "links": [], - "modified": "2020-11-07 09:26:58.463868", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Producer Document Type", - "owner": "Administrator", - "permissions": [], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py b/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py deleted file mode 100644 index 8f4c936792..0000000000 --- a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class EventProducerDocumentType(Document): - pass diff --git a/frappe/event_streaming/doctype/event_producer_last_update/__init__.py b/frappe/event_streaming/doctype/event_producer_last_update/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js deleted file mode 100644 index 6d18be43e3..0000000000 --- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) 2020, Frappe Technologies and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Event Producer Last Update", { - // refresh: function(frm) { - // } -}); diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json deleted file mode 100644 index 27f8ed2f81..0000000000 --- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "actions": [], - "autoname": "field:event_producer", - "creation": "2020-10-26 12:53:11.940177", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "event_producer", - "last_update" - ], - "fields": [ - { - "fieldname": "event_producer", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Event Producer", - "reqd": 1, - "unique": 1 - }, - { - "fieldname": "last_update", - "fieldtype": "Data", - "label": "Last Update" - } - ], - "in_create": 1, - "index_web_pages_for_search": 1, - "links": [], - "modified": "2020-10-26 13:22:27.056599", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Producer Last Update", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "read_only": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py deleted file mode 100644 index ec5cee7e78..0000000000 --- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2020, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class EventProducerLastUpdate(Document): - pass diff --git a/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py b/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py deleted file mode 100644 index 3e68159790..0000000000 --- a/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2020, Frappe Technologies and Contributors -# License: MIT. See LICENSE -# import frappe -from frappe.tests.utils import FrappeTestCase - - -class TestEventProducerLastUpdate(FrappeTestCase): - pass diff --git a/frappe/event_streaming/doctype/event_sync_log/__init__.py b/frappe/event_streaming/doctype/event_sync_log/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js deleted file mode 100644 index 7cc3198bae..0000000000 --- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) 2019, Frappe Technologies and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Event Sync Log", { - refresh: function (frm) { - if (frm.doc.status == "Failed") { - frm.add_custom_button(__("Resync"), function () { - frappe.call({ - method: "frappe.event_streaming.doctype.event_producer.event_producer.resync", - args: { - update: frm.doc, - }, - callback: function (r) { - if (r.message) { - frappe.msgprint(r.message); - frm.set_value("status", r.message); - frm.save(); - } - }, - }); - }); - } - }, -}); diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json deleted file mode 100644 index f82128bd7b..0000000000 --- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json +++ /dev/null @@ -1,137 +0,0 @@ -{ - "creation": "2019-09-24 22:22:05.845089", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "update_type", - "ref_doctype", - "docname", - "column_break_4", - "status", - "event_producer", - "producer_doc", - "event_configurations_section", - "use_same_name", - "column_break_9", - "mapping", - "section_break_8", - "data", - "error" - ], - "fields": [ - { - "fieldname": "update_type", - "fieldtype": "Select", - "in_list_view": 1, - "label": "Update Type", - "options": "Create\nUpdate\nDelete", - "read_only": 1 - }, - { - "fieldname": "ref_doctype", - "fieldtype": "Link", - "label": "Doctype", - "options": "DocType", - "read_only": 1 - }, - { - "fieldname": "docname", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Document Name", - "options": "ref_doctype", - "read_only": 1 - }, - { - "fieldname": "column_break_4", - "fieldtype": "Column Break" - }, - { - "fieldname": "status", - "fieldtype": "Select", - "in_list_view": 1, - "label": "Status", - "options": "\nSynced\nFailed", - "read_only": 1 - }, - { - "fieldname": "event_producer", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Event Producer", - "options": "Event Producer", - "read_only": 1 - }, - { - "fieldname": "section_break_8", - "fieldtype": "Section Break", - "label": "Data" - }, - { - "fieldname": "data", - "fieldtype": "Code", - "label": "Data", - "read_only": 1 - }, - { - "fieldname": "producer_doc", - "fieldtype": "Data", - "label": "Producer Document Name", - "read_only": 1 - }, - { - "depends_on": "eval:doc.status=='Failed'", - "fieldname": "error", - "fieldtype": "Code", - "label": "Error", - "read_only": 1 - }, - { - "fieldname": "event_configurations_section", - "fieldtype": "Section Break", - "label": "Event Configurations" - }, - { - "default": "0", - "fieldname": "use_same_name", - "fieldtype": "Data", - "label": "Use Same Name", - "read_only": 1 - }, - { - "fieldname": "column_break_9", - "fieldtype": "Column Break" - }, - { - "fieldname": "mapping", - "fieldtype": "Data", - "label": "Mapping", - "read_only": 1 - } - ], - "in_create": 1, - "modified": "2019-10-07 13:22:10.401479", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Sync Log", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py deleted file mode 100644 index a1d82ad08f..0000000000 --- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class EventSyncLog(Document): - pass diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js b/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js deleted file mode 100644 index 97d2ee0a1d..0000000000 --- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js +++ /dev/null @@ -1,9 +0,0 @@ -frappe.listview_settings["Event Sync Log"] = { - get_indicator: function (doc) { - var colors = { - Failed: "red", - Synced: "green", - }; - return [__(doc.status), colors[doc.status], "status,=," + doc.status]; - }, -}; diff --git a/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py b/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py deleted file mode 100644 index 5efc030026..0000000000 --- a/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies and Contributors -# License: MIT. See LICENSE -# import frappe -from frappe.tests.utils import FrappeTestCase - - -class TestEventSyncLog(FrappeTestCase): - pass diff --git a/frappe/event_streaming/doctype/event_update_log/__init__.py b/frappe/event_streaming/doctype/event_update_log/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.js b/frappe/event_streaming/doctype/event_update_log/event_update_log.js deleted file mode 100644 index d901799780..0000000000 --- a/frappe/event_streaming/doctype/event_update_log/event_update_log.js +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors -// For license information, please see license.txt - -frappe.ui.form.on("Event Update Log", { - // refresh: function(frm) { - // } -}); diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.json b/frappe/event_streaming/doctype/event_update_log/event_update_log.json deleted file mode 100644 index a42bc7ec87..0000000000 --- a/frappe/event_streaming/doctype/event_update_log/event_update_log.json +++ /dev/null @@ -1,77 +0,0 @@ -{ - "actions": [], - "creation": "2019-07-30 15:31:26.352527", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "update_type", - "ref_doctype", - "docname", - "data", - "consumers" - ], - "fields": [ - { - "fieldname": "update_type", - "fieldtype": "Select", - "in_list_view": 1, - "label": "Update Type", - "options": "Create\nUpdate\nDelete", - "read_only": 1 - }, - { - "fieldname": "ref_doctype", - "fieldtype": "Link", - "in_list_view": 1, - "label": "DocType", - "options": "DocType", - "read_only": 1 - }, - { - "fieldname": "docname", - "fieldtype": "Data", - "in_list_view": 1, - "label": "Document Name", - "read_only": 1 - }, - { - "fieldname": "data", - "fieldtype": "Code", - "label": "Data", - "read_only": 1 - }, - { - "fieldname": "consumers", - "fieldtype": "Table MultiSelect", - "label": "Consumers", - "options": "Event Update Log Consumer", - "read_only": 1 - } - ], - "in_create": 1, - "links": [], - "modified": "2020-09-04 07:31:52.599804", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Update Log", - "owner": "Administrator", - "permissions": [ - { - "create": 1, - "delete": 1, - "email": 1, - "export": 1, - "print": 1, - "read": 1, - "report": 1, - "role": "System Manager", - "share": 1, - "write": 1 - } - ], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.py b/frappe/event_streaming/doctype/event_update_log/event_update_log.py deleted file mode 100644 index e40f600484..0000000000 --- a/frappe/event_streaming/doctype/event_update_log/event_update_log.py +++ /dev/null @@ -1,296 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors -# License: MIT. See LICENSE - -import frappe -from frappe.model import no_value_fields, table_fields -from frappe.model.document import Document -from frappe.utils.background_jobs import get_jobs - - -class EventUpdateLog(Document): - def after_insert(self): - """Send update notification updates to event consumers - whenever update log is generated""" - enqueued_method = ( - "frappe.event_streaming.doctype.event_consumer.event_consumer.notify_event_consumers" - ) - jobs = get_jobs() - if not jobs or enqueued_method not in jobs[frappe.local.site]: - frappe.enqueue( - enqueued_method, doctype=self.ref_doctype, queue="long", enqueue_after_commit=True - ) - - -def notify_consumers(doc, event): - """called via hooks""" - # make event update log for doctypes having event consumers - if frappe.flags.in_install or frappe.flags.in_migrate: - return - - consumers = check_doctype_has_consumers(doc.doctype) - if consumers: - if event == "after_insert": - doc.flags.event_update_log = make_event_update_log(doc, update_type="Create") - elif event == "on_trash": - make_event_update_log(doc, update_type="Delete") - else: - # on_update - # called after saving - if not doc.flags.event_update_log: # if not already inserted - diff = get_update(doc.get_doc_before_save(), doc) - if diff: - doc.diff = diff - make_event_update_log(doc, update_type="Update") - - -def check_doctype_has_consumers(doctype): - """Check if doctype has event consumers for event streaming""" - return frappe.cache_manager.get_doctype_map( - "Event Consumer Document Type", - doctype, - dict(ref_doctype=doctype, status="Approved", unsubscribed=0), - ) - - -def get_update(old, new, for_child=False): - """ - Get document objects with updates only - If there is a change, then returns a dict like: - { - "changed" : {fieldname1: new_value1, fieldname2: new_value2, }, - "added" : {table_fieldname1: [{row_dict1}, {row_dict2}], }, - "removed" : {table_fieldname1: [row_name1, row_name2], }, - "row_changed" : {table_fieldname1: - { - child_fieldname1: new_val, - child_fieldname2: new_val - }, - }, - } - """ - if not new: - return None - - out = frappe._dict(changed={}, added={}, removed={}, row_changed={}) - for df in new.meta.fields: - if df.fieldtype in no_value_fields and df.fieldtype not in table_fields: - continue - - old_value, new_value = old.get(df.fieldname), new.get(df.fieldname) - - if df.fieldtype in table_fields: - old_row_by_name, new_row_by_name = make_maps(old_value, new_value) - out = check_for_additions(out, df, new_value, old_row_by_name) - out = check_for_deletions(out, df, old_value, new_row_by_name) - - elif old_value != new_value: - out.changed[df.fieldname] = new_value - - out = check_docstatus(out, old, new, for_child) - if any((out.changed, out.added, out.removed, out.row_changed)): - return out - return None - - -def make_event_update_log(doc, update_type): - """Save update info for doctypes that have event consumers""" - if update_type != "Delete": - # diff for update type, doc for create type - data = frappe.as_json(doc) if not doc.get("diff") else frappe.as_json(doc.diff) - else: - data = None - return frappe.get_doc( - { - "doctype": "Event Update Log", - "update_type": update_type, - "ref_doctype": doc.doctype, - "docname": doc.name, - "data": data, - } - ).insert(ignore_permissions=True) - - -def make_maps(old_value, new_value): - """make maps""" - old_row_by_name, new_row_by_name = {}, {} - for d in old_value: - old_row_by_name[d.name] = d - for d in new_value: - new_row_by_name[d.name] = d - return old_row_by_name, new_row_by_name - - -def check_for_additions(out, df, new_value, old_row_by_name): - """check rows for additions, changes""" - for _i, d in enumerate(new_value): - if d.name in old_row_by_name: - diff = get_update(old_row_by_name[d.name], d, for_child=True) - if diff and diff.changed: - if not out.row_changed.get(df.fieldname): - out.row_changed[df.fieldname] = [] - diff.changed["name"] = d.name - out.row_changed[df.fieldname].append(diff.changed) - else: - if not out.added.get(df.fieldname): - out.added[df.fieldname] = [] - out.added[df.fieldname].append(d.as_dict()) - return out - - -def check_for_deletions(out, df, old_value, new_row_by_name): - """check for deletions""" - for d in old_value: - if d.name not in new_row_by_name: - if not out.removed.get(df.fieldname): - out.removed[df.fieldname] = [] - out.removed[df.fieldname].append(d.name) - return out - - -def check_docstatus(out, old, new, for_child): - """docstatus changes""" - if not for_child and old.docstatus != new.docstatus: - out.changed["docstatus"] = new.docstatus - return out - - -def is_consumer_uptodate(update_log, consumer): - """ - Checks if Consumer has read all the UpdateLogs before the specified update_log - :param update_log: The UpdateLog Doc in context - :param consumer: The EventConsumer doc - """ - if update_log.update_type == "Create": - # consumer is obviously up to date - return True - - prev_logs = frappe.get_all( - "Event Update Log", - filters={ - "ref_doctype": update_log.ref_doctype, - "docname": update_log.docname, - "creation": ["<", update_log.creation], - }, - order_by="creation desc", - limit_page_length=1, - ) - - if not len(prev_logs): - return False - - prev_log_consumers = frappe.get_all( - "Event Update Log Consumer", - fields=["consumer"], - filters={ - "parent": prev_logs[0].name, - "parenttype": "Event Update Log", - "consumer": consumer.name, - }, - ) - - return len(prev_log_consumers) > 0 - - -def mark_consumer_read(update_log_name, consumer_name): - """ - This function appends the Consumer to the list of Consumers that has 'read' an Update Log - """ - update_log = frappe.get_doc("Event Update Log", update_log_name) - if len([x for x in update_log.consumers if x.consumer == consumer_name]): - return - - frappe.get_doc( - frappe._dict( - doctype="Event Update Log Consumer", - consumer=consumer_name, - parent=update_log_name, - parenttype="Event Update Log", - parentfield="consumers", - ) - ).insert(ignore_permissions=True) - - -def get_unread_update_logs(consumer_name, dt, dn): - """ - Get old logs unread by the consumer on a particular document - """ - already_consumed = [ - x[0] - for x in frappe.db.sql( - """ - SELECT - update_log.name - FROM `tabEvent Update Log` update_log - JOIN `tabEvent Update Log Consumer` consumer ON consumer.parent = %(log_name)s - WHERE - consumer.consumer = %(consumer)s - AND update_log.ref_doctype = %(dt)s - AND update_log.docname = %(dn)s - """, - { - "consumer": consumer_name, - "dt": dt, - "dn": dn, - "log_name": "update_log.name" - if frappe.conf.db_type == "mariadb" - else "CAST(update_log.name AS VARCHAR)", - }, - as_dict=0, - ) - ] - - logs = frappe.get_all( - "Event Update Log", - fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"], - filters={"ref_doctype": dt, "docname": dn, "name": ["not in", already_consumed]}, - order_by="creation", - ) - - return logs - - -@frappe.whitelist() -def get_update_logs_for_consumer(event_consumer, doctypes, last_update): - """ - Fetches all the UpdateLogs for the consumer - It will inject old un-consumed Update Logs if a doc was just found to be accessible to the Consumer - """ - - if isinstance(doctypes, str): - doctypes = frappe.parse_json(doctypes) - - from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access - - consumer = frappe.get_doc("Event Consumer", event_consumer) - docs = frappe.get_list( - doctype="Event Update Log", - filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)}, - fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"], - order_by="creation desc", - ) - - result = [] - to_update_history = [] - for d in docs: - if (d.ref_doctype, d.docname) in to_update_history: - # will be notified by background jobs - continue - - if not has_consumer_access(consumer=consumer, update_log=d): - continue - - if not is_consumer_uptodate(d, consumer): - to_update_history.append((d.ref_doctype, d.docname)) - # get_unread_update_logs will have the current log - old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname) - if old_logs: - old_logs.reverse() - result.extend(old_logs) - else: - result.append(d) - - for d in result: - mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name) - - result.reverse() - return result diff --git a/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py b/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py deleted file mode 100644 index a9065ab4ed..0000000000 --- a/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors -# License: MIT. See LICENSE -# import frappe -from frappe.tests.utils import FrappeTestCase - - -class TestEventUpdateLog(FrappeTestCase): - pass diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/__init__.py b/frappe/event_streaming/doctype/event_update_log_consumer/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json b/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json deleted file mode 100644 index b3484c6481..0000000000 --- a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "actions": [], - "creation": "2020-06-30 10:54:53.301787", - "doctype": "DocType", - "editable_grid": 1, - "engine": "InnoDB", - "field_order": [ - "consumer" - ], - "fields": [ - { - "fieldname": "consumer", - "fieldtype": "Link", - "in_list_view": 1, - "label": "Consumer", - "options": "Event Consumer", - "reqd": 1 - } - ], - "istable": 1, - "links": [], - "modified": "2020-06-30 10:54:53.301787", - "modified_by": "Administrator", - "module": "Event Streaming", - "name": "Event Update Log Consumer", - "owner": "Administrator", - "permissions": [], - "quick_entry": 1, - "sort_field": "modified", - "sort_order": "DESC", - "track_changes": 1 -} \ No newline at end of file diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py b/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py deleted file mode 100644 index 69da7db92e..0000000000 --- a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) 2020, Frappe Technologies and contributors -# License: MIT. See LICENSE - -# import frappe -from frappe.model.document import Document - - -class EventUpdateLogConsumer(Document): - pass