fix: replaced Node Configuration with Event Producer and Consumer, removed Node related doctypes

This commit is contained in:
Rucha Mahabal 2019-09-03 23:52:16 +05:30
parent 7c5ea346eb
commit 870bf66807
24 changed files with 315 additions and 508 deletions

View file

@ -1,7 +1,7 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on('Node Configuration DocType', {
frappe.ui.form.on('Event Consumer', {
// refresh: function(frm) {
// }

View file

@ -0,0 +1,51 @@
{
"autoname": "field:callback_url",
"creation": "2019-08-26 17:45:15.479530",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"subscribed_doctypes",
"callback_url"
],
"fields": [
{
"fieldname": "callback_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Callback URL",
"reqd": 1,
"unique": 1
},
{
"fieldname": "subscribed_doctypes",
"fieldtype": "Table",
"label": "Subscribed Doctypes",
"options": "Event Subscribed DocType",
"reqd": 1
}
],
"modified": "2019-08-29 21:00:03.302463",
"modified_by": "Administrator",
"module": "Events Streaming",
"name": "Event Consumer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -6,5 +6,5 @@ from __future__ import unicode_literals
# import frappe
from frappe.model.document import Document
class NodeConfigurationDocType(Document):
class EventConsumer(Document):
pass

View file

@ -6,5 +6,5 @@ from __future__ import unicode_literals
# import frappe
import unittest
class TestNodeConfigurationDocType(unittest.TestCase):
class TestEventConsumer(unittest.TestCase):
pass

View file

@ -0,0 +1,8 @@
// Copyright (c) 2019, Frappe Technologies and contributors
// For license information, please see license.txt
frappe.ui.form.on('Event Producer', {
// refresh: function(frm) {
// }
});

View file

@ -0,0 +1,59 @@
{
"autoname": "field:producer_url",
"creation": "2019-08-26 19:17:24.919196",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"producer_url",
"subscribed_doctypes",
"last_update"
],
"fields": [
{
"fieldname": "producer_url",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Producer URL",
"reqd": 1,
"unique": 1
},
{
"fieldname": "subscribed_doctypes",
"fieldtype": "Table",
"label": "Subscribed Doctypes",
"options": "Event Subscribed DocType",
"reqd": 1
},
{
"fieldname": "last_update",
"fieldtype": "Data",
"label": "Last Update",
"read_only": 1,
"reqd": 1
}
],
"modified": "2019-08-30 12:17:30.264969",
"modified_by": "Administrator",
"module": "Events Streaming",
"name": "Event Producer",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
import json
from frappe import _
from frappe.model.document import Document
from frappe.frappeclient import FrappeClient
# from frappe.desk.linked_with import get_linked_doctype
class EventProducer(Document):
def after_insert(self):
self.update_event_consumer()
def on_update(self):
self.update_event_consumer()
def update_event_consumer(self):
producer_site = get_producer_site(self.producer_url)
event_consumer = producer_site.get_doc('Event Consumer', get_current_node())
if not event_consumer:
consumer = frappe.new_doc('Event Consumer')
consumer.callback_url = get_current_node()
for entry in self.subscribed_doctypes:
consumer.append('subscribed_doctypes', {
'ref_doctype': entry.ref_doctype
})
producer_site.insert(consumer)
else:
event_consumer.subscribed_doctypes = []
for entry in self.subscribed_doctypes:
event_consumer.subscribed_doctypes.append({
'ref_doctype': entry.ref_doctype
})
producer_site.update(event_consumer)
def get_current_node():
current_node = frappe.utils.get_url()
parts = current_node.split(':')
if not len(parts) > 2:
port = frappe.conf.http_port or frappe.conf.webserver_port
current_node += ':' + str(port)
return current_node
def get_producer_site(producer_url):
producer_site = FrappeClient(producer_url, 'Administrator', 'root')
return producer_site
@frappe.whitelist()
def pull_producer_data():
'''Fetch data from producer node.'''
for event_producer in frappe.get_all('Event Producer'):
pull_from_node(event_producer.name)
return 'success'
def pull_from_node(event_producer):
event_producer = frappe.get_doc('Event Producer', event_producer)
producer_site = get_producer_site(event_producer.producer_url)
last_update = event_producer.last_update
doctypes = []
for entry in event_producer.subscribed_doctypes:
doctypes.append(entry.ref_doctype)
updates = get_updates(producer_site, last_update, doctypes)
for update in updates:
if update.update_type == 'Create':
set_insert(update, producer_site)
if update.update_type == 'Update':
set_update(update, producer_site)
if update.update_type == 'Delete':
set_delete(update)
frappe.db.set_value('Event Producer', event_producer.name, 'last_update', update.name)
frappe.db.commit()
def set_insert(update, producer_site):
if frappe.db.get_value(update.ref_doctype, update.docname):
# doc already created
return
else:
doc = frappe.get_doc(json.loads(update.data))
check_doc_has_dependencies(doc, producer_site)
frappe.get_doc(json.loads(update.data)).insert(set_name=update.docname)
def set_update(update, producer_site):
local_doc = get_local_doc(update)
data = json.loads(update.get('data'))
data.pop('name')
local_doc.update(data)
local_doc.db_update_all()
def set_delete(update):
local_doc = get_local_doc(update)
if local_doc:
local_doc.delete()
def get_updates(producer_site, last_update, doctypes):
last_update = producer_site.get_value('Update Log', 'creation', {'name': last_update})
if last_update:
last_update_timestamp = last_update.get('creation')
else:
last_update_timestamp = None
docs = producer_site.get_list(
doctype = 'Update Log',
filters = {'creation': ('>', last_update_timestamp), 'ref_doctype': ('in', doctypes)},
fields = ['update_type', 'ref_doctype', 'docname', 'data', 'name']
)
docs.reverse()
return [frappe._dict(d) for d in docs]
def get_local_doc(update):
try:
return frappe.get_doc(update.ref_doctype, update.docname)
except frappe.DoesNotExistError:
return
def check_doc_has_dependencies(doc, producer_site):
'''Sync child table link fields first,
then sync link fields,
then dynamic links'''
meta = frappe.get_meta(doc.doctype)
table_fields = meta.get_table_fields()
link_fields = meta.get_link_fields()
dl_fields = meta.get_dynamic_link_fields()
if table_fields:
sync_child_table_dependencies(doc, table_fields, producer_site)
if link_fields:
sync_link_dependencies(doc, link_fields, producer_site)
if dl_fields:
sync_dynamic_link_dependencies(doc, dl_fields, producer_site)
def sync_child_table_dependencies(doc, table_fields, producer_site):
for df in table_fields:
child_table = doc.get(df.fieldname)
for entry in child_table:
set_dependencies(entry, frappe.get_meta(entry.doctype).get_link_fields(), producer_site)
def sync_link_dependencies(doc, link_fields, producer_site):
set_dependencies(doc, link_fields, producer_site)
def sync_dynamic_link_dependencies(doc, dl_fields, producer_site):
for df in dl_fields:
docname = doc.get(df.fieldname)
linked_doctype = doc.get(df.options)
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = producer_site.get_doc(linked_doctype, docname)
frappe.get_doc(master_doc).insert(set_name=docname)
def set_dependencies(doc, link_fields, producer_site):
for df in link_fields:
docname = doc.get(df.fieldname)
linked_doctype = df.get_link_doctype()
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = producer_site.get_doc(linked_doctype, docname)
frappe.get_doc(master_doc).insert(set_name=docname)
def check_dependency_fulfilled(linked_doctype, docname):
return frappe.db.exists(linked_doctype, docname)

View file

@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
# Copyright (c) 2019, Frappe Technologies and Contributors
# See license.txt
from __future__ import unicode_literals
# import frappe
import unittest
class TestNode(unittest.TestCase):
class TestEventProducer(unittest.TestCase):
pass

View file

@ -1,5 +1,5 @@
{
"creation": "2019-08-09 16:19:58.344278",
"creation": "2019-08-29 11:35:00.625815",
"doctype": "DocType",
"editable_grid": 1,
"engine": "InnoDB",
@ -17,10 +17,10 @@
}
],
"istable": 1,
"modified": "2019-08-26 12:48:54.329431",
"modified": "2019-08-29 11:35:00.625815",
"modified_by": "Administrator",
"module": "Events Streaming",
"name": "Node Configuration DocType",
"name": "Event Subscribed DocType",
"owner": "Administrator",
"permissions": [],
"quick_entry": 1,

View file

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
# import frappe
from frappe.model.document import Document
class EventSubscribedDocType(Document):
pass

View file

@ -1,8 +0,0 @@
// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on('Node', {
// refresh: function(frm) {
// }
});

View file

@ -1,81 +0,0 @@
{
"autoname": "field:host_name",
"creation": "2019-07-30 15:10:47.993692",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"host_name",
"api_key",
"api_secret",
"user",
"allow_auto_changes",
"last_updated"
],
"fields": [
{
"fieldname": "host_name",
"fieldtype": "Data",
"in_list_view": 1,
"label": "Hostname (URL)",
"reqd": 1,
"unique": 1
},
{
"default": "0",
"description": "Automatic changes from Follower nodes will be fetched and updated",
"fieldname": "allow_auto_changes",
"fieldtype": "Check",
"label": "Allow Automatic Changes"
},
{
"fieldname": "api_key",
"fieldtype": "Data",
"label": "API Key",
"read_only": 1,
"unique": 1
},
{
"fieldname": "api_secret",
"fieldtype": "Password",
"label": "API Secret",
"read_only": 1
},
{
"fieldname": "last_updated",
"fieldtype": "Data",
"label": "Last Updated",
"options": "Update Log"
},
{
"fieldname": "user",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Node User",
"options": "User",
"reqd": 1
}
],
"modified": "2019-08-26 12:49:32.387804",
"modified_by": "Administrator",
"module": "Events Streaming",
"name": "Node",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"quick_entry": 1,
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,12 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe.model.document import Document
class Node(Document):
def before_insert(self):
self.api_key = frappe.generate_hash(length = 15)
self.api_secret = frappe.generate_hash(length = 15)

View file

@ -1,8 +0,0 @@
// Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on('Node Configuration', {
// refresh: function(frm) {
// }
});

View file

@ -1,85 +0,0 @@
{
"autoname": "format:NODE-CONFIG-{#####}",
"creation": "2019-07-30 15:39:28.765991",
"doctype": "DocType",
"engine": "InnoDB",
"field_order": [
"master_node",
"following_doctypes",
"follower_node",
"rules_section",
"allow_creation",
"allow_update",
"allow_deletion"
],
"fields": [
{
"fieldname": "follower_node",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Consumer Node",
"options": "Node",
"reqd": 1
},
{
"fieldname": "master_node",
"fieldtype": "Link",
"in_list_view": 1,
"label": "Producer Node",
"options": "Node",
"reqd": 1
},
{
"fieldname": "rules_section",
"fieldtype": "Section Break",
"label": "Rules for Consumer"
},
{
"default": "0",
"fieldname": "allow_creation",
"fieldtype": "Check",
"label": "Allow Creation"
},
{
"default": "0",
"fieldname": "allow_update",
"fieldtype": "Check",
"label": "Allow Update"
},
{
"default": "0",
"fieldname": "allow_deletion",
"fieldtype": "Check",
"label": "Allow Deletion"
},
{
"fieldname": "following_doctypes",
"fieldtype": "Table",
"label": "Following DocTypes",
"options": "Node Configuration DocType",
"reqd": 1
}
],
"modified": "2019-08-26 14:03:17.668802",
"modified_by": "Administrator",
"module": "Events Streaming",
"name": "Node Configuration",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"sort_field": "modified",
"sort_order": "DESC",
"track_changes": 1
}

View file

@ -1,148 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe import _
import json
from frappe.model.document import Document
from frappe.frappeclient import FrappeClient
from frappe.model.document import check_doctype_has_followers
from frappe.desk.form.linked_with import get_linked_doctypes
class NodeConfiguration(Document):
def before_insert(self):
config_exists = frappe.db.get_all(
doctype = 'Node Configuration',
filters = {
'master_node': self.master_node,
'follower_node': self.follower_node
}
)
if config_exists:
frappe.throw(_('Node configuration already exists'))
@frappe.whitelist()
def pull_master_data():
'''Fetch data from remote master node.'''
for node_config in frappe.get_all('Node Configuration', {'follower_node': get_current_node().name}):
pull_from_node(node_config.name)
return 'success'
def pull_from_node(node_config_name):
config = frappe.get_doc('Node Configuration', node_config_name)
master = get_master(config)
last_update = frappe.db.get_value('Node', config.follower_node, 'last_updated')
doctypes = []
for entry in config.following_doctypes:
doctypes.append(entry.ref_doctype)
updates = get_updates(master, last_update, doctypes)
for update in updates:
if update.update_type == 'Create':
set_insert(update, master)
if update.update_type == 'Update':
set_update(update, master)
if update.update_type == 'Delete':
set_delete(update)
frappe.db.set_value('Node', config.follower_node, 'last_updated', update.name)
frappe.db.commit()
def set_insert(update, master):
if frappe.db.get_value(update.ref_doctype, update.docname):
# doc already created
return
else:
doc = frappe.get_doc(json.loads(update.data))
check_doc_has_dependencies(doc, master)
frappe.get_doc(json.loads(update.data)).insert(set_name=update.docname)
def set_update(update, master):
local_doc = get_local_doc(update)
data = json.loads(update.get('data'))
data.pop('name')
local_doc.update(data)
local_doc.db_update_all()
def set_delete(update):
local_doc = get_local_doc(update)
if local_doc:
local_doc.delete()
def get_updates(master, last_update, doctypes):
last_update_timestamp = master.get_value('Update Log', 'creation', {'name': last_update}).get('creation')
docs = master.get_list(
doctype = 'Update Log',
filters = {'creation': ('>', last_update_timestamp), 'ref_doctype': ('in', doctypes)},
fields = ['update_type', 'ref_doctype', 'docname', 'data', 'name']
)
docs.reverse()
return [frappe._dict(d) for d in docs]
def get_master(config):
master = FrappeClient(config.master_node, 'Administrator', 'root')
return master
def get_local_doc(update):
try:
return frappe.get_doc(update.ref_doctype, update.docname)
except frappe.DoesNotExistError:
return
def get_current_node():
current_node = frappe.utils.get_url()
parts = current_node.split(':')
if not len(parts) > 2:
port = frappe.conf.http_port or frappe.conf.webserver_port
current_node += ':' + str(port)
return frappe.get_doc('Node', current_node)
def check_doc_has_dependencies(doc, master):
'''Sync child table link fields first,
then sync link fields,
then dynamic links'''
meta = frappe.get_meta(doc.doctype)
table_fields = meta.get_table_fields()
link_fields = meta.get_link_fields()
dl_fields = meta.get_dynamic_link_fields()
if table_fields:
sync_child_table_dependencies(doc, table_fields, master)
if link_fields:
sync_link_dependencies(doc, link_fields, master)
if dl_fields:
sync_dynamic_link_dependencies(doc, dl_fields, master)
def sync_child_table_dependencies(doc, table_fields, master):
for df in table_fields:
child_table = doc.get(df.fieldname)
for entry in child_table:
set_dependencies(entry, frappe.get_meta(entry.doctype).get_link_fields(), master)
def sync_link_dependencies(doc, link_fields, master):
set_dependencies(doc, link_fields, master)
def sync_dynamic_link_dependencies(doc, dl_fields, master):
for df in dl_fields:
docname = doc.get(df.fieldname)
linked_doctype = doc.get(df.options)
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = master.get_doc(linked_doctype, docname)
frappe.get_doc(master_doc).insert(set_name=docname)
def set_dependencies(doc, link_fields, master):
for df in link_fields:
docname = doc.get(df.fieldname)
linked_doctype = df.get_link_doctype()
if docname and not check_dependency_fulfilled(linked_doctype, docname):
master_doc = master.get_doc(linked_doctype, docname)
frappe.get_doc(master_doc).insert(set_name=docname)
def check_dependency_fulfilled(linked_doctype, docname):
return frappe.db.exists(linked_doctype, docname)

View file

@ -1,143 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
from __future__ import unicode_literals
import frappe
import unittest
import time
from frappe import _
from frappe.commands.site import new_site
from frappe.frappeclient import FrappeClient
from frappe.events_streaming.doctype.node_configuration.node_configuration import pull_master_data, get_current_node
class TestNodeConfiguration(unittest.TestCase):
def test_insert(self):
master = self.get_client()
master_doc = self.insert_into_master(master, 'test creation 1 sync')
pull_master_data()
time.sleep(1)
self.assertTrue(frappe.db.exists('ToDo', master_doc.name))
def test_update(self):
master = self.get_client()
master_doc = self.insert_into_master(master, 'test update 1')
master_doc['description'] = 'test update 2'
master_doc = master.update(master_doc)
pull_master_data()
time.sleep(1)
local_doc = frappe.get_doc(master_doc.doctype, master_doc.name)
self.assertEqual(local_doc.description, master_doc.description)
def test_delete(self):
master = self.get_client()
master_doc = self.insert_into_master(master, 'test delete sync')
pull_master_data()
time.sleep(1)
self.assertTrue(frappe.db.exists('ToDo', master_doc.name))
master.delete('ToDo', master_doc.name)
pull_master_data()
time.sleep(1)
self.assertFalse(frappe.db.exists('ToDo', master_doc.name))
def test_multiple_doctypes_sync(self):
master = self.get_client()
#insert todo and note in master
master_todo = self.insert_into_master(master, 'test multiple doc sync')
master_note1 = frappe.get_doc(dict(doctype='Note', title='test multiple doc sync 1'))
master_note1 = master.insert(master_note1)
master_note2 = frappe.get_doc(dict(doctype='Note', title='test multiple doc sync 2'))
master_note2 = master.insert(master_note2)
#update in master
master_todo['description'] = 'test multiple doc update sync'
master_todo = master.update(master_todo)
master_note1['content'] = 'testing update sync'
master_note1 = master.update(master_note1)
master.delete('Note', master_note2.name)
pull_master_data()
time.sleep(1)
#check inserted
self.assertTrue(frappe.db.exists('ToDo', master_todo.name))
#check update
local_todo = frappe.get_doc('ToDo', master_todo.name)
self.assertEqual(local_todo.description, master_todo.description)
local_note1 = frappe.get_doc('Note', master_note1.name)
self.assertEqual(local_note1.content, master_note1.content)
#check delete
self.assertFalse(frappe.db.exists('Note', master_note2.name))
def test_child_table_dependencies_fulfilled(self):
master = self.get_client()
master_user = frappe.get_doc(dict(doctype='User', email='test_user@sync.com', first_name='Test Sync User'))
email_account = make_email_account_in_master(master, _('_Test Sync Email Account 1'), _('test-password1@example.com'))
master_user.user_emails = []
master_user.append('user_emails', {
'email_account': email_account.name
})
master_user = master.insert(master_user)
pull_master_data()
time.sleep(1)
self.assertTrue(frappe.db.exists('Email Account', email_account.name))
if self.assertTrue(frappe.db.exists('User', master_user.name)):
local_user = frappe.get_doc('User', master_user.name)
self.assertEqual(len(local_user.user_emails), 3)
def test_child_table_sync(self):
master = self.get_client()
master_doc = frappe.get_doc(dict(doctype='Node Configuration', master_node='http://test-site:8004', follower_node='http://test-site:8004'))
master_doc.following_doctypes = []
master_doc.append('following_doctypes', {
'ref_doctype': 'ToDo'
})
master_doc.append('following_doctypes', {
'ref_doctype': 'Note'
})
master_doc = master.insert(master_doc)
pull_master_data()
time.sleep(1)
if frappe.db.exists('Node Configuration', master_doc.name):
local_doc = frappe.get_doc('Node Configuration', master_doc.name)
self.assertEqual(len(local_doc.following_doctypes), 2)
def test_dynamic_link_dependencies_synced(self):
master = self.get_client()
master_link_doc = frappe.get_doc(dict(doctype='Note', title='Test Dynamic Link 1'))
master_link_doc = master.insert(master_link_doc)
master_doc = frappe.get_doc(dict(doctype='ToDo', description='Test Dynamic Link 2', assigned_by='Administrator', reference_type='Note', reference_name=master_link_doc.name))
master_doc = master.insert(master_doc)
pull_master_data()
time.sleep(1)
#check dynamic link dependency created
self.assertTrue(frappe.db.exists('Note', master_link_doc.name))
self.assertEqual(master_link_doc.name, frappe.db.get_value('ToDo', master_doc.name, 'reference_name'))
def insert_into_master(self, master, description):
#create and insert todo on remote site
todo = frappe.get_doc(dict(doctype='ToDo', description=description, assigned_by='Administrator'))
return master.insert(todo)
def get_client(self):
#connect to remote master
return FrappeClient('http://test-site-2:8004', 'Administrator', 'root')
def make_email_account_in_master(master, name, email_id):
doc = frappe.get_doc(dict(
doctype='Email Account',
domain='example.com',
email_account_name=name,
append_to='Communication',
smtp_server='test.example.com',
pop3_server='pop.test.example.com',
email_id=email_id,
password='password',
))
return master.insert(doc)

View file

@ -121,7 +121,7 @@ def delete_doc(doctype=None, name=None, force=0, ignore_doctypes=None, for_reloa
# delete tag link entry
delete_tags_for_document(doc)
# update log if doctype has followers
# update log if doctype has event consumers
if not frappe.flags.in_install and not frappe.flags.in_migrate:
make_update_log(doc, update_type = 'Delete')

View file

@ -943,7 +943,7 @@ class Document(BaseDocument):
if (self.doctype, self.name) in frappe.flags.currently_saving:
frappe.flags.currently_saving.remove((self.doctype, self.name))
# make update log for doctypes having followers
# make update log for doctypes having event consumers
if not frappe.flags.in_install and not frappe.flags.in_migrate:
if self.flags.update_log_for_doc_creation:
make_update_log(self, update_type = 'Create')
@ -1277,9 +1277,9 @@ def execute_action(doctype, name, action, **kwargs):
doc.notify_update()
def make_update_log(doc, update_type):
'''Save update info for doctypes that have followers'''
doctype_has_followers = check_doctype_has_followers(doc.doctype)
if doctype_has_followers:
'''Save update info for doctypes that have event consumers'''
doctype_has_consumers = check_doctype_has_consumers(doc.doctype)
if doctype_has_consumers:
if update_type != 'Delete':
data = frappe.as_json(doc)
else:
@ -1294,11 +1294,11 @@ def make_update_log(doc, update_type):
log_doc.insert(ignore_permissions = True)
frappe.db.commit()
def check_doctype_has_followers(doctype):
node_configs = frappe.get_all(doctype = 'Node Configuration')
for node_config in node_configs:
config = frappe.get_doc('Node Configuration', node_config.name)
for entry in config.following_doctypes:
def check_doctype_has_consumers(doctype):
event_consumers = frappe.get_all(doctype = 'Event Consumer')
for event_consumer in event_consumers:
consumer = frappe.get_doc('Event Consumer', event_consumer.name)
for entry in consumer.subscribed_doctypes:
if doctype == entry.ref_doctype:
return True
return False

View file

@ -41,8 +41,8 @@
<li><a href="#background_jobs">
{%= __("Background Jobs") %}</a></li>
<li class="divider"></li>
<li><a href="#" onclick="return frappe.ui.toolbar.pull_master_data();">
{%= __("Pull Master Data") %}</a></li>
<li><a href="#" onclick="return frappe.ui.toolbar.pull_producer_data();">
{%= __("Pull Producer Data") %}</a></li>
<li><a href="#" onclick="return frappe.app.logout();">
{%= __("Logout") %}</a></li>
</ul>

View file

@ -301,16 +301,16 @@ frappe.ui.toolbar.setup_session_defaults = function() {
});
};
frappe.ui.toolbar.pull_master_data = function() {
frappe.ui.toolbar.pull_producer_data = function() {
frappe.call({
method: 'frappe.events_streaming.doctype.node_configuration.node_configuration.pull_master_data',
method: 'frappe.events_streaming.doctype.event_producer.event_producer.pull_producer_data',
callback: function(r) {
if(r.message == 'success') {
frappe.show_alert({message:'Successfully pulled master data', indicator:'green'});
location.reload(true);
}
else if(r.message == 'failed') {
frappe.show_alert({message:'Failed to pull master data', indicator:'red'});
frappe.show_alert({message:'Failed to pull producer data', indicator:'red'});
location.reload(true);
}
else if(r.message == 'no updates') {