feat(Events Streaming): document mapping for insert, update and delete

This commit is contained in:
Rucha Mahabal 2019-10-07 11:16:46 +05:30
parent 0d747c9d02
commit f5aea94c36
2 changed files with 54 additions and 27 deletions

View file

@ -3,8 +3,17 @@
# For license information, please see license.txt
from __future__ import unicode_literals
# import frappe
import frappe
import json
from frappe.model.document import Document
class DocumentTypeMapping(Document):
pass
def get_mapped_doc(self, update):
doc = frappe._dict(json.loads(update))
for mapping in self.field_mapping:
if doc.get(mapping.remote_fieldname):
#copy value into local fieldname key and remove remote fieldname key
doc[mapping.local_fieldname] = doc[mapping.remote_fieldname]
doc.pop(mapping.remote_fieldname, None)
doc['doctype'] = self.local_doctype
return frappe.as_json(doc)

View file

@ -102,29 +102,42 @@ def pull_from_node(event_producer):
producer_site = get_producer_site(event_producer.producer_url)
last_update = event_producer.last_update
doctypes = []
for entry in event_producer.event_configuration:
doctypes.append(entry.ref_doctype)
(doctypes, mapping_config, naming_config) = get_config(event_producer.event_configuration)
updates = get_updates(producer_site, last_update, doctypes)
for update in updates:
update.use_same_name = naming_config.get(update.ref_doctype)
mapping = mapping_config.get(update.ref_doctype)
if mapping:
update = get_mapped_update(mapping, update)
if not update.update_type == 'Delete':
update.data = json.loads(update.data)
sync(update, producer_site, event_producer)
def sync(update, producer_site, event_producer, in_retry=False):
(use_same_name, has_mapping, mapping) = frappe.db.get_value(
doctype = 'Event Configuration',
filters = {'parent': event_producer.name, 'ref_doctype': update.ref_doctype},
fieldname = ['use_same_name', 'has_mapping', 'mapping'],
)
def get_config(event_config):
doctypes, mapping_config, naming_config = [], {}, {}
for entry in event_config:
if entry.has_mapping:
(mapped_doctype, mapping) = frappe.db.get_value('Document Type Mapping', entry.mapping, ['remote_doctype', 'name'])
mapping_config[mapped_doctype] = mapping
naming_config[mapped_doctype] = entry.use_same_name
doctypes.append(mapped_doctype)
else:
naming_config[entry.ref_doctype] = entry.use_same_name
doctypes.append(entry.ref_doctype)
return (doctypes, mapping_config, naming_config)
def sync(update, producer_site, event_producer, in_retry=False):
try:
if update.update_type == 'Create':
set_insert(update, producer_site, use_same_name, event_producer.name)
set_insert(update, producer_site, event_producer.name)
if update.update_type == 'Update':
set_update(update, producer_site, use_same_name)
set_update(update, producer_site)
if update.update_type == 'Delete':
set_delete(update, use_same_name)
set_delete(update)
if in_retry:
return 'Synced'
log_event_sync(update, event_producer.name, 'Synced')
@ -137,14 +150,14 @@ def sync(update, producer_site, event_producer, in_retry=False):
frappe.db.set_value('Event Producer', event_producer.name, 'last_update', update.name)
frappe.db.commit()
def set_insert(update, producer_site, use_same_name, event_producer):
def set_insert(update, producer_site, event_producer):
if frappe.db.get_value(update.ref_doctype, update.docname):
# doc already created
return
else:
doc = frappe.get_doc(json.loads(update.data))
doc = frappe.get_doc(update.data)
check_doc_has_dependencies(doc, producer_site)
if use_same_name:
if update.use_same_name:
doc.insert(set_name=update.docname)
else:
#if event consumer is not saving documents with the same name as the producer
@ -152,17 +165,15 @@ def set_insert(update, producer_site, use_same_name, event_producer):
local_doc = doc.insert()
set_custom_fields(local_doc, update.docname, event_producer)
def set_update(update, producer_site, use_same_name):
local_doc = get_local_doc(update, use_same_name)
data = json.loads(update.get('data'))
data.pop('name')
def set_update(update, producer_site):
local_doc = get_local_doc(update)
if local_doc:
check_doc_has_dependencies(local_doc, producer_site)
local_doc.update(data)
local_doc.update(update.data)
local_doc.db_update_all()
def set_delete(update, use_same_name):
local_doc = get_local_doc(update, use_same_name)
def set_delete(update):
local_doc = get_local_doc(update)
if local_doc:
local_doc.delete()
@ -180,9 +191,9 @@ def get_updates(producer_site, last_update, doctypes):
docs.reverse()
return [frappe._dict(d) for d in docs]
def get_local_doc(update, use_same_name):
def get_local_doc(update):
try:
if not use_same_name:
if not update.use_same_name:
return frappe.get_doc(update.ref_doctype, {'remote_docname': update.docname})
else:
return frappe.get_doc(update.ref_doctype, update.docname)
@ -249,11 +260,18 @@ def log_event_sync(update, event_producer, sync_status, error=None):
doc.status = sync_status
doc.event_producer = event_producer
doc.producer_doc = update.docname
doc.data = update.data
doc.data = frappe.as_json(update.data)
if error:
doc.error = error
doc.insert()
def get_mapped_update(mapping, update):
mapping = frappe.get_doc('Document Type Mapping', mapping)
if update.update_type != 'Delete':
update.data = mapping.get_mapped_doc(update.data)
update.ref_doctype = mapping.local_doctype
return update
@frappe.whitelist()
def new_event_notification(producer_url):
'''Pull data from producer when notified'''