diff --git a/frappe/events_streaming/doctype/event_producer/event_producer.py b/frappe/events_streaming/doctype/event_producer/event_producer.py index cf805e0dee..d90152e438 100644 --- a/frappe/events_streaming/doctype/event_producer/event_producer.py +++ b/frappe/events_streaming/doctype/event_producer/event_producer.py @@ -7,6 +7,7 @@ import frappe import json import time import requests +from six import iteritems from frappe import _ from frappe.model.document import Document from frappe.frappeclient import FrappeClient @@ -205,10 +206,34 @@ def set_insert(update, producer_site, event_producer): def set_update(update, producer_site): local_doc = get_local_doc(update) if local_doc: - update.data.pop('name') - check_doc_has_dependencies(local_doc, producer_site) - local_doc.update(update.data) - local_doc.db_update_all() + data = frappe._dict(update.data) + + try: + if data.changed: + local_doc.update(data.changed) + local_doc.db_update_all() + + if data.removed: + for tablename, rownames in iteritems(data.removed): + for row in rownames: + table = local_doc.get_table_field_doctype(tablename) + frappe.db.delete(table, row) + + if data.added: + for tablename, rows in iteritems(data.added): + for row in rows: + local_doc.append(tablename, data.added) + local_doc.db_update() + + if data.row_changed: + for tablename, rows in iteritems(data.row_changed): + for row in rows: + table = local_doc.get_table_field_doctype(tablename) + child_doc = frappe.get_doc(table, row.get('name')) + child_doc.update(row) + + except Exception as e: + check_doc_has_dependencies(local_doc, producer_site) def set_delete(update): local_doc = get_local_doc(update) @@ -231,7 +256,7 @@ def get_local_doc(update): else: return frappe.get_doc(update.ref_doctype, update.docname) except frappe.DoesNotExistError: - return + return def check_doc_has_dependencies(doc, producer_site): '''Sync child table link fields first, diff --git a/frappe/events_streaming/doctype/update_log/update_log.py b/frappe/events_streaming/doctype/update_log/update_log.py index 909e8a1ac9..b8430e3b4e 100644 --- a/frappe/events_streaming/doctype/update_log/update_log.py +++ b/frappe/events_streaming/doctype/update_log/update_log.py @@ -7,6 +7,7 @@ import frappe import requests from frappe.model.document import Document from frappe.utils.background_jobs import get_jobs +from frappe.model import no_value_fields, table_fields class UpdateLog(Document): def after_insert(self): @@ -14,4 +15,73 @@ class UpdateLog(Document): enqueued_method = 'frappe.events_streaming.doctype.event_consumer.event_consumer.notify_event_consumers' jobs = get_jobs() if not jobs or enqueued_method not in jobs[frappe.local.site]: - frappe.enqueue(enqueued_method, queue='long', enqueue_after_commit=True) \ No newline at end of file + frappe.enqueue(enqueued_method, queue='long', enqueue_after_commit=True) + +def get_update(old, new, for_child=False): + '''Get document objects with updates only + + If there is a change, then returns a dict like: + + { + "changed" : {fieldname1: new_value1, fieldname2: new_value2, }, + "added" : {table_fieldname1: [{row_dict1}, {row_dict2}], }, + "removed" : {table_fieldname1: [row_name1, row_name2], }, + "row_changed": {table_fieldname1: + { + child_fieldname1: new_val, + child_fieldname2: new_val + }, + }, + + }''' + if not new: + return None + + out = frappe._dict(changed = {}, added = {}, removed = {}, row_changed = {}) + for df in new.meta.fields: + if df.fieldtype in no_value_fields and df.fieldtype not in table_fields: + continue + + old_value, new_value = old.get(df.fieldname), new.get(df.fieldname) + + if df.fieldtype in table_fields: + # make maps + old_row_by_name, new_row_by_name = {}, {} + for d in old_value: + old_row_by_name[d.name] = d + for d in new_value: + new_row_by_name[d.name] = d + + # check rows for additions, changes + for i, d in enumerate(new_value): + if d.name in old_row_by_name: + diff = get_update(old_row_by_name[d.name], d, for_child=True) + if diff and diff.changed: + if not out.row_changed.get(df.fieldname): + out.row_changed[df.fieldname] = [] + diff.changed['name'] = d.name + out.row_changed[df.fieldname].append(diff.changed) + else: + if not out.added.get(df.fieldname): + out.added[df.fieldname] = [] + out.added[df.fieldname].append(d.as_dict()) + + # check for deletions + for d in old_value: + if not d.name in new_row_by_name: + if not out.removed.get(df.fieldname): + out.removed[df.fieldname] = [] + out.removed[df.fieldname].append(d.name) + + elif (old_value != new_value): + out.changed[df.fieldname] = new_value + + # docstatus + if not for_child and old.docstatus != new.docstatus: + out.changed.append(['docstatus', old.docstatus, new.docstatus]) + + if any((out.changed, out.added, out.removed, out.row_changed)): + return out + + else: + return None \ No newline at end of file diff --git a/frappe/model/document.py b/frappe/model/document.py index ac07759be6..c392c38aac 100644 --- a/frappe/model/document.py +++ b/frappe/model/document.py @@ -949,7 +949,12 @@ class Document(BaseDocument): make_update_log(self, update_type = 'Create') self.flags.create_type_update_log = False else: - make_update_log(self, update_type = 'Update') + from frappe.events_streaming.doctype.update_log.update_log import get_update + diff = get_update(self._doc_before_save, self) + if diff: + doc = self + doc.diff = diff + make_update_log(doc, update_type = 'Update') self.latest = None @@ -1281,7 +1286,8 @@ def make_update_log(doc, update_type): doctype_has_consumers = check_doctype_has_consumers(doc.doctype) if doctype_has_consumers: if update_type != 'Delete': - data = frappe.as_json(doc) + #diff for update type, doc for create type + data = frappe.as_json(doc) if not doc.get('diff') else frappe.as_json(doc.diff) else: data = None log_doc = frappe.get_doc({