diff --git a/.github/helper/consumer_db/mariadb.json b/.github/helper/db/mariadb.json
similarity index 92%
rename from .github/helper/consumer_db/mariadb.json
rename to .github/helper/db/mariadb.json
index 2e32157e1a..8bb654da66 100644
--- a/.github/helper/consumer_db/mariadb.json
+++ b/.github/helper/db/mariadb.json
@@ -1,7 +1,7 @@
{
"db_host": "127.0.0.1",
"db_port": 3306,
- "db_name": "test_frappe_consumer",
+ "db_name": "test_frappe",
"db_password": "test_frappe",
"allow_tests": true,
"db_type": "mariadb",
diff --git a/.github/helper/consumer_db/postgres.json b/.github/helper/db/postgres.json
similarity index 92%
rename from .github/helper/consumer_db/postgres.json
rename to .github/helper/db/postgres.json
index 9532670029..6ca83b9e96 100644
--- a/.github/helper/consumer_db/postgres.json
+++ b/.github/helper/db/postgres.json
@@ -1,7 +1,7 @@
{
"db_host": "127.0.0.1",
"db_port": 5432,
- "db_name": "test_frappe_consumer",
+ "db_name": "test_frappe",
"db_password": "test_frappe",
"db_type": "postgres",
"allow_tests": true,
diff --git a/.github/helper/install.sh b/.github/helper/install.sh
index 1514236ecb..39880e35e7 100644
--- a/.github/helper/install.sh
+++ b/.github/helper/install.sh
@@ -17,37 +17,23 @@ fi
echo "Setting Up Sites & Database..."
mkdir ~/frappe-bench/sites/test_site
-cp "${GITHUB_WORKSPACE}/.github/helper/consumer_db/$DB.json" ~/frappe-bench/sites/test_site/site_config.json
-
-if [ "$TYPE" == "server" ]
-then
- mkdir ~/frappe-bench/sites/test_site_producer
- cp "${GITHUB_WORKSPACE}/.github/helper/producer_db/$DB.json" ~/frappe-bench/sites/test_site_producer/site_config.json
-fi
+cp "${GITHUB_WORKSPACE}/.github/helper/db/$DB.json" ~/frappe-bench/sites/test_site/site_config.json
if [ "$DB" == "mariadb" ]
then
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL character_set_server = 'utf8mb4'"
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL collation_server = 'utf8mb4_unicode_ci'"
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL character_set_server = 'utf8mb4'";
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL collation_server = 'utf8mb4_unicode_ci'";
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe_consumer"
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe_consumer'@'localhost' IDENTIFIED BY 'test_frappe_consumer'"
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe_consumer\`.* TO 'test_frappe_consumer'@'localhost'"
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe";
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe'@'localhost' IDENTIFIED BY 'test_frappe'";
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe\`.* TO 'test_frappe'@'localhost'";
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe_producer"
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe_producer'@'localhost' IDENTIFIED BY 'test_frappe_producer'"
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe_producer\`.* TO 'test_frappe_producer'@'localhost'"
-
- mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "FLUSH PRIVILEGES"
+ mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "FLUSH PRIVILEGES";
fi
-
if [ "$DB" == "postgres" ]
then
- echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe_consumer" -U postgres
- echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe_consumer WITH PASSWORD 'test_frappe'" -U postgres
-
- echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe_producer" -U postgres
- echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe_producer WITH PASSWORD 'test_frappe'" -U postgres
+ echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe" -U postgres;
+ echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe WITH PASSWORD 'test_frappe'" -U postgres;
fi
echo "Setting Up Procfile..."
@@ -78,11 +64,6 @@ fi
bench --site test_site reinstall --yes
-if [ "$TYPE" == "server" ]
-then
- bench --site test_site_producer reinstall --yes
-fi
-
if [ "$TYPE" == "server" ]
then
# wait till assets are built succesfully
diff --git a/.github/helper/producer_db/mariadb.json b/.github/helper/producer_db/mariadb.json
deleted file mode 100644
index c1db0d765f..0000000000
--- a/.github/helper/producer_db/mariadb.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "db_host": "127.0.0.1",
- "db_port": 3306,
- "db_name": "test_frappe_producer",
- "db_password": "test_frappe",
- "allow_tests": true,
- "db_type": "mariadb",
- "auto_email_id": "test@example.com",
- "mail_server": "smtp.example.com",
- "mail_login": "test@example.com",
- "mail_password": "test",
- "admin_password": "admin",
- "root_login": "root",
- "root_password": "travis",
- "host_name": "http://test_site_producer:8000"
-}
diff --git a/.github/helper/producer_db/postgres.json b/.github/helper/producer_db/postgres.json
deleted file mode 100644
index 8b9d2a20fd..0000000000
--- a/.github/helper/producer_db/postgres.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "db_host": "127.0.0.1",
- "db_port": 5432,
- "db_name": "test_frappe_producer",
- "db_password": "test_frappe",
- "db_type": "postgres",
- "allow_tests": true,
- "auto_email_id": "test@example.com",
- "mail_server": "smtp.example.com",
- "mail_login": "test@example.com",
- "mail_password": "test",
- "admin_password": "admin",
- "root_login": "postgres",
- "root_password": "travis",
- "host_name": "http://test_site_producer:8000"
-}
diff --git a/.github/workflows/server-mariadb-tests.yml b/.github/workflows/server-mariadb-tests.yml
index f6b6575a1f..1eab828ffb 100644
--- a/.github/workflows/server-mariadb-tests.yml
+++ b/.github/workflows/server-mariadb-tests.yml
@@ -82,7 +82,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
- echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3
diff --git a/.github/workflows/server-postgres-tests.yml b/.github/workflows/server-postgres-tests.yml
index e0bb471d39..300f888de6 100644
--- a/.github/workflows/server-postgres-tests.yml
+++ b/.github/workflows/server-postgres-tests.yml
@@ -84,7 +84,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
- echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3
diff --git a/.github/workflows/ui-tests.yml b/.github/workflows/ui-tests.yml
index cacb5ce833..6aeec5b425 100644
--- a/.github/workflows/ui-tests.yml
+++ b/.github/workflows/ui-tests.yml
@@ -82,7 +82,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
- echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3
diff --git a/CODEOWNERS b/CODEOWNERS
index aff03e2082..861016710a 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -7,7 +7,6 @@
templates/ @surajshetty3416
www/ @surajshetty3416
patches/ @surajshetty3416
-event_streaming/ @ruchamahabal
data_import* @netchampfaris
core/ @surajshetty3416
workspace @shariquerik
diff --git a/frappe/automation/workspace/tools/tools.json b/frappe/automation/workspace/tools/tools.json
index 40b265b34f..9962e27f55 100644
--- a/frappe/automation/workspace/tools/tools.json
+++ b/frappe/automation/workspace/tools/tools.json
@@ -1,6 +1,6 @@
{
"charts": [],
- "content": "[{\"type\":\"header\",\"data\":{\"text\":\"Your Shortcuts\",\"col\":12}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"ToDo\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Note\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"File\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Assignment Rule\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Auto Repeat\",\"col\":3}},{\"type\":\"spacer\",\"data\":{\"col\":12}},{\"type\":\"header\",\"data\":{\"text\":\"Reports & Masters\",\"col\":12}},{\"type\":\"card\",\"data\":{\"card_name\":\"Tools\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Email\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Automation\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Event Streaming\",\"col\":4}}]",
+ "content": "[{\"type\":\"header\",\"data\":{\"text\":\"Your Shortcuts\",\"col\":12}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"ToDo\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Note\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"File\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Assignment Rule\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Auto Repeat\",\"col\":3}},{\"type\":\"spacer\",\"data\":{\"col\":12}},{\"type\":\"header\",\"data\":{\"text\":\"Reports & Masters\",\"col\":12}},{\"type\":\"card\",\"data\":{\"card_name\":\"Tools\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Email\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Automation\",\"col\":4}}]",
"creation": "2020-03-02 14:53:24.980279",
"docstatus": 0,
"doctype": "Workspace",
@@ -107,7 +107,7 @@
"hidden": 0,
"is_query_report": 0,
"label": "Automation",
- "link_count": 0,
+ "link_count": 3,
"onboard": 0,
"type": "Card Break"
},
@@ -143,78 +143,16 @@
"link_type": "DocType",
"onboard": 0,
"type": "Link"
- },
- {
- "hidden": 0,
- "is_query_report": 0,
- "label": "Event Streaming",
- "link_count": 0,
- "onboard": 0,
- "type": "Card Break"
- },
- {
- "dependencies": "",
- "hidden": 0,
- "is_query_report": 0,
- "label": "Event Producer",
- "link_count": 0,
- "link_to": "Event Producer",
- "link_type": "DocType",
- "onboard": 0,
- "type": "Link"
- },
- {
- "dependencies": "",
- "hidden": 0,
- "is_query_report": 0,
- "label": "Event Consumer",
- "link_count": 0,
- "link_to": "Event Consumer",
- "link_type": "DocType",
- "onboard": 0,
- "type": "Link"
- },
- {
- "dependencies": "",
- "hidden": 0,
- "is_query_report": 0,
- "label": "Event Update Log",
- "link_count": 0,
- "link_to": "Event Update Log",
- "link_type": "DocType",
- "onboard": 0,
- "type": "Link"
- },
- {
- "dependencies": "",
- "hidden": 0,
- "is_query_report": 0,
- "label": "Event Sync Log",
- "link_count": 0,
- "link_to": "Event Sync Log",
- "link_type": "DocType",
- "onboard": 0,
- "type": "Link"
- },
- {
- "dependencies": "",
- "hidden": 0,
- "is_query_report": 0,
- "label": "Document Type Mapping",
- "link_count": 0,
- "link_to": "Document Type Mapping",
- "link_type": "DocType",
- "onboard": 0,
- "type": "Link"
}
],
- "modified": "2022-01-13 17:48:48.456763",
+ "modified": "2022-08-23 14:42:58.364898",
"modified_by": "Administrator",
"module": "Automation",
"name": "Tools",
"owner": "Administrator",
"parent_page": "",
"public": 1,
+ "quick_lists": [],
"restrict_to_domain": "",
"roles": [],
"sequence_id": 26.0,
diff --git a/frappe/cache_manager.py b/frappe/cache_manager.py
index 868329ec1e..942adfa190 100644
--- a/frappe/cache_manager.py
+++ b/frappe/cache_manager.py
@@ -12,7 +12,6 @@ doctype_map_keys = (
"energy_point_rule_map",
"assignment_rule_map",
"milestone_tracker_map",
- "event_consumer_document_type_map",
)
bench_cache_keys = ("assets_json",)
diff --git a/frappe/event_streaming/__init__.py b/frappe/event_streaming/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/__init__.py b/frappe/event_streaming/doctype/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/__init__.py b/frappe/event_streaming/doctype/document_type_field_mapping/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json b/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json
deleted file mode 100644
index bba0a98237..0000000000
--- a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.json
+++ /dev/null
@@ -1,73 +0,0 @@
-{
- "actions": [],
- "creation": "2019-09-27 12:46:50.165135",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "local_fieldname",
- "mapping_type",
- "mapping",
- "remote_value_filters",
- "column_break_5",
- "remote_fieldname",
- "default_value"
- ],
- "fields": [
- {
- "fieldname": "remote_fieldname",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Remote Fieldname"
- },
- {
- "fieldname": "local_fieldname",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Local Fieldname",
- "reqd": 1
- },
- {
- "fieldname": "column_break_5",
- "fieldtype": "Column Break"
- },
- {
- "fieldname": "default_value",
- "fieldtype": "Data",
- "label": "Default Value"
- },
- {
- "fieldname": "mapping_type",
- "fieldtype": "Select",
- "label": "Mapping Type",
- "options": "\nChild Table\nDocument"
- },
- {
- "depends_on": "eval:doc.mapping_type;",
- "fieldname": "mapping",
- "fieldtype": "Link",
- "label": "Mapping",
- "options": "Document Type Mapping"
- },
- {
- "depends_on": "eval:doc.mapping_type==\"Document\";",
- "fieldname": "remote_value_filters",
- "fieldtype": "Code",
- "label": "Remote Value Filters",
- "mandatory_depends_on": "eval:doc.mapping_type===\"Document\";",
- "options": "JSON"
- }
- ],
- "istable": 1,
- "links": [],
- "modified": "2020-03-19 13:56:36.223799",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Document Type Field Mapping",
- "owner": "Administrator",
- "permissions": [],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py b/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py
deleted file mode 100644
index 96d9e0fcb3..0000000000
--- a/frappe/event_streaming/doctype/document_type_field_mapping/document_type_field_mapping.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class DocumentTypeFieldMapping(Document):
- pass
diff --git a/frappe/event_streaming/doctype/document_type_mapping/__init__.py b/frappe/event_streaming/doctype/document_type_mapping/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js
deleted file mode 100644
index ad9ab0f51d..0000000000
--- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.js
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (c) 2019, Frappe Technologies and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Document Type Mapping", {
- local_doctype: function (frm) {
- if (frm.doc.local_doctype) {
- frappe.model.clear_table(frm.doc, "field_mapping");
- let fields = frm.events.get_fields(frm);
- $.each(fields, function (i, data) {
- let row = frappe.model.add_child(
- frm.doc,
- "Document Type Field Mapping",
- "field_mapping"
- );
- row.local_fieldname = data;
- });
- refresh_field("field_mapping");
- }
- },
-
- get_fields: function (frm) {
- let filtered_fields = [];
- frappe.model.with_doctype(frm.doc.local_doctype, () => {
- frappe.get_meta(frm.doc.local_doctype).fields.map((field) => {
- if (
- field.fieldname !== "remote_docname" &&
- field.fieldname !== "remote_site_name" &&
- frappe.model.is_value_type(field) &&
- !field.hidden
- ) {
- filtered_fields.push(field.fieldname);
- }
- });
- });
- return filtered_fields;
- },
-});
diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json
deleted file mode 100644
index 6a59cf3b70..0000000000
--- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.json
+++ /dev/null
@@ -1,71 +0,0 @@
-{
- "autoname": "field:mapping_name",
- "creation": "2019-09-27 12:45:56.529124",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "mapping_name",
- "local_doctype",
- "remote_doctype",
- "section_break_3",
- "field_mapping"
- ],
- "fields": [
- {
- "fieldname": "local_doctype",
- "fieldtype": "Link",
- "in_list_view": 1,
- "label": "Local Document Type",
- "options": "DocType",
- "reqd": 1
- },
- {
- "fieldname": "remote_doctype",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Remote Document Type",
- "reqd": 1
- },
- {
- "fieldname": "section_break_3",
- "fieldtype": "Section Break"
- },
- {
- "fieldname": "field_mapping",
- "fieldtype": "Table",
- "label": "Field Mapping",
- "options": "Document Type Field Mapping"
- },
- {
- "fieldname": "mapping_name",
- "fieldtype": "Data",
- "label": "Mapping Name",
- "reqd": 1,
- "unique": 1
- }
- ],
- "modified": "2019-10-09 08:36:04.621397",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Document Type Mapping",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py b/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py
deleted file mode 100644
index 04b5015296..0000000000
--- a/frappe/event_streaming/doctype/document_type_mapping/document_type_mapping.py
+++ /dev/null
@@ -1,181 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-import json
-
-import frappe
-from frappe import _
-from frappe.model import child_table_fields, default_fields
-from frappe.model.document import Document
-
-
-class DocumentTypeMapping(Document):
- def validate(self):
- self.validate_inner_mapping()
-
- def validate_inner_mapping(self):
- meta = frappe.get_meta(self.local_doctype)
- for field_map in self.field_mapping:
- if field_map.local_fieldname not in (default_fields + child_table_fields):
- field = meta.get_field(field_map.local_fieldname)
- if not field:
- frappe.throw(_("Row #{0}: Invalid Local Fieldname").format(field_map.idx))
-
- fieldtype = field.get("fieldtype")
- if fieldtype in ["Link", "Dynamic Link", "Table"]:
- if not field_map.mapping and not field_map.default_value:
- msg = _(
- "Row #{0}: Please set Mapping or Default Value for the field {1} since its a dependency field"
- ).format(field_map.idx, frappe.bold(field_map.local_fieldname))
- frappe.throw(msg, title="Inner Mapping Missing")
-
- if field_map.mapping_type == "Document" and not field_map.remote_value_filters:
- msg = _(
- "Row #{0}: Please set remote value filters for the field {1} to fetch the unique remote dependency document"
- ).format(field_map.idx, frappe.bold(field_map.remote_fieldname))
- frappe.throw(msg, title="Remote Value Filters Missing")
-
- def get_mapping(self, doc, producer_site, update_type):
- remote_fields = []
- # list of tuples (local_fieldname, dependent_doc)
- dependencies = []
-
- for mapping in self.field_mapping:
- if doc.get(mapping.remote_fieldname):
- if mapping.mapping_type == "Document":
- if not mapping.default_value:
- dependency = self.get_mapped_dependency(mapping, producer_site, doc)
- if dependency:
- dependencies.append((mapping.local_fieldname, dependency))
- else:
- doc[mapping.local_fieldname] = mapping.default_value
-
- if mapping.mapping_type == "Child Table" and update_type != "Update":
- doc[mapping.local_fieldname] = get_mapped_child_table_docs(
- mapping.mapping, doc[mapping.remote_fieldname], producer_site
- )
- else:
- # copy value into local fieldname key and remove remote fieldname key
- doc[mapping.local_fieldname] = doc[mapping.remote_fieldname]
-
- if mapping.local_fieldname != mapping.remote_fieldname:
- remote_fields.append(mapping.remote_fieldname)
-
- if not doc.get(mapping.remote_fieldname) and mapping.default_value and update_type != "Update":
- doc[mapping.local_fieldname] = mapping.default_value
-
- # remove the remote fieldnames
- for field in remote_fields:
- doc.pop(field, None)
-
- if update_type != "Update":
- doc["doctype"] = self.local_doctype
-
- mapping = {"doc": frappe.as_json(doc)}
- if len(dependencies):
- mapping["dependencies"] = dependencies
- return mapping
-
- def get_mapped_update(self, update, producer_site):
- update_diff = frappe._dict(json.loads(update.data))
- mapping = update_diff
- dependencies = []
- if update_diff.changed:
- doc_map = self.get_mapping(update_diff.changed, producer_site, "Update")
- mapped_doc = doc_map.get("doc")
- mapping.changed = json.loads(mapped_doc)
- if doc_map.get("dependencies"):
- dependencies += doc_map.get("dependencies")
-
- if update_diff.removed:
- mapping = self.map_rows_removed(update_diff, mapping)
- if update_diff.added:
- mapping = self.map_rows(update_diff, mapping, producer_site, operation="added")
- if update_diff.row_changed:
- mapping = self.map_rows(update_diff, mapping, producer_site, operation="row_changed")
-
- update = {"doc": frappe.as_json(mapping)}
- if len(dependencies):
- update["dependencies"] = dependencies
- return update
-
- def get_mapped_dependency(self, mapping, producer_site, doc):
- inner_mapping = frappe.get_doc("Document Type Mapping", mapping.mapping)
- filters = json.loads(mapping.remote_value_filters)
- for key, value in filters.items():
- if value.startswith("eval:"):
- val = frappe.safe_eval(value[5:], None, dict(doc=doc))
- filters[key] = val
- if doc.get(value):
- filters[key] = doc.get(value)
- matching_docs = producer_site.get_doc(inner_mapping.remote_doctype, filters=filters)
- if len(matching_docs):
- remote_docname = matching_docs[0].get("name")
- remote_doc = producer_site.get_doc(inner_mapping.remote_doctype, remote_docname)
- doc = inner_mapping.get_mapping(remote_doc, producer_site, "Insert").get("doc")
- return doc
- return
-
- def map_rows_removed(self, update_diff, mapping):
- removed = []
- mapping["removed"] = update_diff.removed
- for key, value in update_diff.removed.copy().items():
- local_table_name = frappe.db.get_value(
- "Document Type Field Mapping",
- {"remote_fieldname": key, "parent": self.name},
- "local_fieldname",
- )
- mapping.removed[local_table_name] = value
- if local_table_name != key:
- removed.append(key)
-
- # remove the remote fieldnames
- for field in removed:
- mapping.removed.pop(field, None)
- return mapping
-
- def map_rows(self, update_diff, mapping, producer_site, operation):
- remote_fields = []
- for tablename, entries in update_diff.get(operation).copy().items():
- local_table_name = frappe.db.get_value(
- "Document Type Field Mapping", {"remote_fieldname": tablename}, "local_fieldname"
- )
- table_map = frappe.db.get_value(
- "Document Type Field Mapping",
- {"local_fieldname": local_table_name, "parent": self.name},
- "mapping",
- )
- table_map = frappe.get_doc("Document Type Mapping", table_map)
- docs = []
- for entry in entries:
- mapped_doc = table_map.get_mapping(entry, producer_site, "Update").get("doc")
- docs.append(json.loads(mapped_doc))
- mapping.get(operation)[local_table_name] = docs
- if local_table_name != tablename:
- remote_fields.append(tablename)
-
- # remove the remote fieldnames
- for field in remote_fields:
- mapping.get(operation).pop(field, None)
-
- return mapping
-
-
-def get_mapped_child_table_docs(child_map, table_entries, producer_site):
- """Get mapping for child doctypes"""
- child_map = frappe.get_doc("Document Type Mapping", child_map)
- mapped_entries = []
- remote_fields = []
- for child_doc in table_entries:
- for mapping in child_map.field_mapping:
- if child_doc.get(mapping.remote_fieldname):
- child_doc[mapping.local_fieldname] = child_doc[mapping.remote_fieldname]
- if mapping.local_fieldname != mapping.remote_fieldname:
- child_doc.pop(mapping.remote_fieldname, None)
- mapped_entries.append(child_doc)
-
- # remove the remote fieldnames
- for field in remote_fields:
- child_doc.pop(field, None)
-
- child_doc["doctype"] = child_map.local_doctype
- return mapped_entries
diff --git a/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py b/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py
deleted file mode 100644
index defaa8b9c6..0000000000
--- a/frappe/event_streaming/doctype/document_type_mapping/test_document_type_mapping.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and Contributors
-# License: MIT. See LICENSE
-# import frappe
-from frappe.tests.utils import FrappeTestCase
-
-
-class TestDocumentTypeMapping(FrappeTestCase):
- pass
diff --git a/frappe/event_streaming/doctype/event_consumer/__init__.py b/frappe/event_streaming/doctype/event_consumer/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.js b/frappe/event_streaming/doctype/event_consumer/event_consumer.js
deleted file mode 100644
index 2bcf96f9f3..0000000000
--- a/frappe/event_streaming/doctype/event_consumer/event_consumer.js
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright (c) 2019, Frappe Technologies and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Event Consumer", {
- refresh: function (frm) {
- // formatter for subscribed doctype approval status
- frm.set_indicator_formatter("status", function (doc) {
- let indicator = "orange";
- if (doc.status == "Approved") {
- indicator = "green";
- } else if (doc.status == "Rejected") {
- indicator = "red";
- }
- return indicator;
- });
- },
-});
diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.json b/frappe/event_streaming/doctype/event_consumer/event_consumer.json
deleted file mode 100644
index 42b47ce949..0000000000
--- a/frappe/event_streaming/doctype/event_consumer/event_consumer.json
+++ /dev/null
@@ -1,97 +0,0 @@
-{
- "actions": [],
- "autoname": "field:callback_url",
- "creation": "2019-08-26 17:45:15.479530",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "consumer_doctypes",
- "callback_url",
- "section_break_3",
- "api_key",
- "api_secret",
- "column_break_6",
- "user",
- "incoming_change"
- ],
- "fields": [
- {
- "fieldname": "callback_url",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Callback URL",
- "read_only": 1,
- "reqd": 1,
- "unique": 1
- },
- {
- "fieldname": "api_key",
- "fieldtype": "Data",
- "label": "API Key",
- "reqd": 1
- },
- {
- "fieldname": "api_secret",
- "fieldtype": "Password",
- "label": "API Secret",
- "reqd": 1
- },
- {
- "fieldname": "user",
- "fieldtype": "Link",
- "label": "Event Subscriber",
- "options": "User",
- "read_only": 1,
- "reqd": 1
- },
- {
- "fieldname": "section_break_3",
- "fieldtype": "Section Break"
- },
- {
- "fieldname": "column_break_6",
- "fieldtype": "Column Break"
- },
- {
- "default": "0",
- "fieldname": "incoming_change",
- "fieldtype": "Check",
- "hidden": 1,
- "label": "Incoming Change",
- "read_only": 1
- },
- {
- "fieldname": "consumer_doctypes",
- "fieldtype": "Table",
- "label": "Event Consumer Document Types",
- "options": "Event Consumer Document Type",
- "reqd": 1
- }
- ],
- "in_create": 1,
- "links": [],
- "modified": "2020-09-08 16:42:39.828085",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Consumer",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_consumer/event_consumer.py b/frappe/event_streaming/doctype/event_consumer/event_consumer.py
deleted file mode 100644
index a2ae6f6651..0000000000
--- a/frappe/event_streaming/doctype/event_consumer/event_consumer.py
+++ /dev/null
@@ -1,216 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-import json
-import os
-
-import requests
-
-import frappe
-from frappe import _
-from frappe.frappeclient import FrappeClient
-from frappe.model.document import Document
-from frappe.utils.background_jobs import get_jobs
-from frappe.utils.data import get_url
-
-
-class EventConsumer(Document):
- def validate(self):
- # approve subscribed doctypes for tests
- # frappe.flags.in_test won't work here as tests are running on the consumer site
- if os.environ.get("CI"):
- for entry in self.consumer_doctypes:
- entry.status = "Approved"
-
- def on_update(self):
- if not self.incoming_change:
- doc_before_save = self.get_doc_before_save()
- if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
- return
-
- self.update_consumer_status()
- else:
- frappe.db.set_value(self.doctype, self.name, "incoming_change", 0)
-
- frappe.cache().delete_value("event_consumer_document_type_map")
-
- def on_trash(self):
- for i in frappe.get_all("Event Update Log Consumer", {"consumer": self.name}):
- frappe.delete_doc("Event Update Log Consumer", i.name)
- frappe.cache().delete_value("event_consumer_document_type_map")
-
- def update_consumer_status(self):
- consumer_site = get_consumer_site(self.callback_url)
- event_producer = consumer_site.get_doc("Event Producer", get_url())
- event_producer = frappe._dict(event_producer)
- config = event_producer.producer_doctypes
- event_producer.producer_doctypes = []
- for entry in config:
- if entry.get("has_mapping"):
- ref_doctype = consumer_site.get_value(
- "Document Type Mapping", "remote_doctype", entry.get("mapping")
- ).get("remote_doctype")
- else:
- ref_doctype = entry.get("ref_doctype")
-
- entry["status"] = frappe.db.get_value(
- "Event Consumer Document Type", {"parent": self.name, "ref_doctype": ref_doctype}, "status"
- )
-
- event_producer.producer_doctypes = config
- # when producer doc is updated it updates the consumer doc
- # set flag to avoid deadlock
- event_producer.incoming_change = True
- consumer_site.update(event_producer)
-
- def get_consumer_status(self):
- response = requests.get(self.callback_url)
- if response.status_code != 200:
- return "offline"
- return "online"
-
-
-@frappe.whitelist()
-def register_consumer(data):
- """create an event consumer document for registering a consumer"""
- data = json.loads(data)
- # to ensure that consumer is created only once
- if frappe.db.exists("Event Consumer", data["event_consumer"]):
- return None
-
- user = data["user"]
- if not frappe.db.exists("User", user):
- frappe.throw(_("User {0} not found on the producer site").format(user))
-
- if "System Manager" not in frappe.get_roles(user):
- frappe.throw(_("Event Subscriber has to be a System Manager."))
-
- consumer = frappe.new_doc("Event Consumer")
- consumer.callback_url = data["event_consumer"]
- consumer.user = data["user"]
- consumer.api_key = data["api_key"]
- consumer.api_secret = data["api_secret"]
- consumer.incoming_change = True
- consumer_doctypes = json.loads(data["consumer_doctypes"])
-
- for entry in consumer_doctypes:
- consumer.append(
- "consumer_doctypes",
- {"ref_doctype": entry.get("doctype"), "status": "Pending", "condition": entry.get("condition")},
- )
-
- consumer.insert()
-
- # consumer's 'last_update' field should point to the latest update
- # in producer's update log when subscribing
- # so that, updates after subscribing are consumed and not the old ones.
- last_update = str(get_last_update())
- return json.dumps({"last_update": last_update})
-
-
-def get_consumer_site(consumer_url):
- """create a FrappeClient object for event consumer site"""
- consumer_doc = frappe.get_doc("Event Consumer", consumer_url)
- consumer_site = FrappeClient(
- url=consumer_url,
- api_key=consumer_doc.api_key,
- api_secret=consumer_doc.get_password("api_secret"),
- )
- return consumer_site
-
-
-def get_last_update():
- """get the creation timestamp of last update consumed"""
- updates = frappe.get_list(
- "Event Update Log", "creation", ignore_permissions=True, limit=1, order_by="creation desc"
- )
- if updates:
- return updates[0].creation
- return frappe.utils.now_datetime()
-
-
-@frappe.whitelist()
-def notify_event_consumers(doctype):
- """get all event consumers and set flag for notification status"""
- event_consumers = frappe.get_all(
- "Event Consumer Document Type", ["parent"], {"ref_doctype": doctype, "status": "Approved"}
- )
- for entry in event_consumers:
- consumer = frappe.get_doc("Event Consumer", entry.parent)
- consumer.flags.notified = False
- notify(consumer)
-
-
-@frappe.whitelist()
-def notify(consumer):
- """notify individual event consumers about a new update"""
- consumer_status = consumer.get_consumer_status()
- if consumer_status == "online":
- try:
- client = get_consumer_site(consumer.callback_url)
- client.post_request(
- {
- "cmd": "frappe.event_streaming.doctype.event_producer.event_producer.new_event_notification",
- "producer_url": get_url(),
- }
- )
- consumer.flags.notified = True
- except Exception:
- consumer.flags.notified = False
- else:
- consumer.flags.notified = False
-
- # enqueue another job if the site was not notified
- if not consumer.flags.notified:
- enqueued_method = "frappe.event_streaming.doctype.event_consumer.event_consumer.notify"
- jobs = get_jobs()
- if not jobs or enqueued_method not in jobs[frappe.local.site] and not consumer.flags.notifed:
- frappe.enqueue(
- enqueued_method, queue="long", enqueue_after_commit=True, **{"consumer": consumer}
- )
-
-
-def has_consumer_access(consumer, update_log):
- """Checks if consumer has completely satisfied all the conditions on the doc"""
-
- if isinstance(consumer, str):
- consumer = frappe.get_doc("Event Consumer", consumer)
-
- if not frappe.db.exists(update_log.ref_doctype, update_log.docname):
- # Delete Log
- # Check if the last Update Log of this document was read by this consumer
- last_update_log = frappe.get_all(
- "Event Update Log",
- filters={
- "ref_doctype": update_log.ref_doctype,
- "docname": update_log.docname,
- "creation": ["<", update_log.creation],
- },
- order_by="creation desc",
- limit_page_length=1,
- )
- if not len(last_update_log):
- return False
-
- last_update_log = frappe.get_doc("Event Update Log", last_update_log[0].name)
- return len([x for x in last_update_log.consumers if x.consumer == consumer.name])
-
- doc = frappe.get_doc(update_log.ref_doctype, update_log.docname)
- try:
- for dt_entry in consumer.consumer_doctypes:
- if dt_entry.ref_doctype != update_log.ref_doctype:
- continue
-
- if not dt_entry.condition:
- return True
-
- condition: str = dt_entry.condition
- if condition.startswith("cmd:"):
- cmd = condition.split("cmd:")[1].strip()
- args = {"consumer": consumer, "doc": doc, "update_log": update_log}
- return frappe.call(cmd, **args)
- else:
- return frappe.safe_eval(condition, frappe._dict(doc=doc))
- except Exception as e:
- consumer.log_error("has_consumer_access error")
- return False
diff --git a/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py b/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py
deleted file mode 100644
index 54bf718f17..0000000000
--- a/frappe/event_streaming/doctype/event_consumer/test_event_consumer.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and Contributors
-# License: MIT. See LICENSE
-# import frappe
-from frappe.tests.utils import FrappeTestCase
-
-
-class TestEventConsumer(FrappeTestCase):
- pass
diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/__init__.py b/frappe/event_streaming/doctype/event_consumer_document_type/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json b/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json
deleted file mode 100644
index c243334a09..0000000000
--- a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.json
+++ /dev/null
@@ -1,61 +0,0 @@
-{
- "actions": [],
- "creation": "2019-10-03 21:10:54.754651",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "ref_doctype",
- "status",
- "unsubscribed",
- "condition"
- ],
- "fields": [
- {
- "columns": 4,
- "fieldname": "ref_doctype",
- "fieldtype": "Link",
- "in_list_view": 1,
- "label": "Document Type",
- "options": "DocType",
- "read_only": 1,
- "reqd": 1
- },
- {
- "columns": 4,
- "default": "Pending",
- "fieldname": "status",
- "fieldtype": "Select",
- "in_list_view": 1,
- "label": "Approval Status",
- "options": "Pending\nApproved\nRejected"
- },
- {
- "columns": 2,
- "default": "0",
- "fieldname": "unsubscribed",
- "fieldtype": "Check",
- "in_list_view": 1,
- "label": "Unsubscribed",
- "read_only": 1
- },
- {
- "fieldname": "condition",
- "fieldtype": "Code",
- "label": "Condition",
- "read_only": 1
- }
- ],
- "istable": 1,
- "links": [],
- "modified": "2020-11-07 09:26:49.894294",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Consumer Document Type",
- "owner": "Administrator",
- "permissions": [],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py b/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py
deleted file mode 100644
index 1ed15c5a75..0000000000
--- a/frappe/event_streaming/doctype/event_consumer_document_type/event_consumer_document_type.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class EventConsumerDocumentType(Document):
- pass
diff --git a/frappe/event_streaming/doctype/event_producer/__init__.py b/frappe/event_streaming/doctype/event_producer/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.js b/frappe/event_streaming/doctype/event_producer/event_producer.js
deleted file mode 100644
index 23ca482433..0000000000
--- a/frappe/event_streaming/doctype/event_producer/event_producer.js
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (c) 2019, Frappe Technologies and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Event Producer", {
- refresh: function (frm) {
- frm.set_query("ref_doctype", "producer_doctypes", function () {
- return {
- filters: {
- issingle: 0,
- istable: 0,
- },
- };
- });
-
- frm.set_indicator_formatter("status", function (doc) {
- let indicator = "orange";
- if (doc.status == "Approved") {
- indicator = "green";
- } else if (doc.status == "Rejected") {
- indicator = "red";
- }
- return indicator;
- });
- },
-});
diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.json b/frappe/event_streaming/doctype/event_producer/event_producer.json
deleted file mode 100644
index d868f6c123..0000000000
--- a/frappe/event_streaming/doctype/event_producer/event_producer.json
+++ /dev/null
@@ -1,96 +0,0 @@
-{
- "actions": [],
- "autoname": "field:producer_url",
- "creation": "2019-08-26 19:17:24.919196",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "producer_url",
- "producer_doctypes",
- "section_break_3",
- "api_key",
- "api_secret",
- "column_break_6",
- "user",
- "incoming_change"
- ],
- "fields": [
- {
- "fieldname": "producer_url",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Producer URL",
- "reqd": 1,
- "unique": 1
- },
- {
- "description": "API Key of the user(Event Subscriber) on the producer site",
- "fieldname": "api_key",
- "fieldtype": "Data",
- "label": "API Key",
- "reqd": 1
- },
- {
- "description": "API Secret of the user(Event Subscriber) on the producer site",
- "fieldname": "api_secret",
- "fieldtype": "Password",
- "label": "API Secret",
- "reqd": 1
- },
- {
- "fieldname": "user",
- "fieldtype": "Link",
- "label": "Event Subscriber",
- "options": "User",
- "reqd": 1,
- "set_only_once": 1
- },
- {
- "fieldname": "column_break_6",
- "fieldtype": "Column Break"
- },
- {
- "fieldname": "section_break_3",
- "fieldtype": "Section Break"
- },
- {
- "default": "0",
- "fieldname": "incoming_change",
- "fieldtype": "Check",
- "hidden": 1,
- "label": "Incoming Change"
- },
- {
- "fieldname": "producer_doctypes",
- "fieldtype": "Table",
- "label": "Event Producer Document Types",
- "options": "Event Producer Document Type",
- "reqd": 1
- }
- ],
- "links": [],
- "modified": "2020-10-26 13:00:15.361316",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Producer",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_producer/event_producer.py b/frappe/event_streaming/doctype/event_producer/event_producer.py
deleted file mode 100644
index f91c8a4fd4..0000000000
--- a/frappe/event_streaming/doctype/event_producer/event_producer.py
+++ /dev/null
@@ -1,569 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-import json
-import time
-
-import requests
-
-import frappe
-from frappe import _
-from frappe.custom.doctype.custom_field.custom_field import create_custom_field
-from frappe.frappeclient import FrappeClient
-from frappe.model.document import Document
-from frappe.utils.background_jobs import get_jobs
-from frappe.utils.data import get_link_to_form, get_url
-from frappe.utils.password import get_decrypted_password
-
-
-class EventProducer(Document):
- def before_insert(self):
- self.check_url()
- self.validate_event_subscriber()
- self.incoming_change = True
- self.create_event_consumer()
- self.create_custom_fields()
-
- def validate(self):
- self.validate_event_subscriber()
- if frappe.flags.in_test:
- for entry in self.producer_doctypes:
- entry.status = "Approved"
-
- def validate_event_subscriber(self):
- if not frappe.db.get_value("User", self.user, "api_key"):
- frappe.throw(
- _("Please generate keys for the Event Subscriber User {0} first.").format(
- frappe.bold(get_link_to_form("User", self.user))
- )
- )
-
- def on_update(self):
- if not self.incoming_change:
- if frappe.db.exists("Event Producer", self.name):
- if not self.api_key or not self.api_secret:
- frappe.throw(_("Please set API Key and Secret on the producer and consumer sites first."))
- else:
- doc_before_save = self.get_doc_before_save()
- if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
- return
-
- self.update_event_consumer()
- self.create_custom_fields()
- else:
- # when producer doc is updated it updates the consumer doc, set flag to avoid deadlock
- self.db_set("incoming_change", 0)
- self.reload()
-
- def on_trash(self):
- last_update = frappe.db.get_value("Event Producer Last Update", dict(event_producer=self.name))
- if last_update:
- frappe.delete_doc("Event Producer Last Update", last_update)
-
- def check_url(self):
- valid_url_schemes = ("http", "https")
- frappe.utils.validate_url(self.producer_url, throw=True, valid_schemes=valid_url_schemes)
-
- # remove '/' from the end of the url like http://test_site.com/
- # to prevent mismatch in get_url() results
- if self.producer_url.endswith("/"):
- self.producer_url = self.producer_url[:-1]
-
- def create_event_consumer(self):
- """register event consumer on the producer site"""
- if self.is_producer_online():
- producer_site = FrappeClient(
- url=self.producer_url, api_key=self.api_key, api_secret=self.get_password("api_secret")
- )
-
- response = producer_site.post_api(
- "frappe.event_streaming.doctype.event_consumer.event_consumer.register_consumer",
- params={"data": json.dumps(self.get_request_data())},
- )
- if response:
- response = json.loads(response)
- self.set_last_update(response["last_update"])
- else:
- frappe.throw(
- _(
- "Failed to create an Event Consumer or an Event Consumer for the current site is already registered."
- )
- )
-
- def set_last_update(self, last_update):
- last_update_doc_name = frappe.db.get_value(
- "Event Producer Last Update", dict(event_producer=self.name)
- )
- if not last_update_doc_name:
- frappe.get_doc(
- dict(
- doctype="Event Producer Last Update",
- event_producer=self.producer_url,
- last_update=last_update,
- )
- ).insert(ignore_permissions=True)
- else:
- frappe.db.set_value(
- "Event Producer Last Update", last_update_doc_name, "last_update", last_update
- )
-
- def get_last_update(self):
- return frappe.db.get_value(
- "Event Producer Last Update", dict(event_producer=self.name), "last_update"
- )
-
- def get_request_data(self):
- consumer_doctypes = []
- for entry in self.producer_doctypes:
- if entry.has_mapping:
- # if mapping, subscribe to remote doctype on consumer's site
- dt = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
- else:
- dt = entry.ref_doctype
- consumer_doctypes.append({"doctype": dt, "condition": entry.condition})
-
- user_key = frappe.db.get_value("User", self.user, "api_key")
- user_secret = get_decrypted_password("User", self.user, "api_secret")
- return {
- "event_consumer": get_url(),
- "consumer_doctypes": json.dumps(consumer_doctypes),
- "user": self.user,
- "api_key": user_key,
- "api_secret": user_secret,
- }
-
- def create_custom_fields(self):
- """create custom field to store remote docname and remote site url"""
- for entry in self.producer_doctypes:
- if not entry.use_same_name:
- if not frappe.db.exists(
- "Custom Field", {"fieldname": "remote_docname", "dt": entry.ref_doctype}
- ):
- df = dict(
- fieldname="remote_docname",
- label="Remote Document Name",
- fieldtype="Data",
- read_only=1,
- print_hide=1,
- )
- create_custom_field(entry.ref_doctype, df)
- if not frappe.db.exists(
- "Custom Field", {"fieldname": "remote_site_name", "dt": entry.ref_doctype}
- ):
- df = dict(
- fieldname="remote_site_name",
- label="Remote Site",
- fieldtype="Data",
- read_only=1,
- print_hide=1,
- )
- create_custom_field(entry.ref_doctype, df)
-
- def update_event_consumer(self):
- if self.is_producer_online():
- producer_site = get_producer_site(self.producer_url)
- event_consumer = producer_site.get_doc("Event Consumer", get_url())
- event_consumer = frappe._dict(event_consumer)
- if event_consumer:
- config = event_consumer.consumer_doctypes
- event_consumer.consumer_doctypes = []
- for entry in self.producer_doctypes:
- if entry.has_mapping:
- # if mapping, subscribe to remote doctype on consumer's site
- ref_doctype = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
- else:
- ref_doctype = entry.ref_doctype
-
- event_consumer.consumer_doctypes.append(
- {
- "ref_doctype": ref_doctype,
- "status": get_approval_status(config, ref_doctype),
- "unsubscribed": entry.unsubscribe,
- "condition": entry.condition,
- }
- )
- event_consumer.user = self.user
- event_consumer.incoming_change = True
- producer_site.update(event_consumer)
-
- def is_producer_online(self):
- """check connection status for the Event Producer site"""
- retry = 3
- while retry > 0:
- res = requests.get(self.producer_url)
- if res.status_code == 200:
- return True
- retry -= 1
- time.sleep(5)
- frappe.throw(_("Failed to connect to the Event Producer site. Retry after some time."))
-
-
-def get_producer_site(producer_url):
- """create a FrappeClient object for event producer site"""
- producer_doc = frappe.get_doc("Event Producer", producer_url)
- producer_site = FrappeClient(
- url=producer_url,
- api_key=producer_doc.api_key,
- api_secret=producer_doc.get_password("api_secret"),
- )
- return producer_site
-
-
-def get_approval_status(config, ref_doctype):
- """check the approval status for consumption"""
- for entry in config:
- if entry.get("ref_doctype") == ref_doctype:
- return entry.get("status")
- return "Pending"
-
-
-@frappe.whitelist()
-def pull_producer_data():
- """Fetch data from producer node."""
- response = requests.get(get_url())
- if response.status_code == 200:
- for event_producer in frappe.get_all("Event Producer"):
- pull_from_node(event_producer.name)
- return "success"
- return None
-
-
-@frappe.whitelist()
-def pull_from_node(event_producer):
- """pull all updates after the last update timestamp from event producer site"""
- event_producer = frappe.get_doc("Event Producer", event_producer)
- producer_site = get_producer_site(event_producer.producer_url)
- last_update = event_producer.get_last_update()
-
- (doctypes, mapping_config, naming_config) = get_config(event_producer.producer_doctypes)
-
- updates = get_updates(producer_site, last_update, doctypes)
-
- for update in updates:
- update.use_same_name = naming_config.get(update.ref_doctype)
- mapping = mapping_config.get(update.ref_doctype)
- if mapping:
- update.mapping = mapping
- update = get_mapped_update(update, producer_site)
- if not update.update_type == "Delete":
- update.data = json.loads(update.data)
-
- sync(update, producer_site, event_producer)
-
-
-def get_config(event_config):
- """get the doctype mapping and naming configurations for consumption"""
- doctypes, mapping_config, naming_config = [], {}, {}
-
- for entry in event_config:
- if entry.status == "Approved":
- if entry.has_mapping:
- (mapped_doctype, mapping) = frappe.db.get_value(
- "Document Type Mapping", entry.mapping, ["remote_doctype", "name"]
- )
- mapping_config[mapped_doctype] = mapping
- naming_config[mapped_doctype] = entry.use_same_name
- doctypes.append(mapped_doctype)
- else:
- naming_config[entry.ref_doctype] = entry.use_same_name
- doctypes.append(entry.ref_doctype)
- return (doctypes, mapping_config, naming_config)
-
-
-def sync(update, producer_site, event_producer, in_retry=False):
- """Sync the individual update"""
- try:
- if update.update_type == "Create":
- set_insert(update, producer_site, event_producer.name)
- if update.update_type == "Update":
- set_update(update, producer_site)
- if update.update_type == "Delete":
- set_delete(update)
- if in_retry:
- return "Synced"
- log_event_sync(update, event_producer.name, "Synced")
-
- except Exception:
- if in_retry:
- if frappe.flags.in_test:
- print(frappe.get_traceback())
- return "Failed"
- log_event_sync(update, event_producer.name, "Failed", frappe.get_traceback())
-
- event_producer.set_last_update(update.creation)
- frappe.db.commit()
-
-
-def set_insert(update, producer_site, event_producer):
- """Sync insert type update"""
- if frappe.db.get_value(update.ref_doctype, update.docname):
- # doc already created
- return
- doc = frappe.get_doc(update.data)
-
- if update.mapping:
- if update.get("dependencies"):
- dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
- for fieldname, value in dependencies_created.items():
- doc.update({fieldname: value})
- else:
- sync_dependencies(doc, producer_site)
-
- if update.use_same_name:
- doc.insert(set_name=update.docname, set_child_names=False)
- else:
- # if event consumer is not saving documents with the same name as the producer
- # store the remote docname in a custom field for future updates
- doc.remote_docname = update.docname
- doc.remote_site_name = event_producer
- doc.insert(set_child_names=False)
-
-
-def set_update(update, producer_site):
- """Sync update type update"""
- local_doc = get_local_doc(update)
- if local_doc:
- data = frappe._dict(update.data)
-
- if data.changed:
- local_doc.update(data.changed)
- if data.removed:
- local_doc = update_row_removed(local_doc, data.removed)
- if data.row_changed:
- update_row_changed(local_doc, data.row_changed)
- if data.added:
- local_doc = update_row_added(local_doc, data.added)
-
- if update.mapping:
- if update.get("dependencies"):
- dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
- for fieldname, value in dependencies_created.items():
- local_doc.update({fieldname: value})
- else:
- sync_dependencies(local_doc, producer_site)
-
- local_doc.save()
- local_doc.db_update_all()
-
-
-def update_row_removed(local_doc, removed):
- """Sync child table row deletion type update"""
- for tablename, rownames in removed.items():
- table = local_doc.get_table_field_doctype(tablename)
- for row in rownames:
- table_rows = local_doc.get(tablename)
- child_table_row = get_child_table_row(table_rows, row)
- table_rows.remove(child_table_row)
- local_doc.set(tablename, table_rows)
- return local_doc
-
-
-def get_child_table_row(table_rows, row):
- for entry in table_rows:
- if entry.get("name") == row:
- return entry
-
-
-def update_row_changed(local_doc, changed):
- """Sync child table row updation type update"""
- for tablename, rows in changed.items():
- old = local_doc.get(tablename)
- for doc in old:
- for row in rows:
- if row["name"] == doc.get("name"):
- doc.update(row)
-
-
-def update_row_added(local_doc, added):
- """Sync child table row addition type update"""
- for tablename, rows in added.items():
- local_doc.extend(tablename, rows)
- for child in rows:
- child_doc = frappe.get_doc(child)
- child_doc.parent = local_doc.name
- child_doc.parenttype = local_doc.doctype
- child_doc.insert(set_name=child_doc.name)
- return local_doc
-
-
-def set_delete(update):
- """Sync delete type update"""
- local_doc = get_local_doc(update)
- if local_doc:
- local_doc.delete()
-
-
-def get_updates(producer_site, last_update, doctypes):
- """Get all updates generated after the last update timestamp"""
- docs = producer_site.post_request(
- {
- "cmd": "frappe.event_streaming.doctype.event_update_log.event_update_log.get_update_logs_for_consumer",
- "event_consumer": get_url(),
- "doctypes": frappe.as_json(doctypes),
- "last_update": last_update,
- }
- )
- return [frappe._dict(d) for d in (docs or [])]
-
-
-def get_local_doc(update):
- """Get the local document if created with a different name"""
- try:
- if not update.use_same_name:
- return frappe.get_doc(update.ref_doctype, {"remote_docname": update.docname})
- return frappe.get_doc(update.ref_doctype, update.docname)
- except frappe.DoesNotExistError:
- return None
-
-
-def sync_dependencies(document, producer_site):
- """
- dependencies is a dictionary to store all the docs
- having dependencies and their sync status,
- which is shared among all nested functions.
- """
- dependencies = {document: True}
-
- def check_doc_has_dependencies(doc, producer_site):
- """Sync child table link fields first,
- then sync link fields,
- then dynamic links"""
- meta = frappe.get_meta(doc.doctype)
- table_fields = meta.get_table_fields()
- link_fields = meta.get_link_fields()
- dl_fields = meta.get_dynamic_link_fields()
- if table_fields:
- sync_child_table_dependencies(doc, table_fields, producer_site)
- if link_fields:
- sync_link_dependencies(doc, link_fields, producer_site)
- if dl_fields:
- sync_dynamic_link_dependencies(doc, dl_fields, producer_site)
-
- def sync_child_table_dependencies(doc, table_fields, producer_site):
- for df in table_fields:
- child_table = doc.get(df.fieldname)
- for entry in child_table:
- child_doc = producer_site.get_doc(entry.doctype, entry.name)
- if child_doc:
- child_doc = frappe._dict(child_doc)
- set_dependencies(child_doc, frappe.get_meta(entry.doctype).get_link_fields(), producer_site)
-
- def sync_link_dependencies(doc, link_fields, producer_site):
- set_dependencies(doc, link_fields, producer_site)
-
- def sync_dynamic_link_dependencies(doc, dl_fields, producer_site):
- for df in dl_fields:
- docname = doc.get(df.fieldname)
- linked_doctype = doc.get(df.options)
- if docname and not check_dependency_fulfilled(linked_doctype, docname):
- master_doc = producer_site.get_doc(linked_doctype, docname)
- frappe.get_doc(master_doc).insert(set_name=docname)
-
- def set_dependencies(doc, link_fields, producer_site):
- for df in link_fields:
- docname = doc.get(df.fieldname)
- linked_doctype = df.get_link_doctype()
- if docname and not check_dependency_fulfilled(linked_doctype, docname):
- master_doc = producer_site.get_doc(linked_doctype, docname)
- try:
- master_doc = frappe.get_doc(master_doc)
- master_doc.insert(set_name=docname)
- frappe.db.commit()
-
- # for dependency inside a dependency
- except Exception:
- dependencies[master_doc] = True
-
- def check_dependency_fulfilled(linked_doctype, docname):
- return frappe.db.exists(linked_doctype, docname)
-
- while dependencies[document]:
- # find the first non synced dependency
- for item in reversed(list(dependencies.keys())):
- if dependencies[item]:
- dependency = item
- break
-
- check_doc_has_dependencies(dependency, producer_site)
-
- # mark synced for nested dependency
- if dependency != document:
- dependencies[dependency] = False
- dependency.insert()
-
- # no more dependencies left to be synced, the main doc is ready to be synced
- # end the dependency loop
- if not any(list(dependencies.values())[1:]):
- dependencies[document] = False
-
-
-def sync_mapped_dependencies(dependencies, producer_site):
- dependencies_created = {}
- for entry in dependencies:
- doc = frappe._dict(json.loads(entry[1]))
- docname = frappe.db.exists(doc.doctype, doc.name)
- if not docname:
- doc = frappe.get_doc(doc).insert(set_child_names=False)
- dependencies_created[entry[0]] = doc.name
- else:
- dependencies_created[entry[0]] = docname
-
- return dependencies_created
-
-
-def log_event_sync(update, event_producer, sync_status, error=None):
- """Log event update received with the sync_status as Synced or Failed"""
- doc = frappe.new_doc("Event Sync Log")
- doc.update_type = update.update_type
- doc.ref_doctype = update.ref_doctype
- doc.status = sync_status
- doc.event_producer = event_producer
- doc.producer_doc = update.docname
- doc.data = frappe.as_json(update.data)
- doc.use_same_name = update.use_same_name
- doc.mapping = update.mapping if update.mapping else None
- if update.use_same_name:
- doc.docname = update.docname
- else:
- doc.docname = frappe.db.get_value(update.ref_doctype, {"remote_docname": update.docname}, "name")
- if error:
- doc.error = error
- doc.insert()
-
-
-def get_mapped_update(update, producer_site):
- """get the new update document with mapped fields"""
- mapping = frappe.get_doc("Document Type Mapping", update.mapping)
- if update.update_type == "Create":
- doc = frappe._dict(json.loads(update.data))
- mapped_update = mapping.get_mapping(doc, producer_site, update.update_type)
- update.data = mapped_update.get("doc")
- update.dependencies = mapped_update.get("dependencies", None)
- elif update.update_type == "Update":
- mapped_update = mapping.get_mapped_update(update, producer_site)
- update.data = mapped_update.get("doc")
- update.dependencies = mapped_update.get("dependencies", None)
-
- update["ref_doctype"] = mapping.local_doctype
- return update
-
-
-@frappe.whitelist()
-def new_event_notification(producer_url):
- """Pull data from producer when notified"""
- enqueued_method = "frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node"
- jobs = get_jobs()
- if not jobs or enqueued_method not in jobs[frappe.local.site]:
- frappe.enqueue(enqueued_method, queue="default", **{"event_producer": producer_url})
-
-
-@frappe.whitelist()
-def resync(update):
- """Retry syncing update if failed"""
- update = frappe._dict(json.loads(update))
- producer_site = get_producer_site(update.event_producer)
- event_producer = frappe.get_doc("Event Producer", update.event_producer)
- if update.mapping:
- update = get_mapped_update(update, producer_site)
- update.data = json.loads(update.data)
- return sync(update, producer_site, event_producer, in_retry=True)
diff --git a/frappe/event_streaming/doctype/event_producer/test_event_producer.py b/frappe/event_streaming/doctype/event_producer/test_event_producer.py
deleted file mode 100644
index 70e7483f92..0000000000
--- a/frappe/event_streaming/doctype/event_producer/test_event_producer.py
+++ /dev/null
@@ -1,438 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and Contributors
-# License: MIT. See LICENSE
-import json
-
-import frappe
-from frappe.core.doctype.user.user import generate_keys
-from frappe.event_streaming.doctype.event_producer.event_producer import pull_from_node
-from frappe.frappeclient import FrappeClient
-from frappe.query_builder.utils import db_type_is
-from frappe.tests.test_query_builder import run_only_if
-from frappe.tests.utils import FrappeTestCase
-
-producer_url = "http://test_site_producer:8000"
-
-
-class TestEventProducer(FrappeTestCase):
- def setUp(self):
- create_event_producer(producer_url)
-
- def tearDown(self):
- unsubscribe_doctypes(producer_url)
-
- def test_insert(self):
- producer = get_remote_site()
- producer_doc = insert_into_producer(producer, "test creation 1 sync")
- self.pull_producer_data()
- self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))
-
- def test_update(self):
- producer = get_remote_site()
- producer_doc = insert_into_producer(producer, "test update 1")
- producer_doc["description"] = "test update 2"
- producer_doc = producer.update(producer_doc)
- self.pull_producer_data()
- local_doc = frappe.get_doc(producer_doc.doctype, producer_doc.name)
- self.assertEqual(local_doc.description, producer_doc.description)
-
- def test_delete(self):
- producer = get_remote_site()
- producer_doc = insert_into_producer(producer, "test delete sync")
- self.pull_producer_data()
- self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))
- producer.delete("ToDo", producer_doc.name)
- self.pull_producer_data()
- self.assertFalse(frappe.db.exists("ToDo", producer_doc.name))
-
- @run_only_if(db_type_is.MARIADB)
- def test_multiple_doctypes_sync(self):
- # TODO: This test is extremely flaky with Postgres. Rewrite this!
- producer = get_remote_site()
-
- # insert todo and note in producer
- producer_todo = insert_into_producer(producer, "test multiple doc sync")
- producer_note1 = frappe._dict(doctype="Note", title="test multiple doc sync 1")
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
- frappe.db.delete("Note", {"title": producer_note1["title"]})
- producer_note1 = producer.insert(producer_note1)
- producer_note2 = frappe._dict(doctype="Note", title="test multiple doc sync 2")
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note2["title"]})
- frappe.db.delete("Note", {"title": producer_note2["title"]})
- producer_note2 = producer.insert(producer_note2)
-
- # update in producer
- producer_todo["description"] = "test multiple doc update sync"
- producer_todo = producer.update(producer_todo)
- producer_note1["content"] = "testing update sync"
- producer_note1 = producer.update(producer_note1)
-
- producer.delete("Note", producer_note2.name)
-
- self.pull_producer_data()
-
- # check inserted
- self.assertTrue(frappe.db.exists("ToDo", producer_todo.name))
-
- # check update
- local_todo = frappe.get_doc("ToDo", producer_todo.name)
- self.assertEqual(local_todo.description, producer_todo.description)
- local_note1 = frappe.get_doc("Note", producer_note1.name)
- self.assertEqual(local_note1.content, producer_note1.content)
-
- # check delete
- self.assertFalse(frappe.db.exists("Note", producer_note2.name))
-
- def test_child_table_sync_with_dependencies(self):
- producer = get_remote_site()
- producer_user = frappe._dict(
- doctype="User",
- email="test_user@sync.com",
- send_welcome_email=0,
- first_name="Test Sync User",
- enabled=1,
- roles=[{"role": "System Manager"}],
- )
- delete_on_remote_if_exists(producer, "User", {"email": producer_user.email})
- frappe.db.delete("User", {"email": producer_user.email})
- producer_user = producer.insert(producer_user)
-
- producer_note = frappe._dict(
- doctype="Note", title="test child table dependency sync", seen_by=[{"user": producer_user.name}]
- )
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
- frappe.db.delete("Note", {"title": producer_note.title})
- producer_note = producer.insert(producer_note)
-
- self.pull_producer_data()
- self.assertTrue(frappe.db.exists("User", producer_user.name))
- if self.assertTrue(frappe.db.exists("Note", producer_note.name)):
- local_note = frappe.get_doc("Note", producer_note.name)
- self.assertEqual(len(local_note.seen_by), 1)
-
- def test_dynamic_link_dependencies_synced(self):
- producer = get_remote_site()
- # unsubscribe for Note to check whether dependency is fulfilled
- event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
- event_producer.producer_doctypes = []
- event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
- event_producer.save()
-
- producer_link_doc = frappe._dict(doctype="Note", title="Test Dynamic Link 1")
-
- delete_on_remote_if_exists(producer, "Note", {"title": producer_link_doc.title})
- frappe.db.delete("Note", {"title": producer_link_doc.title})
- producer_link_doc = producer.insert(producer_link_doc)
- producer_doc = frappe._dict(
- doctype="ToDo",
- description="Test Dynamic Link 2",
- assigned_by="Administrator",
- reference_type="Note",
- reference_name=producer_link_doc.name,
- )
- producer_doc = producer.insert(producer_doc)
-
- self.pull_producer_data()
-
- # check dynamic link dependency created
- self.assertTrue(frappe.db.exists("Note", producer_link_doc.name))
- self.assertEqual(
- producer_link_doc.name, frappe.db.get_value("ToDo", producer_doc.name, "reference_name")
- )
-
- reset_configuration(producer_url)
-
- def test_naming_configuration(self):
- # test with use_same_name = 0
- producer = get_remote_site()
- event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
- event_producer.producer_doctypes = []
- event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 0})
- event_producer.save()
-
- producer_doc = insert_into_producer(producer, "test different name sync")
- self.pull_producer_data()
- self.assertTrue(
- frappe.db.exists(
- "ToDo", {"remote_docname": producer_doc.name, "remote_site_name": producer_url}
- )
- )
-
- reset_configuration(producer_url)
-
- def test_conditional_events(self):
- producer = get_remote_site()
-
- # Add Condition
- event_producer = frappe.get_doc("Event Producer", producer_url)
- note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
- note_producer_entry.condition = "doc.public == 1"
- event_producer.save()
-
- # Make test doc
- producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync")
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
- producer_note1 = producer.insert(producer_note1)
-
- # Make Update
- producer_note1["content"] = "Test Conditional Sync Content"
- producer_note1 = producer.update(producer_note1)
-
- self.pull_producer_data()
-
- # Check if synced here
- self.assertFalse(frappe.db.exists("Note", producer_note1.name))
-
- # Lets satisfy the condition
- producer_note1["public"] = 1
- producer_note1 = producer.update(producer_note1)
-
- self.pull_producer_data()
-
- # it should sync now
- self.assertTrue(frappe.db.exists("Note", producer_note1.name))
- local_note = frappe.get_doc("Note", producer_note1.name)
- self.assertEqual(local_note.content, producer_note1.content)
-
- reset_configuration(producer_url)
-
- def test_conditional_events_with_cmd(self):
- producer = get_remote_site()
-
- # Add Condition
- event_producer = frappe.get_doc("Event Producer", producer_url)
- note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
- note_producer_entry.condition = (
- "cmd: frappe.event_streaming.doctype.event_producer.test_event_producer.can_sync_note"
- )
- event_producer.save()
-
- # Make test doc
- producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync cmd")
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
- producer_note1 = producer.insert(producer_note1)
-
- # Make Update
- producer_note1["content"] = "Test Conditional Sync Content"
- producer_note1 = producer.update(producer_note1)
-
- self.pull_producer_data()
-
- # Check if synced here
- self.assertFalse(frappe.db.exists("Note", producer_note1.name))
-
- # Lets satisfy the condition
- producer_note1["public"] = 1
- producer_note1 = producer.update(producer_note1)
-
- self.pull_producer_data()
-
- # it should sync now
- self.assertTrue(frappe.db.exists("Note", producer_note1.name))
- local_note = frappe.get_doc("Note", producer_note1.name)
- self.assertEqual(local_note.content, producer_note1.content)
-
- reset_configuration(producer_url)
-
- def test_update_log(self):
- producer = get_remote_site()
- producer_doc = insert_into_producer(producer, "test update log")
- update_log_doc = producer.get_value(
- "Event Update Log", "docname", {"docname": producer_doc.get("name")}
- )
- self.assertEqual(update_log_doc.get("docname"), producer_doc.get("name"))
-
- def test_event_sync_log(self):
- producer = get_remote_site()
- producer_doc = insert_into_producer(producer, "test event sync log")
- self.pull_producer_data()
- self.assertTrue(frappe.db.exists("Event Sync Log", {"docname": producer_doc.name}))
-
- def pull_producer_data(self):
- pull_from_node(producer_url)
-
- def test_mapping(self):
- producer = get_remote_site()
- event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
- event_producer.producer_doctypes = []
- mapping = [{"local_fieldname": "description", "remote_fieldname": "content"}]
- event_producer.append(
- "producer_doctypes",
- {
- "ref_doctype": "ToDo",
- "use_same_name": 1,
- "has_mapping": 1,
- "mapping": get_mapping("ToDo to Note", "ToDo", "Note", mapping),
- },
- )
- event_producer.save()
-
- producer_note = frappe._dict(doctype="Note", title="Test Mapping", content="Test Mapping")
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
- producer_note = producer.insert(producer_note)
- self.pull_producer_data()
- # check inserted
- self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))
-
- # update in producer
- producer_note["content"] = "test mapped doc update sync"
- producer_note = producer.update(producer_note)
- self.pull_producer_data()
-
- # check updated
- self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note["content"]}))
-
- producer.delete("Note", producer_note.name)
- self.pull_producer_data()
- # check delete
- self.assertFalse(frappe.db.exists("ToDo", {"description": producer_note.content}))
-
- reset_configuration(producer_url)
-
- def test_inner_mapping(self):
- producer = get_remote_site()
-
- setup_event_producer_for_inner_mapping()
- producer_note = frappe._dict(
- doctype="Note", title="Inner Mapping Tester", content="Test Inner Mapping"
- )
- delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
- producer_note = producer.insert(producer_note)
- self.pull_producer_data()
-
- # check dependency inserted
- self.assertTrue(frappe.db.exists("Role", {"role_name": producer_note.title}))
- # check doc inserted
- self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))
-
- reset_configuration(producer_url)
-
-
-def can_sync_note(consumer, doc, update_log):
- return doc.public == 1
-
-
-def setup_event_producer_for_inner_mapping():
- event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
- event_producer.producer_doctypes = []
- inner_mapping = [{"local_fieldname": "role_name", "remote_fieldname": "title"}]
- inner_map = get_mapping("Role to Note Dependency Creation", "Role", "Note", inner_mapping)
- mapping = [
- {
- "local_fieldname": "description",
- "remote_fieldname": "content",
- },
- {
- "local_fieldname": "role",
- "remote_fieldname": "title",
- "mapping_type": "Document",
- "mapping": inner_map,
- "remote_value_filters": json.dumps({"title": "title"}),
- },
- ]
- event_producer.append(
- "producer_doctypes",
- {
- "ref_doctype": "ToDo",
- "use_same_name": 1,
- "has_mapping": 1,
- "mapping": get_mapping("ToDo to Note Mapping", "ToDo", "Note", mapping),
- },
- )
- event_producer.save()
- return event_producer
-
-
-def insert_into_producer(producer, description):
- # create and insert todo on remote site
- todo = dict(doctype="ToDo", description=description, assigned_by="Administrator")
- return producer.insert(todo)
-
-
-def delete_on_remote_if_exists(producer, doctype, filters):
- remote_doc = producer.get_value(doctype, "name", filters)
- if remote_doc:
- producer.delete(doctype, remote_doc.get("name"))
-
-
-def get_mapping(mapping_name, local, remote, field_map):
- name = frappe.db.exists("Document Type Mapping", mapping_name)
- if name:
- doc = frappe.get_doc("Document Type Mapping", name)
- else:
- doc = frappe.new_doc("Document Type Mapping")
-
- doc.mapping_name = mapping_name
- doc.local_doctype = local
- doc.remote_doctype = remote
- for entry in field_map:
- doc.append("field_mapping", entry)
- doc.save()
- return doc.name
-
-
-def create_event_producer(producer_url):
- if frappe.db.exists("Event Producer", producer_url):
- event_producer = frappe.get_doc("Event Producer", producer_url)
- for entry in event_producer.producer_doctypes:
- entry.unsubscribe = 0
- event_producer.save()
- return
-
- generate_keys("Administrator")
-
- producer_site = connect()
-
- response = producer_site.post_api(
- "frappe.core.doctype.user.user.generate_keys", params={"user": "Administrator"}
- )
-
- api_secret = response.get("api_secret")
-
- response = producer_site.get_value("User", "api_key", {"name": "Administrator"})
- api_key = response.get("api_key")
-
- event_producer = frappe.new_doc("Event Producer")
- event_producer.producer_doctypes = []
- event_producer.producer_url = producer_url
- event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
- event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1})
- event_producer.user = "Administrator"
- event_producer.api_key = api_key
- event_producer.api_secret = api_secret
- event_producer.save()
-
-
-def reset_configuration(producer_url):
- event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
- event_producer.producer_doctypes = []
- event_producer.conditions = []
- event_producer.producer_url = producer_url
- event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
- event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1})
- event_producer.user = "Administrator"
- event_producer.save()
-
-
-def get_remote_site():
- producer_doc = frappe.get_doc("Event Producer", producer_url)
- producer_site = FrappeClient(
- url=producer_doc.producer_url, username="Administrator", password="admin", verify=False
- )
- return producer_site
-
-
-def unsubscribe_doctypes(producer_url):
- event_producer = frappe.get_doc("Event Producer", producer_url)
- for entry in event_producer.producer_doctypes:
- entry.unsubscribe = 1
- event_producer.save()
-
-
-def connect():
- def _connect():
- return FrappeClient(url=producer_url, username="Administrator", password="admin", verify=False)
-
- try:
- return _connect()
- except Exception:
- return _connect()
diff --git a/frappe/event_streaming/doctype/event_producer_document_type/__init__.py b/frappe/event_streaming/doctype/event_producer_document_type/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json b/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json
deleted file mode 100644
index 17fd51d12d..0000000000
--- a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.json
+++ /dev/null
@@ -1,86 +0,0 @@
-{
- "actions": [],
- "creation": "2019-10-03 21:08:25.890352",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "ref_doctype",
- "status",
- "use_same_name",
- "unsubscribe",
- "has_mapping",
- "mapping",
- "condition"
- ],
- "fields": [
- {
- "columns": 3,
- "fieldname": "ref_doctype",
- "fieldtype": "Link",
- "in_list_view": 1,
- "label": "Document Type",
- "options": "DocType",
- "reqd": 1,
- "set_only_once": 1
- },
- {
- "default": "0",
- "description": "If the document has different field names on the Producer and Consumer's end check this and set up the Mapping",
- "fieldname": "has_mapping",
- "fieldtype": "Check",
- "label": "Has Mapping"
- },
- {
- "depends_on": "eval: doc.has_mapping",
- "fieldname": "mapping",
- "fieldtype": "Link",
- "label": "Mapping",
- "options": "Document Type Mapping"
- },
- {
- "columns": 2,
- "default": "0",
- "description": "If this is checked the documents will have the same name as they have on the Event Producer's site",
- "fieldname": "use_same_name",
- "fieldtype": "Check",
- "in_list_view": 1,
- "label": "Use Same Name"
- },
- {
- "columns": 3,
- "default": "Pending",
- "fieldname": "status",
- "fieldtype": "Select",
- "in_list_view": 1,
- "label": "Approval Status",
- "options": "Pending\nApproved\nRejected",
- "read_only": 1
- },
- {
- "columns": 2,
- "default": "0",
- "fieldname": "unsubscribe",
- "fieldtype": "Check",
- "in_list_view": 1,
- "label": "Unsubscribe"
- },
- {
- "fieldname": "condition",
- "fieldtype": "Code",
- "label": "Condition"
- }
- ],
- "istable": 1,
- "links": [],
- "modified": "2020-11-07 09:26:58.463868",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Producer Document Type",
- "owner": "Administrator",
- "permissions": [],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py b/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py
deleted file mode 100644
index 8f4c936792..0000000000
--- a/frappe/event_streaming/doctype/event_producer_document_type/event_producer_document_type.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class EventProducerDocumentType(Document):
- pass
diff --git a/frappe/event_streaming/doctype/event_producer_last_update/__init__.py b/frappe/event_streaming/doctype/event_producer_last_update/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js
deleted file mode 100644
index 6d18be43e3..0000000000
--- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.js
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright (c) 2020, Frappe Technologies and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Event Producer Last Update", {
- // refresh: function(frm) {
- // }
-});
diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json
deleted file mode 100644
index 27f8ed2f81..0000000000
--- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.json
+++ /dev/null
@@ -1,53 +0,0 @@
-{
- "actions": [],
- "autoname": "field:event_producer",
- "creation": "2020-10-26 12:53:11.940177",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "event_producer",
- "last_update"
- ],
- "fields": [
- {
- "fieldname": "event_producer",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Event Producer",
- "reqd": 1,
- "unique": 1
- },
- {
- "fieldname": "last_update",
- "fieldtype": "Data",
- "label": "Last Update"
- }
- ],
- "in_create": 1,
- "index_web_pages_for_search": 1,
- "links": [],
- "modified": "2020-10-26 13:22:27.056599",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Producer Last Update",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "read_only": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py b/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py
deleted file mode 100644
index ec5cee7e78..0000000000
--- a/frappe/event_streaming/doctype/event_producer_last_update/event_producer_last_update.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class EventProducerLastUpdate(Document):
- pass
diff --git a/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py b/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py
deleted file mode 100644
index 3e68159790..0000000000
--- a/frappe/event_streaming/doctype/event_producer_last_update/test_event_producer_last_update.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies and Contributors
-# License: MIT. See LICENSE
-# import frappe
-from frappe.tests.utils import FrappeTestCase
-
-
-class TestEventProducerLastUpdate(FrappeTestCase):
- pass
diff --git a/frappe/event_streaming/doctype/event_sync_log/__init__.py b/frappe/event_streaming/doctype/event_sync_log/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js
deleted file mode 100644
index 7cc3198bae..0000000000
--- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.js
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (c) 2019, Frappe Technologies and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Event Sync Log", {
- refresh: function (frm) {
- if (frm.doc.status == "Failed") {
- frm.add_custom_button(__("Resync"), function () {
- frappe.call({
- method: "frappe.event_streaming.doctype.event_producer.event_producer.resync",
- args: {
- update: frm.doc,
- },
- callback: function (r) {
- if (r.message) {
- frappe.msgprint(r.message);
- frm.set_value("status", r.message);
- frm.save();
- }
- },
- });
- });
- }
- },
-});
diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json
deleted file mode 100644
index f82128bd7b..0000000000
--- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.json
+++ /dev/null
@@ -1,137 +0,0 @@
-{
- "creation": "2019-09-24 22:22:05.845089",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "update_type",
- "ref_doctype",
- "docname",
- "column_break_4",
- "status",
- "event_producer",
- "producer_doc",
- "event_configurations_section",
- "use_same_name",
- "column_break_9",
- "mapping",
- "section_break_8",
- "data",
- "error"
- ],
- "fields": [
- {
- "fieldname": "update_type",
- "fieldtype": "Select",
- "in_list_view": 1,
- "label": "Update Type",
- "options": "Create\nUpdate\nDelete",
- "read_only": 1
- },
- {
- "fieldname": "ref_doctype",
- "fieldtype": "Link",
- "label": "Doctype",
- "options": "DocType",
- "read_only": 1
- },
- {
- "fieldname": "docname",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Document Name",
- "options": "ref_doctype",
- "read_only": 1
- },
- {
- "fieldname": "column_break_4",
- "fieldtype": "Column Break"
- },
- {
- "fieldname": "status",
- "fieldtype": "Select",
- "in_list_view": 1,
- "label": "Status",
- "options": "\nSynced\nFailed",
- "read_only": 1
- },
- {
- "fieldname": "event_producer",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Event Producer",
- "options": "Event Producer",
- "read_only": 1
- },
- {
- "fieldname": "section_break_8",
- "fieldtype": "Section Break",
- "label": "Data"
- },
- {
- "fieldname": "data",
- "fieldtype": "Code",
- "label": "Data",
- "read_only": 1
- },
- {
- "fieldname": "producer_doc",
- "fieldtype": "Data",
- "label": "Producer Document Name",
- "read_only": 1
- },
- {
- "depends_on": "eval:doc.status=='Failed'",
- "fieldname": "error",
- "fieldtype": "Code",
- "label": "Error",
- "read_only": 1
- },
- {
- "fieldname": "event_configurations_section",
- "fieldtype": "Section Break",
- "label": "Event Configurations"
- },
- {
- "default": "0",
- "fieldname": "use_same_name",
- "fieldtype": "Data",
- "label": "Use Same Name",
- "read_only": 1
- },
- {
- "fieldname": "column_break_9",
- "fieldtype": "Column Break"
- },
- {
- "fieldname": "mapping",
- "fieldtype": "Data",
- "label": "Mapping",
- "read_only": 1
- }
- ],
- "in_create": 1,
- "modified": "2019-10-07 13:22:10.401479",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Sync Log",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py b/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py
deleted file mode 100644
index a1d82ad08f..0000000000
--- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class EventSyncLog(Document):
- pass
diff --git a/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js b/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js
deleted file mode 100644
index 97d2ee0a1d..0000000000
--- a/frappe/event_streaming/doctype/event_sync_log/event_sync_log_list.js
+++ /dev/null
@@ -1,9 +0,0 @@
-frappe.listview_settings["Event Sync Log"] = {
- get_indicator: function (doc) {
- var colors = {
- Failed: "red",
- Synced: "green",
- };
- return [__(doc.status), colors[doc.status], "status,=," + doc.status];
- },
-};
diff --git a/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py b/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py
deleted file mode 100644
index 5efc030026..0000000000
--- a/frappe/event_streaming/doctype/event_sync_log/test_event_sync_log.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies and Contributors
-# License: MIT. See LICENSE
-# import frappe
-from frappe.tests.utils import FrappeTestCase
-
-
-class TestEventSyncLog(FrappeTestCase):
- pass
diff --git a/frappe/event_streaming/doctype/event_update_log/__init__.py b/frappe/event_streaming/doctype/event_update_log/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.js b/frappe/event_streaming/doctype/event_update_log/event_update_log.js
deleted file mode 100644
index d901799780..0000000000
--- a/frappe/event_streaming/doctype/event_update_log/event_update_log.js
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
-// For license information, please see license.txt
-
-frappe.ui.form.on("Event Update Log", {
- // refresh: function(frm) {
- // }
-});
diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.json b/frappe/event_streaming/doctype/event_update_log/event_update_log.json
deleted file mode 100644
index a42bc7ec87..0000000000
--- a/frappe/event_streaming/doctype/event_update_log/event_update_log.json
+++ /dev/null
@@ -1,77 +0,0 @@
-{
- "actions": [],
- "creation": "2019-07-30 15:31:26.352527",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "update_type",
- "ref_doctype",
- "docname",
- "data",
- "consumers"
- ],
- "fields": [
- {
- "fieldname": "update_type",
- "fieldtype": "Select",
- "in_list_view": 1,
- "label": "Update Type",
- "options": "Create\nUpdate\nDelete",
- "read_only": 1
- },
- {
- "fieldname": "ref_doctype",
- "fieldtype": "Link",
- "in_list_view": 1,
- "label": "DocType",
- "options": "DocType",
- "read_only": 1
- },
- {
- "fieldname": "docname",
- "fieldtype": "Data",
- "in_list_view": 1,
- "label": "Document Name",
- "read_only": 1
- },
- {
- "fieldname": "data",
- "fieldtype": "Code",
- "label": "Data",
- "read_only": 1
- },
- {
- "fieldname": "consumers",
- "fieldtype": "Table MultiSelect",
- "label": "Consumers",
- "options": "Event Update Log Consumer",
- "read_only": 1
- }
- ],
- "in_create": 1,
- "links": [],
- "modified": "2020-09-04 07:31:52.599804",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Update Log",
- "owner": "Administrator",
- "permissions": [
- {
- "create": 1,
- "delete": 1,
- "email": 1,
- "export": 1,
- "print": 1,
- "read": 1,
- "report": 1,
- "role": "System Manager",
- "share": 1,
- "write": 1
- }
- ],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_update_log/event_update_log.py b/frappe/event_streaming/doctype/event_update_log/event_update_log.py
deleted file mode 100644
index e40f600484..0000000000
--- a/frappe/event_streaming/doctype/event_update_log/event_update_log.py
+++ /dev/null
@@ -1,296 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
-# License: MIT. See LICENSE
-
-import frappe
-from frappe.model import no_value_fields, table_fields
-from frappe.model.document import Document
-from frappe.utils.background_jobs import get_jobs
-
-
-class EventUpdateLog(Document):
- def after_insert(self):
- """Send update notification updates to event consumers
- whenever update log is generated"""
- enqueued_method = (
- "frappe.event_streaming.doctype.event_consumer.event_consumer.notify_event_consumers"
- )
- jobs = get_jobs()
- if not jobs or enqueued_method not in jobs[frappe.local.site]:
- frappe.enqueue(
- enqueued_method, doctype=self.ref_doctype, queue="long", enqueue_after_commit=True
- )
-
-
-def notify_consumers(doc, event):
- """called via hooks"""
- # make event update log for doctypes having event consumers
- if frappe.flags.in_install or frappe.flags.in_migrate:
- return
-
- consumers = check_doctype_has_consumers(doc.doctype)
- if consumers:
- if event == "after_insert":
- doc.flags.event_update_log = make_event_update_log(doc, update_type="Create")
- elif event == "on_trash":
- make_event_update_log(doc, update_type="Delete")
- else:
- # on_update
- # called after saving
- if not doc.flags.event_update_log: # if not already inserted
- diff = get_update(doc.get_doc_before_save(), doc)
- if diff:
- doc.diff = diff
- make_event_update_log(doc, update_type="Update")
-
-
-def check_doctype_has_consumers(doctype):
- """Check if doctype has event consumers for event streaming"""
- return frappe.cache_manager.get_doctype_map(
- "Event Consumer Document Type",
- doctype,
- dict(ref_doctype=doctype, status="Approved", unsubscribed=0),
- )
-
-
-def get_update(old, new, for_child=False):
- """
- Get document objects with updates only
- If there is a change, then returns a dict like:
- {
- "changed" : {fieldname1: new_value1, fieldname2: new_value2, },
- "added" : {table_fieldname1: [{row_dict1}, {row_dict2}], },
- "removed" : {table_fieldname1: [row_name1, row_name2], },
- "row_changed" : {table_fieldname1:
- {
- child_fieldname1: new_val,
- child_fieldname2: new_val
- },
- },
- }
- """
- if not new:
- return None
-
- out = frappe._dict(changed={}, added={}, removed={}, row_changed={})
- for df in new.meta.fields:
- if df.fieldtype in no_value_fields and df.fieldtype not in table_fields:
- continue
-
- old_value, new_value = old.get(df.fieldname), new.get(df.fieldname)
-
- if df.fieldtype in table_fields:
- old_row_by_name, new_row_by_name = make_maps(old_value, new_value)
- out = check_for_additions(out, df, new_value, old_row_by_name)
- out = check_for_deletions(out, df, old_value, new_row_by_name)
-
- elif old_value != new_value:
- out.changed[df.fieldname] = new_value
-
- out = check_docstatus(out, old, new, for_child)
- if any((out.changed, out.added, out.removed, out.row_changed)):
- return out
- return None
-
-
-def make_event_update_log(doc, update_type):
- """Save update info for doctypes that have event consumers"""
- if update_type != "Delete":
- # diff for update type, doc for create type
- data = frappe.as_json(doc) if not doc.get("diff") else frappe.as_json(doc.diff)
- else:
- data = None
- return frappe.get_doc(
- {
- "doctype": "Event Update Log",
- "update_type": update_type,
- "ref_doctype": doc.doctype,
- "docname": doc.name,
- "data": data,
- }
- ).insert(ignore_permissions=True)
-
-
-def make_maps(old_value, new_value):
- """make maps"""
- old_row_by_name, new_row_by_name = {}, {}
- for d in old_value:
- old_row_by_name[d.name] = d
- for d in new_value:
- new_row_by_name[d.name] = d
- return old_row_by_name, new_row_by_name
-
-
-def check_for_additions(out, df, new_value, old_row_by_name):
- """check rows for additions, changes"""
- for _i, d in enumerate(new_value):
- if d.name in old_row_by_name:
- diff = get_update(old_row_by_name[d.name], d, for_child=True)
- if diff and diff.changed:
- if not out.row_changed.get(df.fieldname):
- out.row_changed[df.fieldname] = []
- diff.changed["name"] = d.name
- out.row_changed[df.fieldname].append(diff.changed)
- else:
- if not out.added.get(df.fieldname):
- out.added[df.fieldname] = []
- out.added[df.fieldname].append(d.as_dict())
- return out
-
-
-def check_for_deletions(out, df, old_value, new_row_by_name):
- """check for deletions"""
- for d in old_value:
- if d.name not in new_row_by_name:
- if not out.removed.get(df.fieldname):
- out.removed[df.fieldname] = []
- out.removed[df.fieldname].append(d.name)
- return out
-
-
-def check_docstatus(out, old, new, for_child):
- """docstatus changes"""
- if not for_child and old.docstatus != new.docstatus:
- out.changed["docstatus"] = new.docstatus
- return out
-
-
-def is_consumer_uptodate(update_log, consumer):
- """
- Checks if Consumer has read all the UpdateLogs before the specified update_log
- :param update_log: The UpdateLog Doc in context
- :param consumer: The EventConsumer doc
- """
- if update_log.update_type == "Create":
- # consumer is obviously up to date
- return True
-
- prev_logs = frappe.get_all(
- "Event Update Log",
- filters={
- "ref_doctype": update_log.ref_doctype,
- "docname": update_log.docname,
- "creation": ["<", update_log.creation],
- },
- order_by="creation desc",
- limit_page_length=1,
- )
-
- if not len(prev_logs):
- return False
-
- prev_log_consumers = frappe.get_all(
- "Event Update Log Consumer",
- fields=["consumer"],
- filters={
- "parent": prev_logs[0].name,
- "parenttype": "Event Update Log",
- "consumer": consumer.name,
- },
- )
-
- return len(prev_log_consumers) > 0
-
-
-def mark_consumer_read(update_log_name, consumer_name):
- """
- This function appends the Consumer to the list of Consumers that has 'read' an Update Log
- """
- update_log = frappe.get_doc("Event Update Log", update_log_name)
- if len([x for x in update_log.consumers if x.consumer == consumer_name]):
- return
-
- frappe.get_doc(
- frappe._dict(
- doctype="Event Update Log Consumer",
- consumer=consumer_name,
- parent=update_log_name,
- parenttype="Event Update Log",
- parentfield="consumers",
- )
- ).insert(ignore_permissions=True)
-
-
-def get_unread_update_logs(consumer_name, dt, dn):
- """
- Get old logs unread by the consumer on a particular document
- """
- already_consumed = [
- x[0]
- for x in frappe.db.sql(
- """
- SELECT
- update_log.name
- FROM `tabEvent Update Log` update_log
- JOIN `tabEvent Update Log Consumer` consumer ON consumer.parent = %(log_name)s
- WHERE
- consumer.consumer = %(consumer)s
- AND update_log.ref_doctype = %(dt)s
- AND update_log.docname = %(dn)s
- """,
- {
- "consumer": consumer_name,
- "dt": dt,
- "dn": dn,
- "log_name": "update_log.name"
- if frappe.conf.db_type == "mariadb"
- else "CAST(update_log.name AS VARCHAR)",
- },
- as_dict=0,
- )
- ]
-
- logs = frappe.get_all(
- "Event Update Log",
- fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
- filters={"ref_doctype": dt, "docname": dn, "name": ["not in", already_consumed]},
- order_by="creation",
- )
-
- return logs
-
-
-@frappe.whitelist()
-def get_update_logs_for_consumer(event_consumer, doctypes, last_update):
- """
- Fetches all the UpdateLogs for the consumer
- It will inject old un-consumed Update Logs if a doc was just found to be accessible to the Consumer
- """
-
- if isinstance(doctypes, str):
- doctypes = frappe.parse_json(doctypes)
-
- from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access
-
- consumer = frappe.get_doc("Event Consumer", event_consumer)
- docs = frappe.get_list(
- doctype="Event Update Log",
- filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
- fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
- order_by="creation desc",
- )
-
- result = []
- to_update_history = []
- for d in docs:
- if (d.ref_doctype, d.docname) in to_update_history:
- # will be notified by background jobs
- continue
-
- if not has_consumer_access(consumer=consumer, update_log=d):
- continue
-
- if not is_consumer_uptodate(d, consumer):
- to_update_history.append((d.ref_doctype, d.docname))
- # get_unread_update_logs will have the current log
- old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname)
- if old_logs:
- old_logs.reverse()
- result.extend(old_logs)
- else:
- result.append(d)
-
- for d in result:
- mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name)
-
- result.reverse()
- return result
diff --git a/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py b/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py
deleted file mode 100644
index a9065ab4ed..0000000000
--- a/frappe/event_streaming/doctype/event_update_log/test_event_update_log.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
-# License: MIT. See LICENSE
-# import frappe
-from frappe.tests.utils import FrappeTestCase
-
-
-class TestEventUpdateLog(FrappeTestCase):
- pass
diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/__init__.py b/frappe/event_streaming/doctype/event_update_log_consumer/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json b/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json
deleted file mode 100644
index b3484c6481..0000000000
--- a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.json
+++ /dev/null
@@ -1,32 +0,0 @@
-{
- "actions": [],
- "creation": "2020-06-30 10:54:53.301787",
- "doctype": "DocType",
- "editable_grid": 1,
- "engine": "InnoDB",
- "field_order": [
- "consumer"
- ],
- "fields": [
- {
- "fieldname": "consumer",
- "fieldtype": "Link",
- "in_list_view": 1,
- "label": "Consumer",
- "options": "Event Consumer",
- "reqd": 1
- }
- ],
- "istable": 1,
- "links": [],
- "modified": "2020-06-30 10:54:53.301787",
- "modified_by": "Administrator",
- "module": "Event Streaming",
- "name": "Event Update Log Consumer",
- "owner": "Administrator",
- "permissions": [],
- "quick_entry": 1,
- "sort_field": "modified",
- "sort_order": "DESC",
- "track_changes": 1
-}
\ No newline at end of file
diff --git a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py b/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py
deleted file mode 100644
index 69da7db92e..0000000000
--- a/frappe/event_streaming/doctype/event_update_log_consumer/event_update_log_consumer.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies and contributors
-# License: MIT. See LICENSE
-
-# import frappe
-from frappe.model.document import Document
-
-
-class EventUpdateLogConsumer(Document):
- pass
diff --git a/frappe/hooks.py b/frappe/hooks.py
index 85aeea7418..9715508c78 100644
--- a/frappe/hooks.py
+++ b/frappe/hooks.py
@@ -138,16 +138,12 @@ standard_queries = {"User": "frappe.core.doctype.user.user.user_query"}
doc_events = {
"*": {
- "after_insert": [
- "frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers"
- ],
"on_update": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.core.doctype.activity_log.feed.update_feed",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
"frappe.automation.doctype.assignment_rule.assignment_rule.apply",
"frappe.core.doctype.file.utils.attach_files_to_document",
- "frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
"frappe.automation.doctype.assignment_rule.assignment_rule.update_due_date",
"frappe.core.doctype.user_type.user_type.apply_permissions_for_non_standard_user_type",
],
@@ -155,12 +151,10 @@ doc_events = {
"on_cancel": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
- "frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
],
"on_trash": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
- "frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
],
"on_update_after_submit": [
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions"
diff --git a/frappe/model/__init__.py b/frappe/model/__init__.py
index 29991fa403..a7f966ebd6 100644
--- a/frappe/model/__init__.py
+++ b/frappe/model/__init__.py
@@ -114,6 +114,8 @@ core_doctypes_list = (
"Client Script",
)
+# NOTE: this is being used for dynamic autoincrement in new sites,
+# removing any of these will require patches.
log_types = (
"Version",
"Error Log",
diff --git a/frappe/model/document.py b/frappe/model/document.py
index 2a82b5af9a..aa3170b4ce 100644
--- a/frappe/model/document.py
+++ b/frappe/model/document.py
@@ -275,9 +275,6 @@ class Document(BaseDocument):
if self.get("amended_from"):
self.copy_attachments_from_amended_from()
- # flag to prevent creation of event update log for create and update both
- # during document creation
- self.flags.update_log_for_doc_creation = True
self.run_post_save_methods()
self.flags.in_insert = False
diff --git a/frappe/modules.txt b/frappe/modules.txt
index fb7817f6ba..863c448594 100644
--- a/frappe/modules.txt
+++ b/frappe/modules.txt
@@ -9,5 +9,4 @@ Integrations
Printing
Contacts
Social
-Automation
-Event Streaming
\ No newline at end of file
+Automation
\ No newline at end of file
diff --git a/frappe/patches.txt b/frappe/patches.txt
index ca8f678e3d..5bc8e70396 100644
--- a/frappe/patches.txt
+++ b/frappe/patches.txt
@@ -156,7 +156,6 @@ frappe.patches.v13_0.set_route_for_blog_category
frappe.patches.v13_0.enable_custom_script
frappe.patches.v13_0.update_newsletter_content_type
execute:frappe.db.set_value('Website Settings', 'Website Settings', {'navbar_template': 'Standard Navbar', 'footer_template': 'Standard Footer'})
-frappe.patches.v13_0.delete_event_producer_and_consumer_keys
frappe.patches.v13_0.web_template_set_module #2020-10-05
frappe.patches.v13_0.remove_custom_link
execute:frappe.delete_doc("DocType", "Footer Item")
@@ -195,6 +194,7 @@ frappe.patches.v14_0.log_settings_migration
frappe.patches.v14_0.setup_likes_from_feedback
frappe.patches.v14_0.update_webforms
frappe.patches.v14_0.delete_payment_gateways
+frappe.patches.v15_0.remove_event_streaming
[post_model_sync]
frappe.patches.v14_0.drop_data_import_legacy
diff --git a/frappe/patches/v13_0/delete_event_producer_and_consumer_keys.py b/frappe/patches/v13_0/delete_event_producer_and_consumer_keys.py
deleted file mode 100644
index 9cb081e15a..0000000000
--- a/frappe/patches/v13_0/delete_event_producer_and_consumer_keys.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
-# License: MIT. See LICENSE
-
-import frappe
-
-
-def execute():
- if frappe.db.exists("DocType", "Event Producer"):
- frappe.db.sql("""UPDATE `tabEvent Producer` SET api_key='', api_secret=''""")
- if frappe.db.exists("DocType", "Event Consumer"):
- frappe.db.sql("""UPDATE `tabEvent Consumer` SET api_key='', api_secret=''""")
diff --git a/frappe/patches/v15_0/remove_event_streaming.py b/frappe/patches/v15_0/remove_event_streaming.py
new file mode 100644
index 0000000000..4c6a1ce079
--- /dev/null
+++ b/frappe/patches/v15_0/remove_event_streaming.py
@@ -0,0 +1,22 @@
+import frappe
+
+
+def execute():
+ if "event_streaming" in frappe.get_installed_apps():
+ return
+
+ frappe.delete_doc_if_exists("Module Def", "Event Streaming", force=True)
+
+ for doc in [
+ "Event Consumer Document Type",
+ "Document Type Mapping",
+ "Event Producer",
+ "Event Producer Last Update",
+ "Event Producer Document Type",
+ "Event Consumer",
+ "Document Type Field Mapping",
+ "Event Update Log",
+ "Event Update Log Consumer",
+ "Event Sync Log",
+ ]:
+ frappe.delete_doc_if_exists("DocType", doc, force=True)
diff --git a/frappe/public/js/frappe/views/breadcrumbs.js b/frappe/public/js/frappe/views/breadcrumbs.js
index 93cff25df5..f15ef2adc2 100644
--- a/frappe/public/js/frappe/views/breadcrumbs.js
+++ b/frappe/public/js/frappe/views/breadcrumbs.js
@@ -18,7 +18,6 @@ frappe.breadcrumbs = {
Workflow: "Settings",
Printing: "Settings",
Setup: "Settings",
- "Event Streaming": "Tools",
Automation: "Tools",
},