fix: anti-pattern code
This commit is contained in:
parent
d2445a6a40
commit
19c69e5316
14 changed files with 181 additions and 110 deletions
|
|
@ -98,24 +98,27 @@ def handle():
|
|||
})
|
||||
frappe.db.commit()
|
||||
|
||||
if frappe.local.request.method=="DELETE":
|
||||
if frappe.local.request.method == "DELETE":
|
||||
# Not checking permissions here because it's checked in delete_doc
|
||||
frappe.delete_doc(doctype, name, ignore_missing=False)
|
||||
frappe.local.response.http_status_code = 202
|
||||
frappe.local.response.message = "ok"
|
||||
frappe.db.commit()
|
||||
|
||||
|
||||
elif doctype:
|
||||
if frappe.local.request.method=="GET":
|
||||
if frappe.local.request.method == "GET":
|
||||
if frappe.local.form_dict.get('fields'):
|
||||
frappe.local.form_dict['fields'] = json.loads(frappe.local.form_dict['fields'])
|
||||
frappe.local.form_dict.setdefault('limit_page_length', 20)
|
||||
frappe.local.response.update({
|
||||
"data": frappe.call(frappe.client.get_list,
|
||||
doctype, **frappe.local.form_dict)})
|
||||
"data": frappe.call(
|
||||
frappe.client.get_list,
|
||||
doctype,
|
||||
**frappe.local.form_dict
|
||||
)
|
||||
})
|
||||
|
||||
if frappe.local.request.method=="POST":
|
||||
if frappe.local.request.method == "POST":
|
||||
if frappe.local.form_dict.data is None:
|
||||
data = json.loads(frappe.safe_decode(frappe.local.request.get_data()))
|
||||
else:
|
||||
|
|
@ -135,24 +138,26 @@ def handle():
|
|||
|
||||
return build_response("json")
|
||||
|
||||
|
||||
def validate_oauth():
|
||||
""" authentication using oauth """
|
||||
from frappe.oauth import get_url_delimiter
|
||||
form_dict = frappe.local.form_dict
|
||||
authorization_header = frappe.get_request_header("Authorization").split(" ") if frappe.get_request_header("Authorization") else None
|
||||
authorization_header = frappe.get_request_header("Authorization", "").split(" ")
|
||||
if authorization_header and authorization_header[0].lower() == "bearer":
|
||||
from frappe.integrations.oauth2 import get_oauth_server
|
||||
token = authorization_header[1]
|
||||
r = frappe.request
|
||||
parsed_url = urlparse(r.url)
|
||||
access_token = { "access_token": token}
|
||||
req = frappe.request
|
||||
parsed_url = urlparse(req.url)
|
||||
access_token = {"access_token": token}
|
||||
uri = parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path + "?" + urlencode(access_token)
|
||||
http_method = r.method
|
||||
body = r.get_data()
|
||||
headers = r.headers
|
||||
http_method = req.method
|
||||
body = req.get_data()
|
||||
headers = req.headers
|
||||
|
||||
required_scopes = frappe.db.get_value("OAuth Bearer Token", token, "scopes").split(get_url_delimiter())
|
||||
|
||||
valid, oauthlib_request = get_oauth_server().verify_request(uri, http_method, body, headers, required_scopes)
|
||||
valid, _oauthlib_request = get_oauth_server().verify_request(uri, http_method, body, headers, required_scopes)
|
||||
|
||||
if valid:
|
||||
frappe.set_user(frappe.db.get_value("OAuth Bearer Token", token, "user"))
|
||||
|
|
@ -177,8 +182,11 @@ def validate_auth_via_api_keys():
|
|||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
def validate_api_key_secret(api_key, api_secret, frappe_authorization_source=None):
|
||||
""" frappe_authorization_source to provide api key and secret for a doctype apart from User """
|
||||
"""
|
||||
frappe_authorization_source to provide api key and secret for a doctype apart from User
|
||||
"""
|
||||
doctype = frappe_authorization_source or 'User'
|
||||
doc = frappe.db.get_value(
|
||||
doctype=doctype,
|
||||
|
|
@ -197,4 +205,4 @@ def validate_api_key_secret(api_key, api_secret, frappe_authorization_source=Non
|
|||
else:
|
||||
user = frappe.db.get_value(doctype, doc, 'user')
|
||||
frappe.set_user(user)
|
||||
frappe.local.form_dict = form_dict
|
||||
frappe.local.form_dict = form_dict
|
||||
|
|
|
|||
|
|
@ -6,5 +6,6 @@ from __future__ import unicode_literals
|
|||
# import frappe
|
||||
from frappe.model.document import Document
|
||||
|
||||
|
||||
class DocumentTypeFieldMapping(Document):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import frappe
|
|||
import json
|
||||
from frappe.model.document import Document
|
||||
|
||||
|
||||
class DocumentTypeMapping(Document):
|
||||
def get_mapped_doc(self, update):
|
||||
doc = frappe._dict(json.loads(update))
|
||||
|
|
@ -16,14 +17,16 @@ class DocumentTypeMapping(Document):
|
|||
if mapping.is_child_table:
|
||||
doc[mapping.local_fieldname] = self.get_mapped_child_table_docs(mapping.child_table_mapping, doc[mapping.remote_fieldname])
|
||||
else:
|
||||
#copy value into local fieldname key and remove remote fieldname key
|
||||
# copy value into local fieldname key and remove remote fieldname key
|
||||
doc[mapping.local_fieldname] = doc[mapping.remote_fieldname]
|
||||
doc.pop(mapping.remote_fieldname, None)
|
||||
|
||||
doc['doctype'] = self.local_doctype
|
||||
return frappe.as_json(doc)
|
||||
|
||||
|
||||
def get_mapped_child_table_docs(child_map, table_entries):
|
||||
"""Get mapping for child doctypes"""
|
||||
child_map = frappe.get_doc('Document Type Mapping', child_map)
|
||||
mapped_entries = []
|
||||
for child_doc in table_entries:
|
||||
|
|
@ -33,4 +36,4 @@ def get_mapped_child_table_docs(child_map, table_entries):
|
|||
child_doc.pop(mapping.remote_fieldname, None)
|
||||
child_doc['doctype'] = child_map.local_doctype
|
||||
mapped_entries.append(child_doc)
|
||||
return mapped_entries
|
||||
return mapped_entries
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
from __future__ import unicode_literals
|
||||
import frappe
|
||||
import time
|
||||
import json
|
||||
import requests
|
||||
from frappe.model.document import Document
|
||||
|
|
@ -12,6 +11,7 @@ from frappe.frappeclient import FrappeClient
|
|||
from frappe.utils.data import get_url
|
||||
from frappe.utils.background_jobs import get_jobs
|
||||
|
||||
|
||||
class EventConsumer(Document):
|
||||
def validate(self):
|
||||
if self.in_test:
|
||||
|
|
@ -49,12 +49,14 @@ class EventConsumer(Document):
|
|||
return 'offline'
|
||||
return 'online'
|
||||
|
||||
|
||||
@frappe.whitelist(allow_guest=True)
|
||||
def register_consumer(data):
|
||||
"""create an event consumer document for registering a consumer"""
|
||||
data = json.loads(data)
|
||||
# to ensure that consumer is created only once
|
||||
if frappe.db.exists('Event Consumer', data['event_consumer']):
|
||||
return
|
||||
return None
|
||||
consumer = frappe.new_doc('Event Consumer')
|
||||
consumer.callback_url = data['event_consumer']
|
||||
consumer.user = data['user']
|
||||
|
|
@ -72,7 +74,7 @@ def register_consumer(data):
|
|||
consumer.api_key = api_key
|
||||
consumer.api_secret = api_secret
|
||||
consumer.in_test = data['in_test']
|
||||
consumer.insert(ignore_permissions = True)
|
||||
consumer.insert(ignore_permissions=True)
|
||||
frappe.db.commit()
|
||||
|
||||
# consumer's 'last_update' field should point to the latest update in producer's update log when subscribing
|
||||
|
|
@ -80,7 +82,9 @@ def register_consumer(data):
|
|||
last_update = str(get_last_update())
|
||||
return json.dumps({'api_key': api_key, 'api_secret': api_secret, 'last_update': last_update})
|
||||
|
||||
|
||||
def get_consumer_site(consumer_url):
|
||||
"""create a FrappeClient object for event consumer site"""
|
||||
consumer_doc = frappe.get_doc('Event Consumer', consumer_url)
|
||||
consumer_site = FrappeClient(
|
||||
url=consumer_url,
|
||||
|
|
@ -90,22 +94,28 @@ def get_consumer_site(consumer_url):
|
|||
)
|
||||
return consumer_site
|
||||
|
||||
|
||||
def get_last_update():
|
||||
updates = frappe.get_list('Event Update Log', 'creation', ignore_permissions=True, limit = 1, order_by = 'creation desc')
|
||||
"""get the creation timestamp of last update consumed"""
|
||||
updates = frappe.get_list('Event Update Log', 'creation', ignore_permissions=True, limit=1, order_by='creation desc')
|
||||
if updates:
|
||||
return updates[0].creation
|
||||
return frappe.utils.now_datetime()
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def notify_event_consumers(doctype):
|
||||
"""get all event consumers and set flag for notification status"""
|
||||
event_consumers = frappe.get_all('Event Consumer Document Type', ['parent'], {'ref_doctype': doctype, 'status': 'Approved'})
|
||||
for entry in event_consumers:
|
||||
consumer = frappe.get_doc('Event Consumer', entry.parent)
|
||||
consumer.flags.notified = False
|
||||
notify(consumer)
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def notify(consumer):
|
||||
"""notify individual event consumers about a new update"""
|
||||
consumer_status = consumer.get_consumer_status()
|
||||
if consumer_status == 'online':
|
||||
try:
|
||||
|
|
@ -120,9 +130,9 @@ def notify(consumer):
|
|||
else:
|
||||
consumer.flags.notified = False
|
||||
|
||||
#enqueue another job if the site was not notified
|
||||
# enqueue another job if the site was not notified
|
||||
if not consumer.flags.notified:
|
||||
enqueued_method = 'frappe.event_streaming.doctype.event_consumer.event_consumer.notify'
|
||||
jobs = get_jobs()
|
||||
if not jobs or enqueued_method not in jobs[frappe.local.site] and not consumer.flags.notifed:
|
||||
frappe.enqueue(enqueued_method, queue = 'long', enqueue_after_commit = True, **{'consumer': consumer})
|
||||
frappe.enqueue(enqueued_method, queue='long', enqueue_after_commit=True, **{'consumer': consumer})
|
||||
|
|
|
|||
|
|
@ -6,5 +6,6 @@ from __future__ import unicode_literals
|
|||
# import frappe
|
||||
from frappe.model.document import Document
|
||||
|
||||
|
||||
class EventConsumerDocumentType(Document):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from frappe.utils.data import get_url
|
|||
from frappe.custom.doctype.custom_field.custom_field import create_custom_field
|
||||
from frappe.integrations.oauth2 import validate_url
|
||||
|
||||
|
||||
class EventProducer(Document):
|
||||
def before_insert(self):
|
||||
self.incoming_change = True
|
||||
|
|
@ -35,21 +36,21 @@ class EventProducer(Document):
|
|||
self.update_event_consumer()
|
||||
self.create_custom_fields()
|
||||
else:
|
||||
#when producer doc is updated it updates the consumer doc, set flag to avoid deadlock
|
||||
# when producer doc is updated it updates the consumer doc, set flag to avoid deadlock
|
||||
frappe.db.set_value(self.doctype, self.name, 'incoming_change', 0)
|
||||
|
||||
def create_event_consumer(self):
|
||||
'''register event consumer on the producer site'''
|
||||
"""register event consumer on the producer site"""
|
||||
if self.is_producer_online():
|
||||
producer_site = FrappeClient(self.producer_url, verify=False)
|
||||
response = producer_site.post_api(
|
||||
'frappe.event_streaming.doctype.event_consumer.event_consumer.register_consumer',
|
||||
params = {'data': json.dumps(self.get_request_data())}
|
||||
params={'data': json.dumps(self.get_request_data())}
|
||||
)
|
||||
if response:
|
||||
response = json.loads(response)
|
||||
self.api_key = response['api_key']
|
||||
self.api_secret = response['api_secret']
|
||||
self.api_secret = response['api_secret']
|
||||
self.last_update = response['last_update']
|
||||
else:
|
||||
frappe.throw(_('Failed to create an Event Consumer or an Event Consumer for the current site is already registered.'))
|
||||
|
|
@ -58,7 +59,7 @@ class EventProducer(Document):
|
|||
consumer_doctypes = []
|
||||
for entry in self.producer_doctypes:
|
||||
if entry.has_mapping:
|
||||
#if it has mapping then on event consumer's site it should subscribe to remote doctype
|
||||
# if it has mapping then on event consumer's site it should subscribe to remote doctype
|
||||
consumer_doctypes.append(frappe.db.get_value('Document Type Mapping', entry.mapping, 'remote_doctype'))
|
||||
else:
|
||||
consumer_doctypes.append(entry.ref_doctype)
|
||||
|
|
@ -71,7 +72,7 @@ class EventProducer(Document):
|
|||
}
|
||||
|
||||
def create_custom_fields(self):
|
||||
'''create custom field to store remote docname and remote site url'''
|
||||
"""create custom field to store remote docname and remote site url"""
|
||||
for entry in self.producer_doctypes:
|
||||
if not entry.use_same_name:
|
||||
if not frappe.db.exists('Custom Field', {'fieldname': 'remote_docname', 'dt': entry.ref_doctype}):
|
||||
|
|
@ -90,7 +91,7 @@ class EventProducer(Document):
|
|||
event_consumer.consumer_doctypes = []
|
||||
for entry in self.producer_doctypes:
|
||||
if entry.has_mapping:
|
||||
#if it has mapping then on event consumer's site it should subscribe to remote doctype
|
||||
# if it has mapping then on event consumer's site it should subscribe to remote doctype
|
||||
ref_doctype = frappe.db.get_value('Document Type Mapping', entry.mapping, 'remote_doctype')
|
||||
else:
|
||||
ref_doctype = entry.ref_doctype
|
||||
|
|
@ -106,9 +107,9 @@ class EventProducer(Document):
|
|||
producer_site.update(event_consumer)
|
||||
|
||||
def is_producer_online(self):
|
||||
'''check connection status for the Event Producer site'''
|
||||
"""check connection status for the Event Producer site"""
|
||||
retry = 3
|
||||
while(retry > 0):
|
||||
while retry > 0:
|
||||
res = requests.get(self.producer_url)
|
||||
if res.status_code == 200:
|
||||
return True
|
||||
|
|
@ -116,7 +117,9 @@ class EventProducer(Document):
|
|||
time.sleep(5)
|
||||
frappe.throw(_('Failed to connect to the Event Producer site. Retry after some time.'))
|
||||
|
||||
|
||||
def get_producer_site(producer_url):
|
||||
"""create a FrappeClient object for event producer site"""
|
||||
producer_doc = frappe.get_doc('Event Producer', producer_url)
|
||||
producer_site = FrappeClient(
|
||||
url=producer_url,
|
||||
|
|
@ -126,23 +129,30 @@ def get_producer_site(producer_url):
|
|||
)
|
||||
return producer_site
|
||||
|
||||
|
||||
def get_approval_status(config, ref_doctype):
|
||||
"""check whether the doctype has been marked approved, rejected or pending for consumption"""
|
||||
for entry in config:
|
||||
if entry.get('ref_doctype') == ref_doctype:
|
||||
return entry.get('status')
|
||||
return 'Pending'
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def pull_producer_data():
|
||||
'''Fetch data from producer node.'''
|
||||
"""Fetch data from producer node."""
|
||||
response = requests.get(get_url())
|
||||
if response.status_code == 200:
|
||||
for event_producer in frappe.get_all('Event Producer'):
|
||||
pull_from_node(event_producer.name)
|
||||
return 'success'
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def pull_from_node(event_producer):
|
||||
"""pull all updates after the last update timestamp from event producer site"""
|
||||
event_producer = frappe.get_doc('Event Producer', event_producer)
|
||||
producer_site = get_producer_site(event_producer.producer_url)
|
||||
last_update = event_producer.last_update
|
||||
|
|
@ -162,7 +172,9 @@ def pull_from_node(event_producer):
|
|||
|
||||
sync(update, producer_site, event_producer)
|
||||
|
||||
|
||||
def get_config(event_config):
|
||||
"""get the doctype mapping and naming configurations for consumption"""
|
||||
doctypes, mapping_config, naming_config = [], {}, {}
|
||||
|
||||
for entry in event_config:
|
||||
|
|
@ -177,7 +189,9 @@ def get_config(event_config):
|
|||
doctypes.append(entry.ref_doctype)
|
||||
return (doctypes, mapping_config, naming_config)
|
||||
|
||||
|
||||
def sync(update, producer_site, event_producer, in_retry=False):
|
||||
"""Sync the individual update"""
|
||||
try:
|
||||
if update.update_type == 'Create':
|
||||
set_insert(update, producer_site, event_producer.name)
|
||||
|
|
@ -197,7 +211,9 @@ def sync(update, producer_site, event_producer, in_retry=False):
|
|||
frappe.db.set_value('Event Producer', event_producer.name, 'last_update', update.creation)
|
||||
frappe.db.commit()
|
||||
|
||||
|
||||
def set_insert(update, producer_site, event_producer):
|
||||
"""Sync insert type update"""
|
||||
if frappe.db.get_value(update.ref_doctype, update.docname):
|
||||
# doc already created
|
||||
return
|
||||
|
|
@ -206,12 +222,14 @@ def set_insert(update, producer_site, event_producer):
|
|||
if update.use_same_name:
|
||||
doc.insert(set_name=update.docname, set_child_names=False)
|
||||
else:
|
||||
#if event consumer is not saving documents with the same name as the producer
|
||||
#store the remote docname in a custom field for future updates
|
||||
# if event consumer is not saving documents with the same name as the producer
|
||||
# store the remote docname in a custom field for future updates
|
||||
local_doc = doc.insert(set_child_names=False)
|
||||
set_custom_fields(local_doc, update.docname, event_producer)
|
||||
|
||||
|
||||
def set_update(update, producer_site):
|
||||
"""Sync update type update"""
|
||||
local_doc = get_local_doc(update)
|
||||
try:
|
||||
if local_doc:
|
||||
|
|
@ -232,13 +250,17 @@ def set_update(update, producer_site):
|
|||
except frappe.DoesNotExistError:
|
||||
sync_dependencies(local_doc, producer_site)
|
||||
|
||||
|
||||
def update_row_removed(local_doc, removed):
|
||||
"""Sync child table row deletion type update"""
|
||||
for tablename, rownames in iteritems(removed):
|
||||
table = local_doc.get_table_field_doctype(tablename)
|
||||
for row in rownames:
|
||||
frappe.db.delete(table, row)
|
||||
|
||||
|
||||
def update_row_changed(local_doc, changed):
|
||||
"""Sync child table row updation type update"""
|
||||
for tablename, rows in iteritems(changed):
|
||||
old = local_doc.get(tablename)
|
||||
for doc in old:
|
||||
|
|
@ -246,7 +268,9 @@ def update_row_changed(local_doc, changed):
|
|||
if row['name'] == doc.get('name'):
|
||||
doc.update(row)
|
||||
|
||||
|
||||
def update_row_added(local_doc, added):
|
||||
"""Sync child table row addition type update"""
|
||||
for tablename, rows in iteritems(added):
|
||||
local_doc.extend(tablename, rows)
|
||||
for child in rows:
|
||||
|
|
@ -254,12 +278,16 @@ def update_row_added(local_doc, added):
|
|||
child_doc.insert(set_name=child_doc.name, set_child_names=False)
|
||||
return local_doc
|
||||
|
||||
|
||||
def set_delete(update):
|
||||
"""Sync delete type update"""
|
||||
local_doc = get_local_doc(update)
|
||||
if local_doc:
|
||||
local_doc.delete()
|
||||
|
||||
|
||||
def get_updates(producer_site, last_update, doctypes):
|
||||
"""Get all updates generated after the last update timestamp"""
|
||||
docs = producer_site.get_list(
|
||||
doctype = 'Event Update Log',
|
||||
filters = {'ref_doctype': ('in', doctypes), 'creation': ('>', last_update)},
|
||||
|
|
@ -268,23 +296,28 @@ def get_updates(producer_site, last_update, doctypes):
|
|||
docs.reverse()
|
||||
return [frappe._dict(d) for d in docs]
|
||||
|
||||
|
||||
def get_local_doc(update):
|
||||
"""Get the local document if created with a different name"""
|
||||
try:
|
||||
if not update.use_same_name:
|
||||
return frappe.get_doc(update.ref_doctype, {'remote_docname': update.docname})
|
||||
return frappe.get_doc(update.ref_doctype, update.docname)
|
||||
except frappe.DoesNotExistError:
|
||||
return
|
||||
return None
|
||||
|
||||
|
||||
def sync_dependencies(document, producer_site):
|
||||
# dependencies is a dictionary to store all the docs having dependencies and their sync status
|
||||
# dependencies is shared among all nested functions
|
||||
"""
|
||||
dependencies is a dictionary to store all the docs having dependencies and their sync status,
|
||||
which is shared among all nested functions.
|
||||
"""
|
||||
dependencies = {document: True}
|
||||
|
||||
def check_doc_has_dependencies(doc, producer_site):
|
||||
'''Sync child table link fields first,
|
||||
"""Sync child table link fields first,
|
||||
then sync link fields,
|
||||
then dynamic links'''
|
||||
then dynamic links"""
|
||||
meta = frappe.get_meta(doc.doctype)
|
||||
table_fields = meta.get_table_fields()
|
||||
link_fields = meta.get_link_fields()
|
||||
|
|
@ -325,7 +358,7 @@ def sync_dependencies(document, producer_site):
|
|||
master_doc.insert(set_name=docname)
|
||||
frappe.db.commit()
|
||||
|
||||
#for dependency inside a dependency
|
||||
# for dependency inside a dependency
|
||||
except Exception:
|
||||
dependencies[master_doc] = True
|
||||
|
||||
|
|
@ -351,7 +384,9 @@ def sync_dependencies(document, producer_site):
|
|||
if not any(list(dependencies.values())[1:]):
|
||||
dependencies[document] = False
|
||||
|
||||
|
||||
def log_event_sync(update, event_producer, sync_status, error=None):
|
||||
"""Log event update received with the sync_status as Synced or Failed"""
|
||||
doc = frappe.new_doc('Event Sync Log')
|
||||
doc.update_type = update.update_type
|
||||
doc.ref_doctype = update.ref_doctype
|
||||
|
|
@ -369,23 +404,28 @@ def log_event_sync(update, event_producer, sync_status, error=None):
|
|||
doc.error = error
|
||||
doc.insert()
|
||||
|
||||
|
||||
def get_mapped_update(update):
|
||||
"""get the new update document with mapped fields"""
|
||||
mapping = frappe.get_doc('Document Type Mapping', update.mapping)
|
||||
if update.update_type != 'Delete':
|
||||
update.data = mapping.get_mapped_doc(update.data)
|
||||
update.ref_doctype = mapping.local_doctype
|
||||
return update
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def new_event_notification(producer_url):
|
||||
'''Pull data from producer when notified'''
|
||||
"""Pull data from producer when notified"""
|
||||
enqueued_method = 'frappe.event_streaming.doctype.event_producer.event_producer.pull_from_node'
|
||||
jobs = get_jobs()
|
||||
if not jobs or enqueued_method not in jobs[frappe.local.site]:
|
||||
frappe.enqueue(enqueued_method, queue = 'default', **{'event_producer': producer_url})
|
||||
frappe.enqueue(enqueued_method, queue='default', **{'event_producer': producer_url})
|
||||
|
||||
|
||||
@frappe.whitelist()
|
||||
def resync(update):
|
||||
"""Retry syncing update if failed"""
|
||||
update = frappe._dict(json.loads(update))
|
||||
if update.mapping:
|
||||
update = get_mapped_update(update)
|
||||
|
|
@ -394,7 +434,8 @@ def resync(update):
|
|||
event_producer = frappe.get_doc('Event Producer', update.event_producer)
|
||||
return sync(update, producer_site, event_producer, in_retry=True)
|
||||
|
||||
|
||||
def set_custom_fields(local_doc, remote_docname, remote_site_name):
|
||||
'''sets custom field in doc for storing remote docname'''
|
||||
"""sets custom field in doc for storing remote docname"""
|
||||
frappe.db.set_value(local_doc.doctype, local_doc.name, 'remote_docname', remote_docname)
|
||||
frappe.db.set_value(local_doc.doctype, local_doc.name, 'remote_site_name', remote_site_name)
|
||||
frappe.db.set_value(local_doc.doctype, local_doc.name, 'remote_site_name', remote_site_name)
|
||||
|
|
|
|||
|
|
@ -6,5 +6,6 @@ from __future__ import unicode_literals
|
|||
# import frappe
|
||||
from frappe.model.document import Document
|
||||
|
||||
|
||||
class EventProducerDocumentType(Document):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -7,4 +7,4 @@ from __future__ import unicode_literals
|
|||
from frappe.model.document import Document
|
||||
|
||||
class EventSyncLog(Document):
|
||||
pass
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -11,27 +11,29 @@ from frappe.model import no_value_fields, table_fields
|
|||
class EventUpdateLog(Document):
|
||||
pass
|
||||
|
||||
def notify_consumers(doc, method=None):
|
||||
'''Send update notification updates to event consumers whenever update log is generated'''
|
||||
def notify_consumers(doc, _method=None):
|
||||
"""Send update notification updates to event consumers whenever update log is generated"""
|
||||
enqueued_method = 'frappe.event_streaming.doctype.event_consumer.event_consumer.notify_event_consumers'
|
||||
jobs = get_jobs()
|
||||
if not jobs or enqueued_method not in jobs[frappe.local.site]:
|
||||
frappe.enqueue(enqueued_method, doctype=doc.ref_doctype, queue='long', enqueue_after_commit=True)
|
||||
|
||||
def get_update(old, new, for_child=False):
|
||||
'''Get document objects with updates only
|
||||
"""
|
||||
Get document objects with updates only
|
||||
If there is a change, then returns a dict like:
|
||||
{
|
||||
"changed" : {fieldname1: new_value1, fieldname2: new_value2, },
|
||||
"added" : {table_fieldname1: [{row_dict1}, {row_dict2}], },
|
||||
"removed" : {table_fieldname1: [row_name1, row_name2], },
|
||||
"row_changed": {table_fieldname1:
|
||||
{
|
||||
child_fieldname1: new_val,
|
||||
child_fieldname2: new_val
|
||||
},
|
||||
{
|
||||
"changed" : {fieldname1: new_value1, fieldname2: new_value2, },
|
||||
"added" : {table_fieldname1: [{row_dict1}, {row_dict2}], },
|
||||
"removed" : {table_fieldname1: [row_name1, row_name2], },
|
||||
"row_changed" : {table_fieldname1:
|
||||
{
|
||||
child_fieldname1: new_val,
|
||||
child_fieldname2: new_val
|
||||
},
|
||||
}'''
|
||||
},
|
||||
}
|
||||
"""
|
||||
if not new:
|
||||
return None
|
||||
|
||||
|
|
@ -47,7 +49,7 @@ def get_update(old, new, for_child=False):
|
|||
out = check_for_additions(out, df, new_value, old_row_by_name)
|
||||
out = check_for_deletions(out, df, old_value, new_row_by_name)
|
||||
|
||||
elif (old_value != new_value):
|
||||
elif old_value != new_value:
|
||||
out.changed[df.fieldname] = new_value
|
||||
|
||||
out = check_docstatus(out, old, new, for_child)
|
||||
|
|
@ -57,7 +59,7 @@ def get_update(old, new, for_child=False):
|
|||
|
||||
|
||||
def make_maps(old_value, new_value):
|
||||
# make maps
|
||||
"""make maps"""
|
||||
old_row_by_name, new_row_by_name = {}, {}
|
||||
for d in old_value:
|
||||
old_row_by_name[d.name] = d
|
||||
|
|
@ -66,7 +68,7 @@ def make_maps(old_value, new_value):
|
|||
return old_row_by_name, new_row_by_name
|
||||
|
||||
def check_for_additions(out, df, new_value, old_row_by_name):
|
||||
# check rows for additions, changes
|
||||
"""check rows for additions, changes"""
|
||||
for _i, d in enumerate(new_value):
|
||||
if d.name in old_row_by_name:
|
||||
diff = get_update(old_row_by_name[d.name], d, for_child=True)
|
||||
|
|
@ -82,7 +84,7 @@ def check_for_additions(out, df, new_value, old_row_by_name):
|
|||
return out
|
||||
|
||||
def check_for_deletions(out, df, old_value, new_row_by_name):
|
||||
# check for deletions
|
||||
"""check for deletions"""
|
||||
for d in old_value:
|
||||
if d.name not in new_row_by_name:
|
||||
if not out.removed.get(df.fieldname):
|
||||
|
|
@ -91,7 +93,7 @@ def check_for_deletions(out, df, old_value, new_row_by_name):
|
|||
return out
|
||||
|
||||
def check_docstatus(out, old, new, for_child):
|
||||
# docstatus
|
||||
"""docstatus changes"""
|
||||
if not for_child and old.docstatus != new.docstatus:
|
||||
out.changed['docstatus'] = new.docstatus
|
||||
return out
|
||||
return out
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class FrappeException(Exception):
|
|||
pass
|
||||
|
||||
class FrappeClient(object):
|
||||
def __init__(self, url, username=None, password=None, verify=True, api_key=None, api_secret=None, frappe_authorization_source = None):
|
||||
def __init__(self, url, username=None, password=None, verify=True, api_key=None, api_secret=None, frappe_authorization_source=None):
|
||||
self.headers = {
|
||||
'Accept': 'application/json',
|
||||
'content-type': 'application/x-www-form-urlencoded',
|
||||
|
|
|
|||
|
|
@ -370,12 +370,12 @@ class BaseDocument(object):
|
|||
raise
|
||||
|
||||
def db_update_all(self):
|
||||
'''Raw update parent + children
|
||||
DOES NOT VALIDATE AND CALL TRIGGERS'''
|
||||
"""Raw update parent + children
|
||||
DOES NOT VALIDATE AND CALL TRIGGERS"""
|
||||
self.db_update()
|
||||
for df in self.meta.get_table_fields():
|
||||
for d in self.get(df.fieldname):
|
||||
d.db_update()
|
||||
for doc in self.get(df.fieldname):
|
||||
doc.db_update()
|
||||
|
||||
def show_unique_validation_message(self, e):
|
||||
# TODO: Find a better way to extract fieldname
|
||||
|
|
@ -398,7 +398,7 @@ class BaseDocument(object):
|
|||
raise frappe.UniqueValidationError(self.doctype, self.name, e)
|
||||
|
||||
def update_modified(self):
|
||||
'''Update modified timestamp'''
|
||||
"""Update modified timestamp"""
|
||||
self.set("modified", now())
|
||||
frappe.db.set_value(self.doctype, self.name, 'modified', self.modified, update_modified=False)
|
||||
|
||||
|
|
@ -445,7 +445,7 @@ class BaseDocument(object):
|
|||
return missing
|
||||
|
||||
def get_invalid_links(self, is_submittable=False):
|
||||
'''Returns list of invalid links and also updates fetch values if not set'''
|
||||
"""Returns list of invalid links and also updates fetch values if not set"""
|
||||
def get_msg(df, docname):
|
||||
if self.parentfield:
|
||||
return "{} #{}: {}: {}".format(_("Row"), self.idx, _(df.label), docname)
|
||||
|
|
@ -673,7 +673,7 @@ class BaseDocument(object):
|
|||
self.set(fieldname, sanitized_value)
|
||||
|
||||
def _save_passwords(self):
|
||||
'''Save password field values in __Auth table'''
|
||||
"""Save password field values in __Auth table"""
|
||||
if self.flags.ignore_save_passwords is True:
|
||||
return
|
||||
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ def delete_doc(doctype=None, name=None, force=0, ignore_doctypes=None, for_reloa
|
|||
|
||||
# update log if doctype has event consumers
|
||||
if not frappe.flags.in_install and not frappe.flags.in_migrate and check_doctype_has_consumers(doc.doctype):
|
||||
make_event_update_log(doc, update_type = 'Delete')
|
||||
make_event_update_log(doc, update_type='Delete')
|
||||
|
||||
if doc and not for_reload:
|
||||
add_to_deleted_document(doc)
|
||||
|
|
|
|||
|
|
@ -256,7 +256,8 @@ class Document(BaseDocument):
|
|||
if self.get("amended_from"):
|
||||
self.copy_attachments_from_amended_from()
|
||||
|
||||
#flag to prevent creation of event update log for create and update both, during document creation
|
||||
# flag to prevent creation of event update log for create and update both
|
||||
# during document creation
|
||||
self.flags.update_log_for_doc_creation = True
|
||||
self.run_post_save_methods()
|
||||
self.flags.in_insert = False
|
||||
|
|
@ -329,7 +330,7 @@ class Document(BaseDocument):
|
|||
return self
|
||||
|
||||
def copy_attachments_from_amended_from(self):
|
||||
'''Copy attachments from `amended_from`'''
|
||||
"""Copy attachments from `amended_from`"""
|
||||
from frappe.desk.form.load import get_attachments
|
||||
|
||||
#loop through attachments
|
||||
|
|
@ -347,12 +348,12 @@ class Document(BaseDocument):
|
|||
|
||||
|
||||
def update_children(self):
|
||||
'''update child tables'''
|
||||
"""update child tables"""
|
||||
for df in self.meta.get_table_fields():
|
||||
self.update_child_table(df.fieldname, df)
|
||||
|
||||
def update_child_table(self, fieldname, df=None):
|
||||
'''sync child table for given fieldname'''
|
||||
"""sync child table for given fieldname"""
|
||||
rows = []
|
||||
if not df:
|
||||
df = self.meta.get_field(fieldname)
|
||||
|
|
@ -404,7 +405,7 @@ class Document(BaseDocument):
|
|||
self.flags.name_set = True
|
||||
|
||||
def get_title(self):
|
||||
'''Get the document title based on title_field or `title` or `name`'''
|
||||
"""Get the document title based on title_field or `title` or `name`"""
|
||||
return self.get(self.meta.get_title_field())
|
||||
|
||||
def set_title_field(self):
|
||||
|
|
@ -487,13 +488,13 @@ class Document(BaseDocument):
|
|||
self.validate_set_only_once()
|
||||
|
||||
def validate_workflow(self):
|
||||
'''Validate if the workflow transition is valid'''
|
||||
"""Validate if the workflow transition is valid"""
|
||||
if frappe.flags.in_install == 'frappe': return
|
||||
if self.meta.get_workflow():
|
||||
validate_workflow(self)
|
||||
|
||||
def validate_set_only_once(self):
|
||||
'''Validate that fields are not changed if not in insert'''
|
||||
"""Validate that fields are not changed if not in insert"""
|
||||
set_only_once_fields = self.meta.get_set_only_once_fields()
|
||||
|
||||
if set_only_once_fields and self._doc_before_save:
|
||||
|
|
@ -517,7 +518,7 @@ class Document(BaseDocument):
|
|||
return False
|
||||
|
||||
def is_child_table_same(self, fieldname):
|
||||
'''Validate child table is same as original table before saving'''
|
||||
"""Validate child table is same as original table before saving"""
|
||||
value = self.get(fieldname)
|
||||
original_value = self._doc_before_save.get(fieldname)
|
||||
same = True
|
||||
|
|
@ -542,7 +543,7 @@ class Document(BaseDocument):
|
|||
return same
|
||||
|
||||
def apply_fieldlevel_read_permissions(self):
|
||||
'''Remove values the user is not allowed to read (called when loading in desk)'''
|
||||
"""Remove values the user is not allowed to read (called when loading in desk)"""
|
||||
has_higher_permlevel = False
|
||||
for p in self.get_permissions():
|
||||
if p.permlevel > 0:
|
||||
|
|
@ -638,8 +639,8 @@ class Document(BaseDocument):
|
|||
self._action = "save"
|
||||
if not self.get('__islocal'):
|
||||
if self.meta.issingle:
|
||||
modified = frappe.db.sql('''select value from tabSingles
|
||||
where doctype=%s and field='modified' for update''', self.doctype)
|
||||
modified = frappe.db.sql("""select value from tabSingles
|
||||
where doctype=%s and field='modified' for update""", self.doctype)
|
||||
modified = modified and modified[0][0]
|
||||
if modified and modified != cstr(self._original_modified):
|
||||
conflict = True
|
||||
|
|
@ -806,7 +807,7 @@ class Document(BaseDocument):
|
|||
return self.run_method(method, *args, **kwargs)
|
||||
|
||||
def run_notifications(self, method):
|
||||
'''Run notifications for this method'''
|
||||
"""Run notifications for this method"""
|
||||
if frappe.flags.in_import or frappe.flags.in_patch or frappe.flags.in_install:
|
||||
return
|
||||
|
||||
|
|
@ -909,7 +910,7 @@ class Document(BaseDocument):
|
|||
self.set_title_field()
|
||||
|
||||
def load_doc_before_save(self):
|
||||
'''Save load document from db before saving'''
|
||||
"""Save load document from db before saving"""
|
||||
self._doc_before_save = None
|
||||
if not self.is_new():
|
||||
try:
|
||||
|
|
@ -955,7 +956,7 @@ class Document(BaseDocument):
|
|||
# make event update log for doctypes having event consumers
|
||||
if not frappe.flags.in_install and not frappe.flags.in_migrate and check_doctype_has_consumers(self.doctype):
|
||||
if self.flags.update_log_for_doc_creation:
|
||||
make_event_update_log(self, update_type = 'Create')
|
||||
make_event_update_log(self, update_type='Create')
|
||||
self.flags.update_log_for_doc_creation = False
|
||||
else:
|
||||
from frappe.event_streaming.doctype.event_update_log.event_update_log import get_update
|
||||
|
|
@ -963,7 +964,7 @@ class Document(BaseDocument):
|
|||
if diff:
|
||||
doc = self
|
||||
doc.diff = diff
|
||||
make_event_update_log(doc, update_type = 'Update')
|
||||
make_event_update_log(doc, update_type='Update')
|
||||
|
||||
self.latest = None
|
||||
|
||||
|
|
@ -971,7 +972,7 @@ class Document(BaseDocument):
|
|||
frappe.clear_document_cache(self.doctype, self.name)
|
||||
|
||||
def reset_seen(self):
|
||||
'''Clear _seen property and set current user as seen'''
|
||||
"""Clear _seen property and set current user as seen"""
|
||||
if getattr(self.meta, 'track_seen', False):
|
||||
self._seen = json.dumps([frappe.session.user])
|
||||
|
||||
|
|
@ -990,7 +991,7 @@ class Document(BaseDocument):
|
|||
frappe.publish_realtime("list_update", data, after_commit=True)
|
||||
|
||||
def db_set(self, fieldname, value=None, update_modified=True, notify=False, commit=False):
|
||||
'''Set a value in the document object, update the timestamp and update the database.
|
||||
"""Set a value in the document object, update the timestamp and update the database.
|
||||
|
||||
WARNING: This method does not trigger controller validations and should
|
||||
be used very carefully.
|
||||
|
|
@ -1000,7 +1001,7 @@ class Document(BaseDocument):
|
|||
:param update_modified: default True. updates the `modified` and `modified_by` properties
|
||||
:param notify: default False. run doc.notify_updated() to send updates via socketio
|
||||
:param commit: default False. run frappe.db.commit()
|
||||
'''
|
||||
"""
|
||||
if isinstance(fieldname, dict):
|
||||
self.update(fieldname)
|
||||
else:
|
||||
|
|
@ -1029,7 +1030,7 @@ class Document(BaseDocument):
|
|||
frappe.db.commit()
|
||||
|
||||
def db_get(self, fieldname):
|
||||
'''get database value for this fieldname'''
|
||||
"""get database value for this fieldname"""
|
||||
return frappe.db.get_value(self.doctype, self.name, fieldname)
|
||||
|
||||
def check_no_back_links_exist(self):
|
||||
|
|
@ -1040,7 +1041,7 @@ class Document(BaseDocument):
|
|||
check_if_doc_is_dynamically_linked(self, method="Cancel")
|
||||
|
||||
def save_version(self):
|
||||
'''Save version info'''
|
||||
"""Save version info"""
|
||||
version = frappe.new_doc('Version')
|
||||
if version.set_diff(self._doc_before_save, self):
|
||||
version.insert(ignore_permissions=True)
|
||||
|
|
@ -1161,7 +1162,7 @@ class Document(BaseDocument):
|
|||
return out
|
||||
|
||||
def add_seen(self, user=None):
|
||||
'''add the given/current user to list of users who have seen this document (_seen)'''
|
||||
"""add the given/current user to list of users who have seen this document (_seen)"""
|
||||
if not user:
|
||||
user = frappe.session.user
|
||||
|
||||
|
|
@ -1175,7 +1176,7 @@ class Document(BaseDocument):
|
|||
frappe.local.flags.commit = True
|
||||
|
||||
def add_viewed(self, user=None):
|
||||
'''add log to communication when a user views a document'''
|
||||
"""add log to communication when a user views a document"""
|
||||
if not user:
|
||||
user = frappe.session.user
|
||||
|
||||
|
|
@ -1211,8 +1212,8 @@ class Document(BaseDocument):
|
|||
return self.get('__onload')[key]
|
||||
|
||||
def queue_action(self, action, **kwargs):
|
||||
'''Run an action in background. If the action has an inner function,
|
||||
like _submit for submit, it will call that instead'''
|
||||
"""Run an action in background. If the action has an inner function,
|
||||
like _submit for submit, it will call that instead"""
|
||||
# call _submit instead of submit, so you can override submit to call
|
||||
# run_delayed based on some action
|
||||
# See: Stock Reconciliation
|
||||
|
|
@ -1228,10 +1229,10 @@ class Document(BaseDocument):
|
|||
action=action, **kwargs)
|
||||
|
||||
def lock(self, timeout=None):
|
||||
'''Creates a lock file for the given document. If timeout is set,
|
||||
"""Creates a lock file for the given document. If timeout is set,
|
||||
it will retry every 1 second for acquiring the lock again
|
||||
|
||||
:param timeout: Timeout in seconds, default 0'''
|
||||
:param timeout: Timeout in seconds, default 0"""
|
||||
signature = self.get_signature()
|
||||
if file_lock.lock_exists(signature):
|
||||
lock_exists = True
|
||||
|
|
@ -1246,14 +1247,14 @@ class Document(BaseDocument):
|
|||
file_lock.create_lock(signature)
|
||||
|
||||
def unlock(self):
|
||||
'''Delete the lock file for this document'''
|
||||
"""Delete the lock file for this document"""
|
||||
file_lock.delete_lock(self.get_signature())
|
||||
|
||||
# validation helpers
|
||||
def validate_from_to_dates(self, from_date_field, to_date_field):
|
||||
'''
|
||||
"""
|
||||
Generic validation to verify date sequence
|
||||
'''
|
||||
"""
|
||||
if date_diff(self.get(to_date_field), self.get(from_date_field)) < 0:
|
||||
frappe.throw(_('{0} must be after {1}').format(
|
||||
frappe.bold(self.meta.get_label(to_date_field)),
|
||||
|
|
@ -1273,7 +1274,7 @@ class Document(BaseDocument):
|
|||
return users
|
||||
|
||||
def execute_action(doctype, name, action, **kwargs):
|
||||
'''Execute an action on a document (called by background worker)'''
|
||||
"""Execute an action on a document (called by background worker)"""
|
||||
doc = frappe.get_doc(doctype, name)
|
||||
doc.unlock()
|
||||
try:
|
||||
|
|
@ -1290,10 +1291,11 @@ def execute_action(doctype, name, action, **kwargs):
|
|||
doc.add_comment('Comment', _('Action Failed') + '<br><br>' + msg)
|
||||
doc.notify_update()
|
||||
|
||||
|
||||
def make_event_update_log(doc, update_type):
|
||||
'''Save update info for doctypes that have event consumers'''
|
||||
"""Save update info for doctypes that have event consumers"""
|
||||
if update_type != 'Delete':
|
||||
#diff for update type, doc for create type
|
||||
# diff for update type, doc for create type
|
||||
data = frappe.as_json(doc) if not doc.get('diff') else frappe.as_json(doc.diff)
|
||||
else:
|
||||
data = None
|
||||
|
|
@ -1304,14 +1306,16 @@ def make_event_update_log(doc, update_type):
|
|||
'docname': doc.name,
|
||||
'data': data
|
||||
})
|
||||
log_doc.insert(ignore_permissions = True)
|
||||
log_doc.insert(ignore_permissions=True)
|
||||
frappe.db.commit()
|
||||
|
||||
|
||||
def check_doctype_has_consumers(doctype):
|
||||
"""Check if doctype has event consumers for event streaming"""
|
||||
event_consumers = frappe.get_all('Event Consumer')
|
||||
for event_consumer in event_consumers:
|
||||
consumer = frappe.get_doc('Event Consumer', event_consumer.name)
|
||||
for entry in consumer.consumer_doctypes:
|
||||
if doctype == entry.ref_doctype and entry.status == 'Approved':
|
||||
return True
|
||||
return False
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ def executed(patchmodule):
|
|||
# print "Patch %s already executed in %s" % (patchmodule, frappe.db.cur_db_name)
|
||||
return done
|
||||
|
||||
def block_user(block, msg = None):
|
||||
def block_user(block, msg=None):
|
||||
"""stop/start execution till patch is run"""
|
||||
frappe.local.flags.in_patch = block
|
||||
frappe.db.begin()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue