Merge pull request #17910 from phot0n/remove-event-streaming

refactor!: remove event streaming
This commit is contained in:
Ritwik Puri 2022-09-08 22:00:45 +05:30 committed by GitHub
commit fddaa2da57
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 41 additions and 2860 deletions

View file

@ -1,7 +1,7 @@
{
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "test_frappe_consumer",
"db_name": "test_frappe",
"db_password": "test_frappe",
"allow_tests": true,
"db_type": "mariadb",

View file

@ -1,7 +1,7 @@
{
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "test_frappe_consumer",
"db_name": "test_frappe",
"db_password": "test_frappe",
"db_type": "postgres",
"allow_tests": true,

View file

@ -17,37 +17,23 @@ fi
echo "Setting Up Sites & Database..."
mkdir ~/frappe-bench/sites/test_site
cp "${GITHUB_WORKSPACE}/.github/helper/consumer_db/$DB.json" ~/frappe-bench/sites/test_site/site_config.json
if [ "$TYPE" == "server" ]
then
mkdir ~/frappe-bench/sites/test_site_producer
cp "${GITHUB_WORKSPACE}/.github/helper/producer_db/$DB.json" ~/frappe-bench/sites/test_site_producer/site_config.json
fi
cp "${GITHUB_WORKSPACE}/.github/helper/db/$DB.json" ~/frappe-bench/sites/test_site/site_config.json
if [ "$DB" == "mariadb" ]
then
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL character_set_server = 'utf8mb4'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL collation_server = 'utf8mb4_unicode_ci'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL character_set_server = 'utf8mb4'";
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "SET GLOBAL collation_server = 'utf8mb4_unicode_ci'";
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe_consumer"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe_consumer'@'localhost' IDENTIFIED BY 'test_frappe_consumer'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe_consumer\`.* TO 'test_frappe_consumer'@'localhost'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe";
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe'@'localhost' IDENTIFIED BY 'test_frappe'";
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe\`.* TO 'test_frappe'@'localhost'";
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE DATABASE test_frappe_producer"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "CREATE USER 'test_frappe_producer'@'localhost' IDENTIFIED BY 'test_frappe_producer'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "GRANT ALL PRIVILEGES ON \`test_frappe_producer\`.* TO 'test_frappe_producer'@'localhost'"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "FLUSH PRIVILEGES"
mariadb --host 127.0.0.1 --port 3306 -u root -ptravis -e "FLUSH PRIVILEGES";
fi
if [ "$DB" == "postgres" ]
then
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe_consumer" -U postgres
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe_consumer WITH PASSWORD 'test_frappe'" -U postgres
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe_producer" -U postgres
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe_producer WITH PASSWORD 'test_frappe'" -U postgres
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE DATABASE test_frappe" -U postgres;
echo "travis" | psql -h 127.0.0.1 -p 5432 -c "CREATE USER test_frappe WITH PASSWORD 'test_frappe'" -U postgres;
fi
echo "Setting Up Procfile..."
@ -78,11 +64,6 @@ fi
bench --site test_site reinstall --yes
if [ "$TYPE" == "server" ]
then
bench --site test_site_producer reinstall --yes
fi
if [ "$TYPE" == "server" ]
then
# wait till assets are built succesfully

View file

@ -1,16 +0,0 @@
{
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "test_frappe_producer",
"db_password": "test_frappe",
"allow_tests": true,
"db_type": "mariadb",
"auto_email_id": "test@example.com",
"mail_server": "smtp.example.com",
"mail_login": "test@example.com",
"mail_password": "test",
"admin_password": "admin",
"root_login": "root",
"root_password": "travis",
"host_name": "http://test_site_producer:8000"
}

View file

@ -1,16 +0,0 @@
{
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "test_frappe_producer",
"db_password": "test_frappe",
"db_type": "postgres",
"allow_tests": true,
"auto_email_id": "test@example.com",
"mail_server": "smtp.example.com",
"mail_login": "test@example.com",
"mail_password": "test",
"admin_password": "admin",
"root_login": "postgres",
"root_password": "travis",
"host_name": "http://test_site_producer:8000"
}

View file

@ -82,7 +82,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3

View file

@ -84,7 +84,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3

View file

@ -82,7 +82,6 @@ jobs:
- name: Add to Hosts
run: |
echo "127.0.0.1 test_site" | sudo tee -a /etc/hosts
echo "127.0.0.1 test_site_producer" | sudo tee -a /etc/hosts
- name: Cache pip
uses: actions/cache@v3

View file

@ -7,7 +7,6 @@
templates/ @surajshetty3416
www/ @surajshetty3416
patches/ @surajshetty3416
event_streaming/ @ruchamahabal
data_import* @netchampfaris
core/ @surajshetty3416
workspace @shariquerik

View file

@ -1,6 +1,6 @@
{
"charts": [],
"content": "[{\"type\":\"header\",\"data\":{\"text\":\"<span class=\\\"h4\\\"><b>Your Shortcuts</b></span>\",\"col\":12}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"ToDo\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Note\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"File\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Assignment Rule\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Auto Repeat\",\"col\":3}},{\"type\":\"spacer\",\"data\":{\"col\":12}},{\"type\":\"header\",\"data\":{\"text\":\"<span class=\\\"h4\\\"><b>Reports & Masters</b></span>\",\"col\":12}},{\"type\":\"card\",\"data\":{\"card_name\":\"Tools\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Email\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Automation\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Event Streaming\",\"col\":4}}]",
"content": "[{\"type\":\"header\",\"data\":{\"text\":\"<span class=\\\"h4\\\"><b>Your Shortcuts</b></span>\",\"col\":12}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"ToDo\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Note\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"File\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Assignment Rule\",\"col\":3}},{\"type\":\"shortcut\",\"data\":{\"shortcut_name\":\"Auto Repeat\",\"col\":3}},{\"type\":\"spacer\",\"data\":{\"col\":12}},{\"type\":\"header\",\"data\":{\"text\":\"<span class=\\\"h4\\\"><b>Reports &amp; Masters</b></span>\",\"col\":12}},{\"type\":\"card\",\"data\":{\"card_name\":\"Tools\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Email\",\"col\":4}},{\"type\":\"card\",\"data\":{\"card_name\":\"Automation\",\"col\":4}}]",
"creation": "2020-03-02 14:53:24.980279",
"docstatus": 0,
"doctype": "Workspace",
@ -107,7 +107,7 @@
"hidden": 0,
"is_query_report": 0,
"label": "Automation",
"link_count": 0,
"link_count": 3,
"onboard": 0,
"type": "Card Break"
},
@ -143,78 +143,16 @@
"link_type": "DocType",
"onboard": 0,
"type": "Link"
},
{
"hidden": 0,
"is_query_report": 0,
"label": "Event Streaming",
"link_count": 0,
"onboard": 0,
"type": "Card Break"
},
{
"dependencies": "",
"hidden": 0,
"is_query_report": 0,
"label": "Event Producer",
"link_count": 0,
"link_to": "Event Producer",
"link_type": "DocType",
"onboard": 0,
"type": "Link"
},
{
"dependencies": "",
"hidden": 0,
"is_query_report": 0,
"label": "Event Consumer",
"link_count": 0,
"link_to": "Event Consumer",
"link_type": "DocType",
"onboard": 0,
"type": "Link"
},
{
"dependencies": "",
"hidden": 0,
"is_query_report": 0,
"label": "Event Update Log",
"link_count": 0,
"link_to": "Event Update Log",
"link_type": "DocType",
"onboard": 0,
"type": "Link"
},
{
"dependencies": "",
"hidden": 0,
"is_query_report": 0,
"label": "Event Sync Log",
"link_count": 0,
"link_to": "Event Sync Log",
"link_type": "DocType",
"onboard": 0,
"type": "Link"
},
{
"dependencies": "",
"hidden": 0,
"is_query_report": 0,
"label": "Document Type Mapping",
"link_count": 0,
"link_to": "Document Type Mapping",
"link_type": "DocType",
"onboard": 0,
"type": "Link"
}
],
"modified": "2022-01-13 17:48:48.456763",
"modified": "2022-08-23 14:42:58.364898",
"modified_by": "Administrator",
"module": "Automation",
"name": "Tools",
"owner": "Administrator",
"parent_page": "",
"public": 1,
"quick_lists": [],
"restrict_to_domain": "",
"roles": [],
"sequence_id": 26.0,

View file

@ -12,7 +12,6 @@ doctype_map_keys = (
"energy_point_rule_map",
"assignment_rule_map",
"milestone_tracker_map",
"event_consumer_document_type_map",
)
bench_cache_keys = ("assets_json",)

View file

@ -1,73 +0,0 @@
{
"actions": [],
"creation": "2019-09-27 12:46:50.165135",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"local_fieldname",
"mapping_type",
"mapping",
"remote_value_filters",
"column_break_5",
"remote_fieldname",
"default_value"
],
"fields": [
{
"fieldname": "remote_fieldname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Remote Fieldname"
},
{
"fieldname": "local_fieldname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Local Fieldname",
"reqd": 1
},
{
"fieldname": "column_break_5",
"fieldtype": "Column Break"
},
{
"fieldname": "default_value",
"fieldtype": "Data",
"label": "Default Value"
},
{
"fieldname": "mapping_type",
"fieldtype": "Select",
"label": "Mapping Type",
"options": "\nChild Table\nDocument"
},
{
"depends_on": "eval:doc.mapping_type;",
"fieldname": "mapping",
"fieldtype": "Link",
"label": "Mapping",
"options": "Document Type Mapping"
},
{
"depends_on": "eval:doc.mapping_type==\"Document\";",
"fieldname": "remote_value_filters",
"fieldtype": "Code",
"label": "Remote Value Filters",
"mandatory_depends_on": "eval:doc.mapping_type===\"Document\";",
"options": "JSON"
}
],
"istable": 1,
"links": [],
"modified": "2020-03-19 13:56:36.223799",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Document Type Field Mapping",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class DocumentTypeFieldMapping(Document):
pass

View file

@ -1,37 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Document Type Mapping", {
local_doctype: function (frm) {
if (frm.doc.local_doctype) {
frappe.model.clear_table(frm.doc, "field_mapping");
let fields = frm.events.get_fields(frm);
$.each(fields, function (i, data) {
let row = frappe.model.add_child(
frm.doc,
"Document Type Field Mapping",
"field_mapping"
);
row.local_fieldname = data;
});
refresh_field("field_mapping");
}
},
get_fields: function (frm) {
let filtered_fields = [];
frappe.model.with_doctype(frm.doc.local_doctype, () => {
frappe.get_meta(frm.doc.local_doctype).fields.map((field) => {
if (
field.fieldname !== "remote_docname" &&
field.fieldname !== "remote_site_name" &&
frappe.model.is_value_type(field) &&
!field.hidden
) {
filtered_fields.push(field.fieldname);
}
});
});
return filtered_fields;
},
});

View file

@ -1,71 +0,0 @@
{
"autoname": "field:mapping_name",
"creation": "2019-09-27 12:45:56.529124",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"mapping_name",
"local_doctype",
"remote_doctype",
"section_break_3",
"field_mapping"
],
"fields": [
{
"fieldname": "local_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Local Document Type",
"options": "DocType",
"reqd": 1
},
{
"fieldname": "remote_doctype",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Remote Document Type",
"reqd": 1
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"fieldname": "field_mapping",
"fieldtype": "Table",
"label": "Field Mapping",
"options": "Document Type Field Mapping"
},
{
"fieldname": "mapping_name",
"fieldtype": "Data",
"label": "Mapping Name",
"reqd": 1,
"unique": 1
}
],
"modified": "2019-10-09 08:36:04.621397",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Document Type Mapping",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,181 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import frappe
from frappe import _
from frappe.model import child_table_fields, default_fields
from frappe.model.document import Document
class DocumentTypeMapping(Document):
def validate(self):
self.validate_inner_mapping()
def validate_inner_mapping(self):
meta = frappe.get_meta(self.local_doctype)
for field_map in self.field_mapping:
if field_map.local_fieldname not in (default_fields + child_table_fields):
field = meta.get_field(field_map.local_fieldname)
if not field:
frappe.throw(_("Row #{0}: Invalid Local Fieldname").format(field_map.idx))
fieldtype = field.get("fieldtype")
if fieldtype in ["Link", "Dynamic Link", "Table"]:
if not field_map.mapping and not field_map.default_value:
msg = _(
"Row #{0}: Please set Mapping or Default Value for the field {1} since its a dependency field"
).format(field_map.idx, frappe.bold(field_map.local_fieldname))
frappe.throw(msg, title="Inner Mapping Missing")
if field_map.mapping_type == "Document" and not field_map.remote_value_filters:
msg = _(
"Row #{0}: Please set remote value filters for the field {1} to fetch the unique remote dependency document"
).format(field_map.idx, frappe.bold(field_map.remote_fieldname))
frappe.throw(msg, title="Remote Value Filters Missing")
def get_mapping(self, doc, producer_site, update_type):
remote_fields = []
# list of tuples (local_fieldname, dependent_doc)
dependencies = []
for mapping in self.field_mapping:
if doc.get(mapping.remote_fieldname):
if mapping.mapping_type == "Document":
if not mapping.default_value:
dependency = self.get_mapped_dependency(mapping, producer_site, doc)
if dependency:
dependencies.append((mapping.local_fieldname, dependency))
else:
doc[mapping.local_fieldname] = mapping.default_value
if mapping.mapping_type == "Child Table" and update_type != "Update":
doc[mapping.local_fieldname] = get_mapped_child_table_docs(
mapping.mapping, doc[mapping.remote_fieldname], producer_site
)
else:
# copy value into local fieldname key and remove remote fieldname key
doc[mapping.local_fieldname] = doc[mapping.remote_fieldname]
if mapping.local_fieldname != mapping.remote_fieldname:
remote_fields.append(mapping.remote_fieldname)
if not doc.get(mapping.remote_fieldname) and mapping.default_value and update_type != "Update":
doc[mapping.local_fieldname] = mapping.default_value
# remove the remote fieldnames
for field in remote_fields:
doc.pop(field, None)
if update_type != "Update":
doc["doctype"] = self.local_doctype
mapping = {"doc": frappe.as_json(doc)}
if len(dependencies):
mapping["dependencies"] = dependencies
return mapping
def get_mapped_update(self, update, producer_site):
update_diff = frappe._dict(json.loads(update.data))
mapping = update_diff
dependencies = []
if update_diff.changed:
doc_map = self.get_mapping(update_diff.changed, producer_site, "Update")
mapped_doc = doc_map.get("doc")
mapping.changed = json.loads(mapped_doc)
if doc_map.get("dependencies"):
dependencies += doc_map.get("dependencies")
if update_diff.removed:
mapping = self.map_rows_removed(update_diff, mapping)
if update_diff.added:
mapping = self.map_rows(update_diff, mapping, producer_site, operation="added")
if update_diff.row_changed:
mapping = self.map_rows(update_diff, mapping, producer_site, operation="row_changed")
update = {"doc": frappe.as_json(mapping)}
if len(dependencies):
update["dependencies"] = dependencies
return update
def get_mapped_dependency(self, mapping, producer_site, doc):
inner_mapping = frappe.get_doc("Document Type Mapping", mapping.mapping)
filters = json.loads(mapping.remote_value_filters)
for key, value in filters.items():
if value.startswith("eval:"):
val = frappe.safe_eval(value[5:], None, dict(doc=doc))
filters[key] = val
if doc.get(value):
filters[key] = doc.get(value)
matching_docs = producer_site.get_doc(inner_mapping.remote_doctype, filters=filters)
if len(matching_docs):
remote_docname = matching_docs[0].get("name")
remote_doc = producer_site.get_doc(inner_mapping.remote_doctype, remote_docname)
doc = inner_mapping.get_mapping(remote_doc, producer_site, "Insert").get("doc")
return doc
return
def map_rows_removed(self, update_diff, mapping):
removed = []
mapping["removed"] = update_diff.removed
for key, value in update_diff.removed.copy().items():
local_table_name = frappe.db.get_value(
"Document Type Field Mapping",
{"remote_fieldname": key, "parent": self.name},
"local_fieldname",
)
mapping.removed[local_table_name] = value
if local_table_name != key:
removed.append(key)
# remove the remote fieldnames
for field in removed:
mapping.removed.pop(field, None)
return mapping
def map_rows(self, update_diff, mapping, producer_site, operation):
remote_fields = []
for tablename, entries in update_diff.get(operation).copy().items():
local_table_name = frappe.db.get_value(
"Document Type Field Mapping", {"remote_fieldname": tablename}, "local_fieldname"
)
table_map = frappe.db.get_value(
"Document Type Field Mapping",
{"local_fieldname": local_table_name, "parent": self.name},
"mapping",
)
table_map = frappe.get_doc("Document Type Mapping", table_map)
docs = []
for entry in entries:
mapped_doc = table_map.get_mapping(entry, producer_site, "Update").get("doc")
docs.append(json.loads(mapped_doc))
mapping.get(operation)[local_table_name] = docs
if local_table_name != tablename:
remote_fields.append(tablename)
# remove the remote fieldnames
for field in remote_fields:
mapping.get(operation).pop(field, None)
return mapping
def get_mapped_child_table_docs(child_map, table_entries, producer_site):
"""Get mapping for child doctypes"""
child_map = frappe.get_doc("Document Type Mapping", child_map)
mapped_entries = []
remote_fields = []
for child_doc in table_entries:
for mapping in child_map.field_mapping:
if child_doc.get(mapping.remote_fieldname):
child_doc[mapping.local_fieldname] = child_doc[mapping.remote_fieldname]
if mapping.local_fieldname != mapping.remote_fieldname:
child_doc.pop(mapping.remote_fieldname, None)
mapped_entries.append(child_doc)
# remove the remote fieldnames
for field in remote_fields:
child_doc.pop(field, None)
child_doc["doctype"] = child_map.local_doctype
return mapped_entries

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestDocumentTypeMapping(FrappeTestCase):
pass

View file

@ -1,17 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Consumer", {
refresh: function (frm) {
// formatter for subscribed doctype approval status
frm.set_indicator_formatter("status", function (doc) {
let indicator = "orange";
if (doc.status == "Approved") {
indicator = "green";
} else if (doc.status == "Rejected") {
indicator = "red";
}
return indicator;
});
},
});

View file

@ -1,97 +0,0 @@
{
"actions": [],
"autoname": "field:callback_url",
"creation": "2019-08-26 17:45:15.479530",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"consumer_doctypes",
"callback_url",
"section_break_3",
"api_key",
"api_secret",
"column_break_6",
"user",
"incoming_change"
],
"fields": [
{
"fieldname": "callback_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Callback URL",
"read_only": 1,
"reqd": 1,
"unique": 1
},
{
"fieldname": "api_key",
"fieldtype": "Data",
"label": "API Key",
"reqd": 1
},
{
"fieldname": "api_secret",
"fieldtype": "Password",
"label": "API Secret",
"reqd": 1
},
{
"fieldname": "user",
"fieldtype": "Link",
"label": "Event Subscriber",
"options": "User",
"read_only": 1,
"reqd": 1
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"fieldname": "column_break_6",
"fieldtype": "Column Break"
},
{
"default": "0",
"fieldname": "incoming_change",
"fieldtype": "Check",
"hidden": 1,
"label": "Incoming Change",
"read_only": 1
},
{
"fieldname": "consumer_doctypes",
"fieldtype": "Table",
"label": "Event Consumer Document Types",
"options": "Event Consumer Document Type",
"reqd": 1
}
],
"in_create": 1,
"links": [],
"modified": "2020-09-08 16:42:39.828085",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Consumer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,216 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import os
import requests
import frappe
from frappe import _
from frappe.frappeclient import FrappeClient
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
from frappe.utils.data import get_url
class EventConsumer(Document):
def validate(self):
# approve subscribed doctypes for tests
# frappe.flags.in_test won't work here as tests are running on the consumer site
if os.environ.get("CI"):
for entry in self.consumer_doctypes:
entry.status = "Approved"
def on_update(self):
if not self.incoming_change:
doc_before_save = self.get_doc_before_save()
if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
return
self.update_consumer_status()
else:
frappe.db.set_value(self.doctype, self.name, "incoming_change", 0)
frappe.cache().delete_value("event_consumer_document_type_map")
def on_trash(self):
for i in frappe.get_all("Event Update Log Consumer", {"consumer": self.name}):
frappe.delete_doc("Event Update Log Consumer", i.name)
frappe.cache().delete_value("event_consumer_document_type_map")
def update_consumer_status(self):
consumer_site = get_consumer_site(self.callback_url)
event_producer = consumer_site.get_doc("Event Producer", get_url())
event_producer = frappe._dict(event_producer)
config = event_producer.producer_doctypes
event_producer.producer_doctypes = []
for entry in config:
if entry.get("has_mapping"):
ref_doctype = consumer_site.get_value(
"Document Type Mapping", "remote_doctype", entry.get("mapping")
).get("remote_doctype")
else:
ref_doctype = entry.get("ref_doctype")
entry["status"] = frappe.db.get_value(
"Event Consumer Document Type", {"parent": self.name, "ref_doctype": ref_doctype}, "status"
)
event_producer.producer_doctypes = config
# when producer doc is updated it updates the consumer doc
# set flag to avoid deadlock
event_producer.incoming_change = True
consumer_site.update(event_producer)
def get_consumer_status(self):
response = requests.get(self.callback_url)
if response.status_code != 200:
return "offline"
return "online"
@frappe.whitelist()
def register_consumer(data):
"""create an event consumer document for registering a consumer"""
data = json.loads(data)
# to ensure that consumer is created only once
if frappe.db.exists("Event Consumer", data["event_consumer"]):
return None
user = data["user"]
if not frappe.db.exists("User", user):
frappe.throw(_("User {0} not found on the producer site").format(user))
if "System Manager" not in frappe.get_roles(user):
frappe.throw(_("Event Subscriber has to be a System Manager."))
consumer = frappe.new_doc("Event Consumer")
consumer.callback_url = data["event_consumer"]
consumer.user = data["user"]
consumer.api_key = data["api_key"]
consumer.api_secret = data["api_secret"]
consumer.incoming_change = True
consumer_doctypes = json.loads(data["consumer_doctypes"])
for entry in consumer_doctypes:
consumer.append(
"consumer_doctypes",
{"ref_doctype": entry.get("doctype"), "status": "Pending", "condition": entry.get("condition")},
)
consumer.insert()
# consumer's 'last_update' field should point to the latest update
# in producer's update log when subscribing
# so that, updates after subscribing are consumed and not the old ones.
last_update = str(get_last_update())
return json.dumps({"last_update": last_update})
def get_consumer_site(consumer_url):
"""create a FrappeClient object for event consumer site"""
consumer_doc = frappe.get_doc("Event Consumer", consumer_url)
consumer_site = FrappeClient(
url=consumer_url,
api_key=consumer_doc.api_key,
api_secret=consumer_doc.get_password("api_secret"),
)
return consumer_site
def get_last_update():
"""get the creation timestamp of last update consumed"""
updates = frappe.get_list(
"Event Update Log", "creation", ignore_permissions=True, limit=1, order_by="creation desc"
)
if updates:
return updates[0].creation
return frappe.utils.now_datetime()
@frappe.whitelist()
def notify_event_consumers(doctype):
"""get all event consumers and set flag for notification status"""
event_consumers = frappe.get_all(
"Event Consumer Document Type", ["parent"], {"ref_doctype": doctype, "status": "Approved"}
)
for entry in event_consumers:
consumer = frappe.get_doc("Event Consumer", entry.parent)
consumer.flags.notified = False
notify(consumer)
@frappe.whitelist()
def notify(consumer):
"""notify individual event consumers about a new update"""
consumer_status = consumer.get_consumer_status()
if consumer_status == "online":
try:
client = get_consumer_site(consumer.callback_url)
client.post_request(
{
"cmd": "frappe.event_streaming.doctype.event_producer.event_producer.new_event_notification",
"producer_url": get_url(),
}
)
consumer.flags.notified = True
except Exception:
consumer.flags.notified = False
else:
consumer.flags.notified = False
# enqueue another job if the site was not notified
if not consumer.flags.notified:
enqueued_method = "frappe.event_streaming.doctype.event_consumer.event_consumer.notify"
jobs = get_jobs()
if not jobs or enqueued_method not in jobs[frappe.local.site] and not consumer.flags.notifed:
frappe.enqueue(
enqueued_method, queue="long", enqueue_after_commit=True, **{"consumer": consumer}
)
def has_consumer_access(consumer, update_log):
"""Checks if consumer has completely satisfied all the conditions on the doc"""
if isinstance(consumer, str):
consumer = frappe.get_doc("Event Consumer", consumer)
if not frappe.db.exists(update_log.ref_doctype, update_log.docname):
# Delete Log
# Check if the last Update Log of this document was read by this consumer
last_update_log = frappe.get_all(
"Event Update Log",
filters={
"ref_doctype": update_log.ref_doctype,
"docname": update_log.docname,
"creation": ["<", update_log.creation],
},
order_by="creation desc",
limit_page_length=1,
)
if not len(last_update_log):
return False
last_update_log = frappe.get_doc("Event Update Log", last_update_log[0].name)
return len([x for x in last_update_log.consumers if x.consumer == consumer.name])
doc = frappe.get_doc(update_log.ref_doctype, update_log.docname)
try:
for dt_entry in consumer.consumer_doctypes:
if dt_entry.ref_doctype != update_log.ref_doctype:
continue
if not dt_entry.condition:
return True
condition: str = dt_entry.condition
if condition.startswith("cmd:"):
cmd = condition.split("cmd:")[1].strip()
args = {"consumer": consumer, "doc": doc, "update_log": update_log}
return frappe.call(cmd, **args)
else:
return frappe.safe_eval(condition, frappe._dict(doc=doc))
except Exception as e:
consumer.log_error("has_consumer_access error")
return False

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventConsumer(FrappeTestCase):
pass

View file

@ -1,61 +0,0 @@
{
"actions": [],
"creation": "2019-10-03 21:10:54.754651",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"ref_doctype",
"status",
"unsubscribed",
"condition"
],
"fields": [
{
"columns": 4,
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Document Type",
"options": "DocType",
"read_only": 1,
"reqd": 1
},
{
"columns": 4,
"default": "Pending",
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Approval Status",
"options": "Pending\nApproved\nRejected"
},
{
"columns": 2,
"default": "0",
"fieldname": "unsubscribed",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Unsubscribed",
"read_only": 1
},
{
"fieldname": "condition",
"fieldtype": "Code",
"label": "Condition",
"read_only": 1
}
],
"istable": 1,
"links": [],
"modified": "2020-11-07 09:26:49.894294",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Consumer Document Type",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventConsumerDocumentType(Document):
pass

View file

@ -1,25 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on("Event Producer", {
refresh: function (frm) {
frm.set_query("ref_doctype", "producer_doctypes", function () {
return {
filters: {
issingle: 0,
istable: 0,
},
};
});
frm.set_indicator_formatter("status", function (doc) {
let indicator = "orange";
if (doc.status == "Approved") {
indicator = "green";
} else if (doc.status == "Rejected") {
indicator = "red";
}
return indicator;
});
},
});

View file

@ -1,96 +0,0 @@
{
"actions": [],
"autoname": "field:producer_url",
"creation": "2019-08-26 19:17:24.919196",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"producer_url",
"producer_doctypes",
"section_break_3",
"api_key",
"api_secret",
"column_break_6",
"user",
"incoming_change"
],
"fields": [
{
"fieldname": "producer_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Producer URL",
"reqd": 1,
"unique": 1
},
{
"description": "API Key of the user(Event Subscriber) on the producer site",
"fieldname": "api_key",
"fieldtype": "Data",
"label": "API Key",
"reqd": 1
},
{
"description": "API Secret of the user(Event Subscriber) on the producer site",
"fieldname": "api_secret",
"fieldtype": "Password",
"label": "API Secret",
"reqd": 1
},
{
"fieldname": "user",
"fieldtype": "Link",
"label": "Event Subscriber",
"options": "User",
"reqd": 1,
"set_only_once": 1
},
{
"fieldname": "column_break_6",
"fieldtype": "Column Break"
},
{
"fieldname": "section_break_3",
"fieldtype": "Section Break"
},
{
"default": "0",
"fieldname": "incoming_change",
"fieldtype": "Check",
"hidden": 1,
"label": "Incoming Change"
},
{
"fieldname": "producer_doctypes",
"fieldtype": "Table",
"label": "Event Producer Document Types",
"options": "Event Producer Document Type",
"reqd": 1
}
],
"links": [],
"modified": "2020-10-26 13:00:15.361316",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,569 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
import json
import time
import requests
import frappe
from frappe import _
from frappe.custom.doctype.custom_field.custom_field import create_custom_field
from frappe.frappeclient import FrappeClient
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
from frappe.utils.data import get_link_to_form, get_url
from frappe.utils.password import get_decrypted_password
class EventProducer(Document):
    """Consumer-side record of a remote "producer" site whose document
    updates are pulled and replayed on this site."""

    def before_insert(self):
        self.check_url()
        self.validate_event_subscriber()
        # Flag so on_update does not push this (producer-originated) change
        # back to the producer site.
        self.incoming_change = True
        self.create_event_consumer()
        self.create_custom_fields()

    def validate(self):
        self.validate_event_subscriber()
        if frappe.flags.in_test:
            # tests skip the manual approval step normally done on the producer
            for entry in self.producer_doctypes:
                entry.status = "Approved"

    def validate_event_subscriber(self):
        # the subscriber user must already have API credentials generated locally
        if not frappe.db.get_value("User", self.user, "api_key"):
            frappe.throw(
                _("Please generate keys for the Event Subscriber User {0} first.").format(
                    frappe.bold(get_link_to_form("User", self.user))
                )
            )

    def on_update(self):
        if not self.incoming_change:
            if frappe.db.exists("Event Producer", self.name):
                if not self.api_key or not self.api_secret:
                    frappe.throw(_("Please set API Key and Secret on the producer and consumer sites first."))
                else:
                    doc_before_save = self.get_doc_before_save()
                    # a pure credential change is not propagated to the producer
                    if doc_before_save.api_key != self.api_key or doc_before_save.api_secret != self.api_secret:
                        return

                    self.update_event_consumer()
                    self.create_custom_fields()
        else:
            # when producer doc is updated it updates the consumer doc, set flag to avoid deadlock
            self.db_set("incoming_change", 0)
            self.reload()

    def on_trash(self):
        # drop the sync cursor kept for this producer
        last_update = frappe.db.get_value("Event Producer Last Update", dict(event_producer=self.name))
        if last_update:
            frappe.delete_doc("Event Producer Last Update", last_update)

    def check_url(self):
        valid_url_schemes = ("http", "https")
        frappe.utils.validate_url(self.producer_url, throw=True, valid_schemes=valid_url_schemes)

        # remove '/' from the end of the url like http://test_site.com/
        # to prevent mismatch in get_url() results
        if self.producer_url.endswith("/"):
            self.producer_url = self.producer_url[:-1]

    def create_event_consumer(self):
        """register event consumer on the producer site"""
        if self.is_producer_online():
            producer_site = FrappeClient(
                url=self.producer_url, api_key=self.api_key, api_secret=self.get_password("api_secret")
            )

            response = producer_site.post_api(
                "frappe.event_streaming.doctype.event_consumer.event_consumer.register_consumer",
                params={"data": json.dumps(self.get_request_data())},
            )
            if response:
                response = json.loads(response)
                # producer returns the timestamp from which to start pulling
                self.set_last_update(response["last_update"])
            else:
                frappe.throw(
                    _(
                        "Failed to create an Event Consumer or an Event Consumer for the current site is already registered."
                    )
                )

    def set_last_update(self, last_update):
        """Persist the sync cursor (timestamp of the last consumed update)."""
        last_update_doc_name = frappe.db.get_value(
            "Event Producer Last Update", dict(event_producer=self.name)
        )
        if not last_update_doc_name:
            # NOTE: inserts with event_producer=self.producer_url while lookups
            # use self.name — presumably equivalent because this doctype is
            # autonamed from producer_url; verify against the JSON schema.
            frappe.get_doc(
                dict(
                    doctype="Event Producer Last Update",
                    event_producer=self.producer_url,
                    last_update=last_update,
                )
            ).insert(ignore_permissions=True)
        else:
            frappe.db.set_value(
                "Event Producer Last Update", last_update_doc_name, "last_update", last_update
            )

    def get_last_update(self):
        """Return the stored sync cursor for this producer."""
        return frappe.db.get_value(
            "Event Producer Last Update", dict(event_producer=self.name), "last_update"
        )

    def get_request_data(self):
        """Build the registration payload sent to the producer site."""
        consumer_doctypes = []
        for entry in self.producer_doctypes:
            if entry.has_mapping:
                # if mapping, subscribe to remote doctype on consumer's site
                dt = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
            else:
                dt = entry.ref_doctype
            consumer_doctypes.append({"doctype": dt, "condition": entry.condition})

        user_key = frappe.db.get_value("User", self.user, "api_key")
        user_secret = get_decrypted_password("User", self.user, "api_secret")
        return {
            "event_consumer": get_url(),
            "consumer_doctypes": json.dumps(consumer_doctypes),
            "user": self.user,
            "api_key": user_key,
            "api_secret": user_secret,
        }

    def create_custom_fields(self):
        """create custom field to store remote docname and remote site url"""
        for entry in self.producer_doctypes:
            if not entry.use_same_name:
                if not frappe.db.exists(
                    "Custom Field", {"fieldname": "remote_docname", "dt": entry.ref_doctype}
                ):
                    df = dict(
                        fieldname="remote_docname",
                        label="Remote Document Name",
                        fieldtype="Data",
                        read_only=1,
                        print_hide=1,
                    )
                    create_custom_field(entry.ref_doctype, df)
                if not frappe.db.exists(
                    "Custom Field", {"fieldname": "remote_site_name", "dt": entry.ref_doctype}
                ):
                    df = dict(
                        fieldname="remote_site_name",
                        label="Remote Site",
                        fieldtype="Data",
                        read_only=1,
                        print_hide=1,
                    )
                    create_custom_field(entry.ref_doctype, df)

    def update_event_consumer(self):
        """Push this document's subscription config to the Event Consumer
        record kept on the producer site."""
        if self.is_producer_online():
            producer_site = get_producer_site(self.producer_url)
            event_consumer = producer_site.get_doc("Event Consumer", get_url())
            event_consumer = frappe._dict(event_consumer)
            if event_consumer:
                # keep previously granted approval statuses
                config = event_consumer.consumer_doctypes
                event_consumer.consumer_doctypes = []
                for entry in self.producer_doctypes:
                    if entry.has_mapping:
                        # if mapping, subscribe to remote doctype on consumer's site
                        ref_doctype = frappe.db.get_value("Document Type Mapping", entry.mapping, "remote_doctype")
                    else:
                        ref_doctype = entry.ref_doctype

                    event_consumer.consumer_doctypes.append(
                        {
                            "ref_doctype": ref_doctype,
                            "status": get_approval_status(config, ref_doctype),
                            "unsubscribed": entry.unsubscribe,
                            "condition": entry.condition,
                        }
                    )
                event_consumer.user = self.user
                event_consumer.incoming_change = True
                producer_site.update(event_consumer)

    def is_producer_online(self):
        """check connection status for the Event Producer site"""
        retry = 3
        while retry > 0:
            # NOTE(review): requests.get has no timeout, so a stalled producer
            # can block this call indefinitely — confirm intent.
            res = requests.get(self.producer_url)
            if res.status_code == 200:
                return True
            retry -= 1
            time.sleep(5)
        frappe.throw(_("Failed to connect to the Event Producer site. Retry after some time."))
def get_producer_site(producer_url):
    """Return a FrappeClient connected to the given Event Producer site
    using the credentials stored on its Event Producer document."""
    producer = frappe.get_doc("Event Producer", producer_url)
    return FrappeClient(
        url=producer_url,
        api_key=producer.api_key,
        api_secret=producer.get_password("api_secret"),
    )
def get_approval_status(config, ref_doctype):
    """Return the consumer's approval status for *ref_doctype*.

    Falls back to "Pending" when the doctype is absent from *config*.
    """
    return next(
        (entry.get("status") for entry in config if entry.get("ref_doctype") == ref_doctype),
        "Pending",
    )
@frappe.whitelist()
def pull_producer_data():
    """Fetch data from producer node.

    Returns "success" after pulling from every configured producer, or
    None when the local site itself is unreachable.
    """
    # NOTE(review): requests.get without a timeout may block — confirm.
    response = requests.get(get_url())
    if response.status_code != 200:
        return None
    for event_producer in frappe.get_all("Event Producer"):
        pull_from_node(event_producer.name)
    return "success"
@frappe.whitelist()
def pull_from_node(event_producer):
    """Pull all updates generated after the last update timestamp from the
    event producer site and replay each one locally.

    :param event_producer: name of the Event Producer document (its URL)
    """
    event_producer = frappe.get_doc("Event Producer", event_producer)
    producer_site = get_producer_site(event_producer.producer_url)
    last_update = event_producer.get_last_update()

    (doctypes, mapping_config, naming_config) = get_config(event_producer.producer_doctypes)

    updates = get_updates(producer_site, last_update, doctypes)

    for update in updates:
        update.use_same_name = naming_config.get(update.ref_doctype)
        mapping = mapping_config.get(update.ref_doctype)
        if mapping:
            # translate remote fieldnames to local ones before syncing
            update.mapping = mapping
            update = get_mapped_update(update, producer_site)
        if not update.update_type == "Delete":
            # deletes carry no payload; everything else ships JSON data
            update.data = json.loads(update.data)
        sync(update, producer_site, event_producer)
def get_config(event_config):
    """Collect the doctype, mapping and naming configuration for all
    approved subscriptions.

    Returns a tuple ``(doctypes, mapping_config, naming_config)`` where
    mapping_config and naming_config are keyed by the (possibly remote)
    doctype name.
    """
    doctypes = []
    mapping_config = {}
    naming_config = {}

    for entry in event_config:
        if entry.status != "Approved":
            continue
        if entry.has_mapping:
            # subscribe under the remote doctype name when a mapping exists
            (mapped_doctype, mapping) = frappe.db.get_value(
                "Document Type Mapping", entry.mapping, ["remote_doctype", "name"]
            )
            mapping_config[mapped_doctype] = mapping
            naming_config[mapped_doctype] = entry.use_same_name
            doctypes.append(mapped_doctype)
        else:
            naming_config[entry.ref_doctype] = entry.use_same_name
            doctypes.append(entry.ref_doctype)

    return (doctypes, mapping_config, naming_config)
def sync(update, producer_site, event_producer, in_retry=False):
    """Sync one individual update and advance the producer's cursor.

    When *in_retry* is True (called from resync) the outcome string
    "Synced"/"Failed" is returned instead of being logged again.
    """
    try:
        if update.update_type == "Create":
            set_insert(update, producer_site, event_producer.name)
        if update.update_type == "Update":
            set_update(update, producer_site)
        if update.update_type == "Delete":
            set_delete(update)
        if in_retry:
            return "Synced"
        log_event_sync(update, event_producer.name, "Synced")
    except Exception:
        if in_retry:
            if frappe.flags.in_test:
                print(frappe.get_traceback())
            return "Failed"
        log_event_sync(update, event_producer.name, "Failed", frappe.get_traceback())

    # cursor advances even on failure so one bad update does not block the
    # stream; failed entries remain in Event Sync Log for manual resync
    event_producer.set_last_update(update.creation)
    frappe.db.commit()
def set_insert(update, producer_site, event_producer):
    """Sync an insert ("Create") type update.

    :param event_producer: producer site identifier, stored in the
        remote_site_name custom field when names are not shared
    """
    if frappe.db.get_value(update.ref_doctype, update.docname):
        # doc already created
        return
    doc = frappe.get_doc(update.data)

    if update.mapping:
        if update.get("dependencies"):
            # mapped dependency docs were shipped with the update itself
            dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
            for fieldname, value in dependencies_created.items():
                doc.update({fieldname: value})
    else:
        # pull any linked documents from the producer before inserting
        sync_dependencies(doc, producer_site)

    if update.use_same_name:
        doc.insert(set_name=update.docname, set_child_names=False)
    else:
        # if event consumer is not saving documents with the same name as the producer
        # store the remote docname in a custom field for future updates
        doc.remote_docname = update.docname
        doc.remote_site_name = event_producer
        doc.insert(set_child_names=False)
def set_update(update, producer_site):
    """Sync an "Update" type update.

    The payload splits the diff into ``changed`` (parent fields),
    ``removed``/``row_changed``/``added`` (child table rows).
    """
    local_doc = get_local_doc(update)
    if local_doc:
        data = frappe._dict(update.data)

        if data.changed:
            local_doc.update(data.changed)
        if data.removed:
            local_doc = update_row_removed(local_doc, data.removed)
        if data.row_changed:
            update_row_changed(local_doc, data.row_changed)
        if data.added:
            local_doc = update_row_added(local_doc, data.added)

        if update.mapping:
            if update.get("dependencies"):
                dependencies_created = sync_mapped_dependencies(update.dependencies, producer_site)
                for fieldname, value in dependencies_created.items():
                    local_doc.update({fieldname: value})
        else:
            sync_dependencies(local_doc, producer_site)

        local_doc.save()
        # persist child rows mutated in place above
        local_doc.db_update_all()
def update_row_removed(local_doc, removed):
"""Sync child table row deletion type update"""
for tablename, rownames in removed.items():
table = local_doc.get_table_field_doctype(tablename)
for row in rownames:
table_rows = local_doc.get(tablename)
child_table_row = get_child_table_row(table_rows, row)
table_rows.remove(child_table_row)
local_doc.set(tablename, table_rows)
return local_doc
def get_child_table_row(table_rows, row):
    """Return the first row in *table_rows* whose name equals *row*,
    or None if no row matches."""
    return next((candidate for candidate in table_rows if candidate.get("name") == row), None)
def update_row_changed(local_doc, changed):
    """Sync a child-table row update: apply each changed row's fields onto
    the existing row with the matching name."""
    for tablename, changed_rows in changed.items():
        existing_rows = local_doc.get(tablename)
        for changed_row in changed_rows:
            for existing in existing_rows:
                if existing.get("name") == changed_row["name"]:
                    existing.update(changed_row)
def update_row_added(local_doc, added):
    """Sync a child-table row addition: append the shipped rows and insert
    each as a child document of *local_doc*."""
    for tablename, new_rows in added.items():
        local_doc.extend(tablename, new_rows)
        for row in new_rows:
            row_doc = frappe.get_doc(row)
            row_doc.parent = local_doc.name
            row_doc.parenttype = local_doc.doctype
            row_doc.insert(set_name=row_doc.name)
    return local_doc
def set_delete(update):
    """Sync a "Delete" type update by deleting the local counterpart,
    if one exists."""
    doc = get_local_doc(update)
    if doc:
        doc.delete()
def get_updates(producer_site, last_update, doctypes):
    """Fetch all update logs generated after *last_update* for the given
    doctypes from the producer site."""
    payload = {
        "cmd": "frappe.event_streaming.doctype.event_update_log.event_update_log.get_update_logs_for_consumer",
        "event_consumer": get_url(),
        "doctypes": frappe.as_json(doctypes),
        "last_update": last_update,
    }
    logs = producer_site.post_request(payload) or []
    return [frappe._dict(entry) for entry in logs]
def get_local_doc(update):
    """Return the local counterpart of *update*'s document, or None.

    When names are not shared, the lookup goes through the remote_docname
    custom field instead of the document name.
    """
    try:
        if update.use_same_name:
            return frappe.get_doc(update.ref_doctype, update.docname)
        return frappe.get_doc(update.ref_doctype, {"remote_docname": update.docname})
    except frappe.DoesNotExistError:
        return None
def sync_dependencies(document, producer_site):
    """Pull every document that *document* links to from the producer site
    before *document* itself is inserted.

    ``dependencies`` is a dictionary shared among all nested functions; it
    maps each document that still has unsynced dependencies to True, and is
    flipped to False once that document is ready.
    """
    dependencies = {document: True}

    def check_doc_has_dependencies(doc, producer_site):
        """Sync child table link fields first,
        then sync link fields,
        then dynamic links"""
        meta = frappe.get_meta(doc.doctype)
        table_fields = meta.get_table_fields()
        link_fields = meta.get_link_fields()
        dl_fields = meta.get_dynamic_link_fields()
        if table_fields:
            sync_child_table_dependencies(doc, table_fields, producer_site)
        if link_fields:
            sync_link_dependencies(doc, link_fields, producer_site)
        if dl_fields:
            sync_dynamic_link_dependencies(doc, dl_fields, producer_site)

    def sync_child_table_dependencies(doc, table_fields, producer_site):
        # each child row may itself carry link fields that must exist first
        for df in table_fields:
            child_table = doc.get(df.fieldname)
            for entry in child_table:
                child_doc = producer_site.get_doc(entry.doctype, entry.name)
                if child_doc:
                    child_doc = frappe._dict(child_doc)
                    set_dependencies(child_doc, frappe.get_meta(entry.doctype).get_link_fields(), producer_site)

    def sync_link_dependencies(doc, link_fields, producer_site):
        set_dependencies(doc, link_fields, producer_site)

    def sync_dynamic_link_dependencies(doc, dl_fields, producer_site):
        # dynamic links store the target doctype in a sibling field (df.options)
        for df in dl_fields:
            docname = doc.get(df.fieldname)
            linked_doctype = doc.get(df.options)
            if docname and not check_dependency_fulfilled(linked_doctype, docname):
                master_doc = producer_site.get_doc(linked_doctype, docname)
                frappe.get_doc(master_doc).insert(set_name=docname)

    def set_dependencies(doc, link_fields, producer_site):
        for df in link_fields:
            docname = doc.get(df.fieldname)
            linked_doctype = df.get_link_doctype()
            if docname and not check_dependency_fulfilled(linked_doctype, docname):
                master_doc = producer_site.get_doc(linked_doctype, docname)
                try:
                    master_doc = frappe.get_doc(master_doc)
                    master_doc.insert(set_name=docname)
                    frappe.db.commit()
                # for dependency inside a dependency
                except Exception:
                    dependencies[master_doc] = True

    def check_dependency_fulfilled(linked_doctype, docname):
        return frappe.db.exists(linked_doctype, docname)

    while dependencies[document]:
        # find the first non synced dependency
        for item in reversed(list(dependencies.keys())):
            if dependencies[item]:
                dependency = item
                break

        check_doc_has_dependencies(dependency, producer_site)

        # mark synced for nested dependency
        if dependency != document:
            dependencies[dependency] = False
            dependency.insert()

        # no more dependencies left to be synced, the main doc is ready to be synced
        # end the dependency loop
        if not any(list(dependencies.values())[1:]):
            dependencies[document] = False
def sync_mapped_dependencies(dependencies, producer_site):
    """Insert dependency documents shipped with a mapped update.

    Each entry is ``(fieldname, json_doc)``; returns a mapping of
    fieldname -> local docname for the caller to set on its document.
    """
    created = {}
    for entry in dependencies:
        dep = frappe._dict(json.loads(entry[1]))
        existing_name = frappe.db.exists(dep.doctype, dep.name)
        if existing_name:
            created[entry[0]] = existing_name
        else:
            inserted = frappe.get_doc(dep).insert(set_child_names=False)
            created[entry[0]] = inserted.name
    return created
def log_event_sync(update, event_producer, sync_status, error=None):
    """Record a received update in Event Sync Log with *sync_status*
    ("Synced" or "Failed"), optionally storing the failure traceback."""
    log = frappe.new_doc("Event Sync Log")
    log.update_type = update.update_type
    log.ref_doctype = update.ref_doctype
    log.status = sync_status
    log.event_producer = event_producer
    log.producer_doc = update.docname
    log.data = frappe.as_json(update.data)
    log.use_same_name = update.use_same_name
    log.mapping = update.mapping or None
    if update.use_same_name:
        log.docname = update.docname
    else:
        # resolve the local name via the remote_docname custom field
        log.docname = frappe.db.get_value(update.ref_doctype, {"remote_docname": update.docname}, "name")
    if error:
        log.error = error
    log.insert()
def get_mapped_update(update, producer_site):
    """Rewrite *update* through its Document Type Mapping so fieldnames and
    the ref_doctype match the local schema."""
    mapping = frappe.get_doc("Document Type Mapping", update.mapping)

    if update.update_type == "Create":
        source = frappe._dict(json.loads(update.data))
        mapped = mapping.get_mapping(source, producer_site, update.update_type)
        update.data = mapped.get("doc")
        update.dependencies = mapped.get("dependencies", None)
    elif update.update_type == "Update":
        mapped = mapping.get_mapped_update(update, producer_site)
        update.data = mapped.get("doc")
        update.dependencies = mapped.get("dependencies", None)

    update["ref_doctype"] = mapping.local_doctype
    return update
@frappe.whitelist()
def new_event_notification(producer_url):
    """Pull data from the producer when notified, unless a pull job is
    already queued."""
    method = "frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node"
    queued = get_jobs()
    if not queued or method not in queued[frappe.local.site]:
        frappe.enqueue(method, queue="default", event_producer=producer_url)
@frappe.whitelist()
def resync(update):
    """Retry syncing a previously failed update; returns "Synced" or
    "Failed"."""
    failed_update = frappe._dict(json.loads(update))
    site = get_producer_site(failed_update.event_producer)
    producer_doc = frappe.get_doc("Event Producer", failed_update.event_producer)
    if failed_update.mapping:
        # re-run the field mapping before retrying
        failed_update = get_mapped_update(failed_update, site)
        failed_update.data = json.loads(failed_update.data)
    return sync(failed_update, site, producer_doc, in_retry=True)

View file

@ -1,438 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
import json
import frappe
from frappe.core.doctype.user.user import generate_keys
from frappe.event_streaming.doctype.event_producer.event_producer import pull_from_node
from frappe.frappeclient import FrappeClient
from frappe.query_builder.utils import db_type_is
from frappe.tests.test_query_builder import run_only_if
from frappe.tests.utils import FrappeTestCase
producer_url = "http://test_site_producer:8000"
class TestEventProducer(FrappeTestCase):
    """Integration tests for consumer-side event streaming; they require a
    live producer site at *producer_url*."""

    def setUp(self):
        create_event_producer(producer_url)

    def tearDown(self):
        unsubscribe_doctypes(producer_url)

    def test_insert(self):
        producer = get_remote_site()
        producer_doc = insert_into_producer(producer, "test creation 1 sync")
        self.pull_producer_data()
        self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))

    def test_update(self):
        producer = get_remote_site()
        producer_doc = insert_into_producer(producer, "test update 1")
        producer_doc["description"] = "test update 2"
        producer_doc = producer.update(producer_doc)
        self.pull_producer_data()
        local_doc = frappe.get_doc(producer_doc.doctype, producer_doc.name)
        self.assertEqual(local_doc.description, producer_doc.description)

    def test_delete(self):
        producer = get_remote_site()
        producer_doc = insert_into_producer(producer, "test delete sync")
        self.pull_producer_data()
        self.assertTrue(frappe.db.exists("ToDo", producer_doc.name))
        producer.delete("ToDo", producer_doc.name)
        self.pull_producer_data()
        self.assertFalse(frappe.db.exists("ToDo", producer_doc.name))

    @run_only_if(db_type_is.MARIADB)
    def test_multiple_doctypes_sync(self):
        # TODO: This test is extremely flaky with Postgres. Rewrite this!
        producer = get_remote_site()

        # insert todo and note in producer
        producer_todo = insert_into_producer(producer, "test multiple doc sync")

        producer_note1 = frappe._dict(doctype="Note", title="test multiple doc sync 1")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
        frappe.db.delete("Note", {"title": producer_note1["title"]})
        producer_note1 = producer.insert(producer_note1)

        producer_note2 = frappe._dict(doctype="Note", title="test multiple doc sync 2")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note2["title"]})
        frappe.db.delete("Note", {"title": producer_note2["title"]})
        producer_note2 = producer.insert(producer_note2)

        # update in producer
        producer_todo["description"] = "test multiple doc update sync"
        producer_todo = producer.update(producer_todo)
        producer_note1["content"] = "testing update sync"
        producer_note1 = producer.update(producer_note1)

        producer.delete("Note", producer_note2.name)

        self.pull_producer_data()

        # check inserted
        self.assertTrue(frappe.db.exists("ToDo", producer_todo.name))

        # check update
        local_todo = frappe.get_doc("ToDo", producer_todo.name)
        self.assertEqual(local_todo.description, producer_todo.description)
        local_note1 = frappe.get_doc("Note", producer_note1.name)
        self.assertEqual(local_note1.content, producer_note1.content)

        # check delete
        self.assertFalse(frappe.db.exists("Note", producer_note2.name))

    def test_child_table_sync_with_dependencies(self):
        producer = get_remote_site()
        producer_user = frappe._dict(
            doctype="User",
            email="test_user@sync.com",
            send_welcome_email=0,
            first_name="Test Sync User",
            enabled=1,
            roles=[{"role": "System Manager"}],
        )
        delete_on_remote_if_exists(producer, "User", {"email": producer_user.email})
        frappe.db.delete("User", {"email": producer_user.email})
        producer_user = producer.insert(producer_user)

        producer_note = frappe._dict(
            doctype="Note", title="test child table dependency sync", seen_by=[{"user": producer_user.name}]
        )
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
        frappe.db.delete("Note", {"title": producer_note.title})
        producer_note = producer.insert(producer_note)

        self.pull_producer_data()

        self.assertTrue(frappe.db.exists("User", producer_user.name))
        # BUGFIX: this was `if self.assertTrue(...)`, which always evaluates
        # to None, so the two child-table assertions below never executed.
        self.assertTrue(frappe.db.exists("Note", producer_note.name))
        local_note = frappe.get_doc("Note", producer_note.name)
        self.assertEqual(len(local_note.seen_by), 1)

    def test_dynamic_link_dependencies_synced(self):
        producer = get_remote_site()
        # unsubscribe for Note to check whether dependency is fulfilled
        event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
        event_producer.producer_doctypes = []
        event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
        event_producer.save()

        producer_link_doc = frappe._dict(doctype="Note", title="Test Dynamic Link 1")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_link_doc.title})
        frappe.db.delete("Note", {"title": producer_link_doc.title})
        producer_link_doc = producer.insert(producer_link_doc)

        producer_doc = frappe._dict(
            doctype="ToDo",
            description="Test Dynamic Link 2",
            assigned_by="Administrator",
            reference_type="Note",
            reference_name=producer_link_doc.name,
        )
        producer_doc = producer.insert(producer_doc)

        self.pull_producer_data()

        # check dynamic link dependency created
        self.assertTrue(frappe.db.exists("Note", producer_link_doc.name))
        self.assertEqual(
            producer_link_doc.name, frappe.db.get_value("ToDo", producer_doc.name, "reference_name")
        )
        reset_configuration(producer_url)

    def test_naming_configuration(self):
        # test with use_same_name = 0
        producer = get_remote_site()
        event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
        event_producer.producer_doctypes = []
        event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 0})
        event_producer.save()

        producer_doc = insert_into_producer(producer, "test different name sync")
        self.pull_producer_data()
        self.assertTrue(
            frappe.db.exists(
                "ToDo", {"remote_docname": producer_doc.name, "remote_site_name": producer_url}
            )
        )
        reset_configuration(producer_url)

    def test_conditional_events(self):
        producer = get_remote_site()

        # Add Condition
        event_producer = frappe.get_doc("Event Producer", producer_url)
        note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
        note_producer_entry.condition = "doc.public == 1"
        event_producer.save()

        # Make test doc
        producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
        producer_note1 = producer.insert(producer_note1)

        # Make Update
        producer_note1["content"] = "Test Conditional Sync Content"
        producer_note1 = producer.update(producer_note1)

        self.pull_producer_data()

        # Check if synced here
        self.assertFalse(frappe.db.exists("Note", producer_note1.name))

        # Lets satisfy the condition
        producer_note1["public"] = 1
        producer_note1 = producer.update(producer_note1)

        self.pull_producer_data()

        # it should sync now
        self.assertTrue(frappe.db.exists("Note", producer_note1.name))
        local_note = frappe.get_doc("Note", producer_note1.name)
        self.assertEqual(local_note.content, producer_note1.content)
        reset_configuration(producer_url)

    def test_conditional_events_with_cmd(self):
        producer = get_remote_site()

        # Add Condition
        event_producer = frappe.get_doc("Event Producer", producer_url)
        note_producer_entry = [x for x in event_producer.producer_doctypes if x.ref_doctype == "Note"][0]
        note_producer_entry.condition = (
            "cmd: frappe.event_streaming.doctype.event_producer.test_event_producer.can_sync_note"
        )
        event_producer.save()

        # Make test doc
        producer_note1 = frappe._dict(doctype="Note", public=0, title="test conditional sync cmd")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note1["title"]})
        producer_note1 = producer.insert(producer_note1)

        # Make Update
        producer_note1["content"] = "Test Conditional Sync Content"
        producer_note1 = producer.update(producer_note1)

        self.pull_producer_data()

        # Check if synced here
        self.assertFalse(frappe.db.exists("Note", producer_note1.name))

        # Lets satisfy the condition
        producer_note1["public"] = 1
        producer_note1 = producer.update(producer_note1)

        self.pull_producer_data()

        # it should sync now
        self.assertTrue(frappe.db.exists("Note", producer_note1.name))
        local_note = frappe.get_doc("Note", producer_note1.name)
        self.assertEqual(local_note.content, producer_note1.content)
        reset_configuration(producer_url)

    def test_update_log(self):
        producer = get_remote_site()
        producer_doc = insert_into_producer(producer, "test update log")
        update_log_doc = producer.get_value(
            "Event Update Log", "docname", {"docname": producer_doc.get("name")}
        )
        self.assertEqual(update_log_doc.get("docname"), producer_doc.get("name"))

    def test_event_sync_log(self):
        producer = get_remote_site()
        producer_doc = insert_into_producer(producer, "test event sync log")
        self.pull_producer_data()
        self.assertTrue(frappe.db.exists("Event Sync Log", {"docname": producer_doc.name}))

    def pull_producer_data(self):
        # helper, not a test: replay pending producer updates locally
        pull_from_node(producer_url)

    def test_mapping(self):
        producer = get_remote_site()
        event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
        event_producer.producer_doctypes = []
        mapping = [{"local_fieldname": "description", "remote_fieldname": "content"}]
        event_producer.append(
            "producer_doctypes",
            {
                "ref_doctype": "ToDo",
                "use_same_name": 1,
                "has_mapping": 1,
                "mapping": get_mapping("ToDo to Note", "ToDo", "Note", mapping),
            },
        )
        event_producer.save()

        producer_note = frappe._dict(doctype="Note", title="Test Mapping", content="Test Mapping")
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
        producer_note = producer.insert(producer_note)

        self.pull_producer_data()

        # check inserted
        self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))

        # update in producer
        producer_note["content"] = "test mapped doc update sync"
        producer_note = producer.update(producer_note)

        self.pull_producer_data()

        # check updated
        self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note["content"]}))

        producer.delete("Note", producer_note.name)

        self.pull_producer_data()

        # check delete
        self.assertFalse(frappe.db.exists("ToDo", {"description": producer_note.content}))
        reset_configuration(producer_url)

    def test_inner_mapping(self):
        producer = get_remote_site()
        setup_event_producer_for_inner_mapping()

        producer_note = frappe._dict(
            doctype="Note", title="Inner Mapping Tester", content="Test Inner Mapping"
        )
        delete_on_remote_if_exists(producer, "Note", {"title": producer_note.title})
        producer_note = producer.insert(producer_note)

        self.pull_producer_data()

        # check dependency inserted
        self.assertTrue(frappe.db.exists("Role", {"role_name": producer_note.title}))
        # check doc inserted
        self.assertTrue(frappe.db.exists("ToDo", {"description": producer_note.content}))
        reset_configuration(producer_url)
def can_sync_note(consumer, doc, update_log):
    """Condition hook used by test_conditional_events_with_cmd: a Note may
    sync only once it has been made public."""
    is_public = doc.public == 1
    return is_public
def setup_event_producer_for_inner_mapping():
    """Configure the Event Producer with a ToDo<-Note mapping whose "role"
    field is itself mapped through a nested (inner) Role<-Note mapping.

    Returns the saved Event Producer document.
    """
    event_producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
    event_producer.producer_doctypes = []
    # inner mapping: creates a Role named after the Note title as a dependency
    inner_mapping = [{"local_fieldname": "role_name", "remote_fieldname": "title"}]
    inner_map = get_mapping("Role to Note Dependency Creation", "Role", "Note", inner_mapping)
    mapping = [
        {
            "local_fieldname": "description",
            "remote_fieldname": "content",
        },
        {
            "local_fieldname": "role",
            "remote_fieldname": "title",
            "mapping_type": "Document",
            "mapping": inner_map,
            "remote_value_filters": json.dumps({"title": "title"}),
        },
    ]
    event_producer.append(
        "producer_doctypes",
        {
            "ref_doctype": "ToDo",
            "use_same_name": 1,
            "has_mapping": 1,
            "mapping": get_mapping("ToDo to Note Mapping", "ToDo", "Note", mapping),
        },
    )
    event_producer.save()
    return event_producer
def insert_into_producer(producer, description):
    """Create a ToDo with *description* on the remote site and return the
    inserted document."""
    return producer.insert(
        dict(doctype="ToDo", description=description, assigned_by="Administrator")
    )
def delete_on_remote_if_exists(producer, doctype, filters):
    """Delete the first document matching *filters* on the remote site,
    doing nothing when none exists."""
    existing = producer.get_value(doctype, "name", filters)
    if existing:
        producer.delete(doctype, existing.get("name"))
def get_mapping(mapping_name, local, remote, field_map):
    """Create (or update) a Document Type Mapping between *local* and
    *remote* doctypes and return its name."""
    existing = frappe.db.exists("Document Type Mapping", mapping_name)
    if existing:
        doc = frappe.get_doc("Document Type Mapping", existing)
    else:
        doc = frappe.new_doc("Document Type Mapping")

    doc.mapping_name = mapping_name
    doc.local_doctype = local
    doc.remote_doctype = remote
    for row in field_map:
        doc.append("field_mapping", row)
    doc.save()
    return doc.name
def create_event_producer(producer_url):
    """Create (or re-enable) the Event Producer record used by the tests.

    If the record already exists, only re-subscribe its doctypes; otherwise
    generate API keys on both sites and register a new producer for ToDo
    and Note with shared naming.
    """
    if frappe.db.exists("Event Producer", producer_url):
        # reuse the existing record; just undo any earlier unsubscribe
        event_producer = frappe.get_doc("Event Producer", producer_url)
        for entry in event_producer.producer_doctypes:
            entry.unsubscribe = 0
        event_producer.save()
        return

    # local credentials for the subscriber user
    generate_keys("Administrator")
    producer_site = connect()

    # remote credentials used to authenticate against the producer
    response = producer_site.post_api(
        "frappe.core.doctype.user.user.generate_keys", params={"user": "Administrator"}
    )
    api_secret = response.get("api_secret")

    response = producer_site.get_value("User", "api_key", {"name": "Administrator"})
    api_key = response.get("api_key")

    event_producer = frappe.new_doc("Event Producer")
    event_producer.producer_doctypes = []
    event_producer.producer_url = producer_url
    event_producer.append("producer_doctypes", {"ref_doctype": "ToDo", "use_same_name": 1})
    event_producer.append("producer_doctypes", {"ref_doctype": "Note", "use_same_name": 1})
    event_producer.user = "Administrator"
    event_producer.api_key = api_key
    event_producer.api_secret = api_secret
    event_producer.save()
def reset_configuration(producer_url):
    """Restore the default test subscription: ToDo and Note with shared
    naming and no conditions or mappings."""
    producer = frappe.get_doc("Event Producer", producer_url, for_update=True)
    producer.producer_doctypes = []
    producer.conditions = []
    producer.producer_url = producer_url
    for doctype in ("ToDo", "Note"):
        producer.append("producer_doctypes", {"ref_doctype": doctype, "use_same_name": 1})
    producer.user = "Administrator"
    producer.save()
def get_remote_site():
    """Return a FrappeClient logged into the producer site as Administrator."""
    producer_doc = frappe.get_doc("Event Producer", producer_url)
    return FrappeClient(
        url=producer_doc.producer_url, username="Administrator", password="admin", verify=False
    )
def unsubscribe_doctypes(producer_url):
    """Mark every subscribed doctype of the given producer as unsubscribed."""
    producer = frappe.get_doc("Event Producer", producer_url)
    for row in producer.producer_doctypes:
        row.unsubscribe = 1
    producer.save()
def connect():
    """Connect to the producer site as Administrator, retrying once to
    absorb a transient failure."""
    try:
        return FrappeClient(url=producer_url, username="Administrator", password="admin", verify=False)
    except Exception:
        # single retry, then let the exception propagate
        return FrappeClient(url=producer_url, username="Administrator", password="admin", verify=False)

View file

@ -1,86 +0,0 @@
{
"actions": [],
"creation": "2019-10-03 21:08:25.890352",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"ref_doctype",
"status",
"use_same_name",
"unsubscribe",
"has_mapping",
"mapping",
"condition"
],
"fields": [
{
"columns": 3,
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Document Type",
"options": "DocType",
"reqd": 1,
"set_only_once": 1
},
{
"default": "0",
"description": "If the document has different field names on the Producer and Consumer's end check this and set up the Mapping",
"fieldname": "has_mapping",
"fieldtype": "Check",
"label": "Has Mapping"
},
{
"depends_on": "eval: doc.has_mapping",
"fieldname": "mapping",
"fieldtype": "Link",
"label": "Mapping",
"options": "Document Type Mapping"
},
{
"columns": 2,
"default": "0",
"description": "If this is checked the documents will have the same name as they have on the Event Producer's site",
"fieldname": "use_same_name",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Use Same Name"
},
{
"columns": 3,
"default": "Pending",
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Approval Status",
"options": "Pending\nApproved\nRejected",
"read_only": 1
},
{
"columns": 2,
"default": "0",
"fieldname": "unsubscribe",
"fieldtype": "Check",
"in_list_view": 1,
"label": "Unsubscribe"
},
{
"fieldname": "condition",
"fieldtype": "Code",
"label": "Condition"
}
],
"istable": 1,
"links": [],
"modified": "2020-11-07 09:26:58.463868",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer Document Type",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventProducerDocumentType(Document):
	"""Controller for the 'Event Producer Document Type' child table.

	Pure data row (subscribed doctype + sync options); no custom behavior.
	"""

	pass

View file

@ -1,7 +0,0 @@
// Copyright (c) 2020, Frappe Technologies and contributors
// For license information, please see license.txt
// Form script for Event Producer Last Update — no custom behavior yet;
// the commented stub marks where a refresh handler would go.
frappe.ui.form.on("Event Producer Last Update", {
	// refresh: function(frm) {
	// }
});

View file

@ -1,53 +0,0 @@
{
"actions": [],
"autoname": "field:event_producer",
"creation": "2020-10-26 12:53:11.940177",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"event_producer",
"last_update"
],
"fields": [
{
"fieldname": "event_producer",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Event Producer",
"reqd": 1,
"unique": 1
},
{
"fieldname": "last_update",
"fieldtype": "Data",
"label": "Last Update"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2020-10-26 13:22:27.056599",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Producer Last Update",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"read_only": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventProducerLastUpdate(Document):
	"""Controller for 'Event Producer Last Update'.

	Stores the last-synced marker per producer; no custom behavior.
	"""

	pass

View file

@ -1,8 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventProducerLastUpdate(FrappeTestCase):
	"""Placeholder test case for Event Producer Last Update; no tests defined."""

	pass

View file

@ -1,24 +0,0 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
// Form script for Event Sync Log: lets the user retry a failed sync.
frappe.ui.form.on("Event Sync Log", {
	refresh: function (frm) {
		// Only failed syncs are retryable.
		if (frm.doc.status == "Failed") {
			frm.add_custom_button(__("Resync"), function () {
				frappe.call({
					method: "frappe.event_streaming.doctype.event_producer.event_producer.resync",
					args: {
						update: frm.doc,
					},
					callback: function (r) {
						if (r.message) {
							// Server returns the new status string; show it and persist.
							frappe.msgprint(r.message);
							frm.set_value("status", r.message);
							frm.save();
						}
					},
				});
			});
		}
	},
});

View file

@ -1,137 +0,0 @@
{
"creation": "2019-09-24 22:22:05.845089",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"update_type",
"ref_doctype",
"docname",
"column_break_4",
"status",
"event_producer",
"producer_doc",
"event_configurations_section",
"use_same_name",
"column_break_9",
"mapping",
"section_break_8",
"data",
"error"
],
"fields": [
{
"fieldname": "update_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Update Type",
"options": "Create\nUpdate\nDelete",
"read_only": 1
},
{
"fieldname": "ref_doctype",
"fieldtype": "Link",
"label": "Doctype",
"options": "DocType",
"read_only": 1
},
{
"fieldname": "docname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Document Name",
"options": "ref_doctype",
"read_only": 1
},
{
"fieldname": "column_break_4",
"fieldtype": "Column Break"
},
{
"fieldname": "status",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Status",
"options": "\nSynced\nFailed",
"read_only": 1
},
{
"fieldname": "event_producer",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Event Producer",
"options": "Event Producer",
"read_only": 1
},
{
"fieldname": "section_break_8",
"fieldtype": "Section Break",
"label": "Data"
},
{
"fieldname": "data",
"fieldtype": "Code",
"label": "Data",
"read_only": 1
},
{
"fieldname": "producer_doc",
"fieldtype": "Data",
"label": "Producer Document Name",
"read_only": 1
},
{
"depends_on": "eval:doc.status=='Failed'",
"fieldname": "error",
"fieldtype": "Code",
"label": "Error",
"read_only": 1
},
{
"fieldname": "event_configurations_section",
"fieldtype": "Section Break",
"label": "Event Configurations"
},
{
"default": "0",
"fieldname": "use_same_name",
"fieldtype": "Data",
"label": "Use Same Name",
"read_only": 1
},
{
"fieldname": "column_break_9",
"fieldtype": "Column Break"
},
{
"fieldname": "mapping",
"fieldtype": "Data",
"label": "Mapping",
"read_only": 1
}
],
"in_create": 1,
"modified": "2019-10-07 13:22:10.401479",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Sync Log",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventSyncLog(Document):
	"""Controller for 'Event Sync Log' (per-document sync audit trail);
	no custom behavior."""

	pass

View file

@ -1,9 +0,0 @@
// List view indicator colours for Event Sync Log rows.
frappe.listview_settings["Event Sync Log"] = {
	get_indicator: function (doc) {
		var colors = {
			Failed: "red",
			Synced: "green",
		};
		// [label, colour, filter] — clicking the indicator filters by status.
		return [__(doc.status), colors[doc.status], "status,=," + doc.status];
	},
};

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventSyncLog(FrappeTestCase):
	"""Placeholder test case for Event Sync Log; no tests defined."""

	pass

View file

@ -1,7 +0,0 @@
// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
// Form script for Event Update Log — no custom behavior yet;
// the commented stub marks where a refresh handler would go.
frappe.ui.form.on("Event Update Log", {
	// refresh: function(frm) {
	// }
});

View file

@ -1,77 +0,0 @@
{
"actions": [],
"creation": "2019-07-30 15:31:26.352527",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"update_type",
"ref_doctype",
"docname",
"data",
"consumers"
],
"fields": [
{
"fieldname": "update_type",
"fieldtype": "Select",
"in_list_view": 1,
"label": "Update Type",
"options": "Create\nUpdate\nDelete",
"read_only": 1
},
{
"fieldname": "ref_doctype",
"fieldtype": "Link",
"in_list_view": 1,
"label": "DocType",
"options": "DocType",
"read_only": 1
},
{
"fieldname": "docname",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Document Name",
"read_only": 1
},
{
"fieldname": "data",
"fieldtype": "Code",
"label": "Data",
"read_only": 1
},
{
"fieldname": "consumers",
"fieldtype": "Table MultiSelect",
"label": "Consumers",
"options": "Event Update Log Consumer",
"read_only": 1
}
],
"in_create": 1,
"links": [],
"modified": "2020-09-04 07:31:52.599804",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Update Log",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,296 +0,0 @@
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
# License: MIT. See LICENSE
import frappe
from frappe.model import no_value_fields, table_fields
from frappe.model.document import Document
from frappe.utils.background_jobs import get_jobs
class EventUpdateLog(Document):
	def after_insert(self):
		"""Send update notification updates to event consumers
		whenever update log is generated"""
		enqueued_method = (
			"frappe.event_streaming.doctype.event_consumer.event_consumer.notify_event_consumers"
		)
		# Dedupe: if a notify job is already queued for this site, don't add
		# another — bursts of update logs produce a single consumer push.
		jobs = get_jobs()
		if not jobs or enqueued_method not in jobs[frappe.local.site]:
			# enqueue_after_commit so consumers are only notified once this
			# update log is actually committed to the database.
			frappe.enqueue(
				enqueued_method, doctype=self.ref_doctype, queue="long", enqueue_after_commit=True
			)
def notify_consumers(doc, event):
	"""Create an Event Update Log for *doc* if any consumer subscribes to it.

	Called via the ``doc_events`` hooks for after_insert / on_update /
	on_trash (and related events).

	:param doc: the document that changed
	:param event: the hook event name that fired
	"""
	# make event update log for doctypes having event consumers
	if frappe.flags.in_install or frappe.flags.in_migrate:
		# No streaming during install/migrate — bulk changes are not events.
		return

	consumers = check_doctype_has_consumers(doc.doctype)
	if consumers:
		if event == "after_insert":
			# Remember the log on doc.flags so the subsequent on_update call
			# in the same save cycle doesn't create a duplicate entry.
			doc.flags.event_update_log = make_event_update_log(doc, update_type="Create")
		elif event == "on_trash":
			make_event_update_log(doc, update_type="Delete")
		else:
			# on_update
			# called after saving
			if not doc.flags.event_update_log:  # if not already inserted
				diff = get_update(doc.get_doc_before_save(), doc)
				if diff:
					# Stash the diff on the doc; make_event_update_log reads it.
					doc.diff = diff
					make_event_update_log(doc, update_type="Update")
def check_doctype_has_consumers(doctype):
	"""Return the approved, still-subscribed consumer rows for *doctype*.

	Uses the cached doctype map, so this is cheap enough to call from the
	per-save hooks.
	"""
	filters = dict(ref_doctype=doctype, status="Approved", unsubscribed=0)
	return frappe.cache_manager.get_doctype_map("Event Consumer Document Type", doctype, filters)
def get_update(old, new, for_child=False):
	"""
	Get document objects with updates only

	If there is a change, then returns a dict like:

	        {
	                "changed"     : {fieldname1: new_value1, fieldname2: new_value2},
	                "added"       : {table_fieldname1: [{row_dict1}, {row_dict2}]},
	                "removed"     : {table_fieldname1: [row_name1, row_name2]},
	                "row_changed" : {table_fieldname1: [{child_field: new_val, ...}]},
	        }

	Returns None when *new* is empty or nothing differs.

	:param old: document state before save
	:param new: document state after save
	:param for_child: True when diffing a child-table row (skips docstatus)
	"""
	if not new:
		return None

	out = frappe._dict(changed={}, added={}, removed={}, row_changed={})

	for df in new.meta.fields:
		# Skip fields that hold no persisted value; table fields appear in
		# no_value_fields too, but they carry child rows and must be diffed.
		if df.fieldtype in no_value_fields and df.fieldtype not in table_fields:
			continue

		old_value, new_value = old.get(df.fieldname), new.get(df.fieldname)

		if df.fieldtype in table_fields:
			# Child tables: diff row-by-row, keyed on the row's name.
			old_row_by_name, new_row_by_name = make_maps(old_value, new_value)
			out = check_for_additions(out, df, new_value, old_row_by_name)
			out = check_for_deletions(out, df, old_value, new_row_by_name)

		elif old_value != new_value:
			out.changed[df.fieldname] = new_value

	# docstatus is not in meta.fields — handle it separately.
	out = check_docstatus(out, old, new, for_child)

	if any((out.changed, out.added, out.removed, out.row_changed)):
		return out
	return None
def make_event_update_log(doc, update_type):
	"""Persist an Event Update Log entry for *doc*.

	Creates carry the full document as JSON, Updates carry only the
	precomputed ``doc.diff``, and Deletes carry no payload at all (the
	docname is enough for the consumer). Returns the inserted log doc.
	"""
	if update_type == "Delete":
		data = None
	else:
		# diff for update type, doc for create type
		data = frappe.as_json(doc.diff) if doc.get("diff") else frappe.as_json(doc)

	log = frappe.get_doc(
		{
			"doctype": "Event Update Log",
			"update_type": update_type,
			"ref_doctype": doc.doctype,
			"docname": doc.name,
			"data": data,
		}
	)
	return log.insert(ignore_permissions=True)
def make_maps(old_value, new_value):
	"""Index the old and new child rows by row name.

	:param old_value: iterable of child rows from the pre-save document
	:param new_value: iterable of child rows from the post-save document
	:return: (old_rows_by_name, new_rows_by_name) dicts
	"""
	old_row_by_name = {row.name: row for row in old_value}
	new_row_by_name = {row.name: row for row in new_value}
	return old_row_by_name, new_row_by_name
def check_for_additions(out, df, new_value, old_row_by_name):
	"""Record added and changed child rows for table field *df*.

	Rows whose name is absent from the old document go to ``out.added``
	(as full dicts); rows present on both sides but with differing values
	go to ``out.row_changed`` (changed fields only, plus the row name).

	:param out: accumulator dict with ``added`` / ``row_changed`` buckets
	:param df: the table field's docfield
	:param new_value: child rows from the post-save document
	:param old_row_by_name: pre-save child rows indexed by row name
	:return: *out*, for chaining
	"""
	# The original looped with enumerate() but never used the index.
	for row in new_value:
		if row.name in old_row_by_name:
			# Row exists on both sides: keep only the fields that changed.
			diff = get_update(old_row_by_name[row.name], row, for_child=True)
			if diff and diff.changed:
				diff.changed["name"] = row.name  # consumer needs the row id
				out.row_changed.setdefault(df.fieldname, []).append(diff.changed)
		else:
			# Brand-new row: ship it whole.
			out.added.setdefault(df.fieldname, []).append(row.as_dict())
	return out
def check_for_deletions(out, df, old_value, new_row_by_name):
	"""Record child rows that were removed from table field *df*.

	A row counts as deleted when its name exists in the old document but
	not in the new one; only the row name is recorded.

	:return: *out*, for chaining
	"""
	for row in old_value:
		if row.name not in new_row_by_name:
			out.removed.setdefault(df.fieldname, []).append(row.name)
	return out
def check_docstatus(out, old, new, for_child):
	"""Fold a docstatus transition into ``out.changed``.

	Child rows don't carry their own workflow state, so they are skipped.

	:return: *out*, for chaining
	"""
	if for_child or old.docstatus == new.docstatus:
		return out
	out.changed["docstatus"] = new.docstatus
	return out
def is_consumer_uptodate(update_log, consumer):
	"""
	Checks if Consumer has read all the UpdateLogs before the specified update_log

	:param update_log: The UpdateLog Doc in context
	:param consumer: The EventConsumer doc
	:return: True when the consumer may take this log as-is; False when it
	         must first be back-filled with older unread logs
	"""
	if update_log.update_type == "Create":
		# consumer is obviously up to date
		return True

	# Most recent earlier log for the same document, if any.
	prev_logs = frappe.get_all(
		"Event Update Log",
		filters={
			"ref_doctype": update_log.ref_doctype,
			"docname": update_log.docname,
			"creation": ["<", update_log.creation],
		},
		order_by="creation desc",
		limit_page_length=1,
	)

	if not len(prev_logs):
		# An Update/Delete with no earlier log on record: the consumer
		# cannot have seen this document yet.
		return False

	# Up to date only if the consumer is recorded as a reader of that
	# immediately-preceding log.
	prev_log_consumers = frappe.get_all(
		"Event Update Log Consumer",
		fields=["consumer"],
		filters={
			"parent": prev_logs[0].name,
			"parenttype": "Event Update Log",
			"consumer": consumer.name,
		},
	)

	return len(prev_log_consumers) > 0
def mark_consumer_read(update_log_name, consumer_name):
	"""Record that *consumer_name* has read the given Event Update Log.

	No-op when the consumer is already listed on the log's consumers table.
	"""
	update_log = frappe.get_doc("Event Update Log", update_log_name)
	if any(row.consumer == consumer_name for row in update_log.consumers):
		return

	# Insert the child row directly rather than saving the whole log doc.
	frappe.get_doc(
		frappe._dict(
			doctype="Event Update Log Consumer",
			consumer=consumer_name,
			parent=update_log_name,
			parenttype="Event Update Log",
			parentfield="consumers",
		)
	).insert(ignore_permissions=True)
def get_unread_update_logs(consumer_name, dt, dn):
	"""
	Get old logs unread by the consumer on a particular document.

	:param consumer_name: name of the Event Consumer
	:param dt: ref doctype of the update logs
	:param dn: docname of the update logs
	:return: unread Event Update Log rows, oldest first
	"""
	# BUG FIX: the join key is a column expression and must be interpolated
	# into the SQL text. Passing it through the params dict made the driver
	# quote it, so the join compared consumer.parent against the literal
	# string 'update_log.name' and never matched any row.
	parent_column = (
		"update_log.name"
		if frappe.conf.db_type == "mariadb"
		else "CAST(update_log.name AS VARCHAR)"
	)
	already_consumed = [
		x[0]
		for x in frappe.db.sql(
			f"""
			SELECT
				update_log.name
			FROM `tabEvent Update Log` update_log
			JOIN `tabEvent Update Log Consumer` consumer ON consumer.parent = {parent_column}
			WHERE
				consumer.consumer = %(consumer)s
				AND update_log.ref_doctype = %(dt)s
				AND update_log.docname = %(dn)s
			""",
			{
				"consumer": consumer_name,
				"dt": dt,
				"dn": dn,
			},
			as_dict=0,
		)
	]

	logs = frappe.get_all(
		"Event Update Log",
		fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
		filters={"ref_doctype": dt, "docname": dn, "name": ["not in", already_consumed]},
		order_by="creation",
	)
	return logs
@frappe.whitelist()
def get_update_logs_for_consumer(event_consumer, doctypes, last_update):
	"""
	Fetches all the UpdateLogs for the consumer

	It will inject old un-consumed Update Logs if a doc was just found to be
	accessible to the Consumer.

	:param event_consumer: name of the requesting Event Consumer
	:param doctypes: doctype names to fetch logs for (JSON string or list)
	:param last_update: value compared against Update Log ``creation``;
	       only newer logs are fetched
	"""
	if isinstance(doctypes, str):
		doctypes = frappe.parse_json(doctypes)

	from frappe.event_streaming.doctype.event_consumer.event_consumer import has_consumer_access

	consumer = frappe.get_doc("Event Consumer", event_consumer)
	docs = frappe.get_list(
		doctype="Event Update Log",
		filters={"ref_doctype": ("in", doctypes), "creation": (">", last_update)},
		fields=["update_type", "ref_doctype", "docname", "data", "name", "creation"],
		order_by="creation desc",
	)
	result = []
	to_update_history = []
	for d in docs:
		if (d.ref_doctype, d.docname) in to_update_history:
			# already back-filled this document's full history below;
			# will be notified by background jobs
			continue

		if not has_consumer_access(consumer=consumer, update_log=d):
			continue

		if not is_consumer_uptodate(d, consumer):
			to_update_history.append((d.ref_doctype, d.docname))
			# get_unread_update_logs will have the current log
			old_logs = get_unread_update_logs(consumer.name, d.ref_doctype, d.docname)

			if old_logs:
				# old_logs come oldest-first; flip so extend() keeps the
				# overall newest-first order until the final reverse below.
				old_logs.reverse()
				result.extend(old_logs)
		else:
			result.append(d)

	for d in result:
		mark_consumer_read(update_log_name=d.name, consumer_name=consumer.name)

	# Fetched newest-first; hand the consumer chronological order.
	result.reverse()
	return result

View file

@ -1,8 +0,0 @@
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
# import frappe
from frappe.tests.utils import FrappeTestCase
class TestEventUpdateLog(FrappeTestCase):
	"""Placeholder test case for Event Update Log; no tests defined."""

	pass

View file

@ -1,32 +0,0 @@
{
"actions": [],
"creation": "2020-06-30 10:54:53.301787",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"consumer"
],
"fields": [
{
"fieldname": "consumer",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Consumer",
"options": "Event Consumer",
"reqd": 1
}
],
"istable": 1,
"links": [],
"modified": "2020-06-30 10:54:53.301787",
"modified_by": "Administrator",
"module": "Event Streaming",
"name": "Event Update Log Consumer",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,9 +0,0 @@
# Copyright (c) 2020, Frappe Technologies and contributors
# License: MIT. See LICENSE
# import frappe
from frappe.model.document import Document
class EventUpdateLogConsumer(Document):
	"""Controller for the 'Event Update Log Consumer' child table.

	Tracks which consumers have read an update log; no custom behavior.
	"""

	pass

View file

@ -138,16 +138,12 @@ standard_queries = {"User": "frappe.core.doctype.user.user.user_query"}
doc_events = {
"*": {
"after_insert": [
"frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers"
],
"on_update": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.core.doctype.activity_log.feed.update_feed",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
"frappe.automation.doctype.assignment_rule.assignment_rule.apply",
"frappe.core.doctype.file.utils.attach_files_to_document",
"frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
"frappe.automation.doctype.assignment_rule.assignment_rule.update_due_date",
"frappe.core.doctype.user_type.user_type.apply_permissions_for_non_standard_user_type",
],
@ -155,12 +151,10 @@ doc_events = {
"on_cancel": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
"frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
],
"on_trash": [
"frappe.desk.notifications.clear_doctype_notifications",
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions",
"frappe.event_streaming.doctype.event_update_log.event_update_log.notify_consumers",
],
"on_update_after_submit": [
"frappe.workflow.doctype.workflow_action.workflow_action.process_workflow_actions"

View file

@ -114,6 +114,8 @@ core_doctypes_list = (
"Client Script",
)
# NOTE: this is being used for dynamic autoincrement in new sites,
# removing any of these will require patches.
log_types = (
"Version",
"Error Log",

View file

@ -275,9 +275,6 @@ class Document(BaseDocument):
if self.get("amended_from"):
self.copy_attachments_from_amended_from()
# flag to prevent creation of event update log for create and update both
# during document creation
self.flags.update_log_for_doc_creation = True
self.run_post_save_methods()
self.flags.in_insert = False

View file

@ -9,5 +9,4 @@ Integrations
Printing
Contacts
Social
Automation
Event Streaming
Automation

View file

@ -156,7 +156,6 @@ frappe.patches.v13_0.set_route_for_blog_category
frappe.patches.v13_0.enable_custom_script
frappe.patches.v13_0.update_newsletter_content_type
execute:frappe.db.set_value('Website Settings', 'Website Settings', {'navbar_template': 'Standard Navbar', 'footer_template': 'Standard Footer'})
frappe.patches.v13_0.delete_event_producer_and_consumer_keys
frappe.patches.v13_0.web_template_set_module #2020-10-05
frappe.patches.v13_0.remove_custom_link
execute:frappe.delete_doc("DocType", "Footer Item")
@ -195,6 +194,7 @@ frappe.patches.v14_0.log_settings_migration
frappe.patches.v14_0.setup_likes_from_feedback
frappe.patches.v14_0.update_webforms
frappe.patches.v14_0.delete_payment_gateways
frappe.patches.v15_0.remove_event_streaming
[post_model_sync]
frappe.patches.v14_0.drop_data_import_legacy

View file

@ -1,11 +0,0 @@
# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE
import frappe
def execute():
	"""Patch: blank out stored Event Streaming credentials (api_key/api_secret).

	Guarded by existence checks so the patch is a no-op on sites where the
	Event Producer / Event Consumer doctypes were never installed.
	"""
	if frappe.db.exists("DocType", "Event Producer"):
		frappe.db.sql("""UPDATE `tabEvent Producer` SET api_key='', api_secret=''""")

	if frappe.db.exists("DocType", "Event Consumer"):
		frappe.db.sql("""UPDATE `tabEvent Consumer` SET api_key='', api_secret=''""")

View file

@ -0,0 +1,22 @@
import frappe
def execute():
	"""Patch: drop the Event Streaming module and its doctypes.

	Skipped entirely when the standalone ``event_streaming`` app is
	installed, since that app now owns these doctypes.
	"""
	if "event_streaming" in frappe.get_installed_apps():
		return

	frappe.delete_doc_if_exists("Module Def", "Event Streaming", force=True)

	doctypes = (
		"Event Consumer Document Type",
		"Document Type Mapping",
		"Event Producer",
		"Event Producer Last Update",
		"Event Producer Document Type",
		"Event Consumer",
		"Document Type Field Mapping",
		"Event Update Log",
		"Event Update Log Consumer",
		"Event Sync Log",
	)
	for doctype in doctypes:
		frappe.delete_doc_if_exists("DocType", doctype, force=True)

View file

@ -18,7 +18,6 @@ frappe.breadcrumbs = {
Workflow: "Settings",
Printing: "Settings",
Setup: "Settings",
"Event Streaming": "Tools",
Automation: "Tools",
},